Data science Software Course Training in Ameerpet Hyderabad

Thursday, 4 May 2017

Spark: CoGroup and Handling Empty CompactBuffers



Co-grouping using Spark:-
-------------------------
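The session below assumes branch1 and branch2 are RDD[String]s holding CSV rows of the form id,name,salary,sex,dno. A minimal way to create them (the file paths here are assumptions, not part of the original session):

scala> val branch1 = sc.textFile("branch1.txt")  // assumed path
scala> val branch2 = sc.textFile("branch2.txt")  // assumed path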
scala> branch1.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
scala> branch2.collect.foreach(println)
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
scala> def toDnoSalPair(line:String) = {
            val w = line.split(",")
            val dno = w(4).toInt
            val dname = dno match {
              case 11 => "Marketing"
              case 12 => "Hr"
              case 13 => "Finance"
              case _  => "Other"
            }
            val sal = w(2).toInt
            (dname, sal)
       }
toDnoSalPair: (line: String)(String, Int)
scala> toDnoSalPair("101,aaaaa,60000,m,12")
res22: (String, Int) = (Hr,60000)
scala>
scala> val pair1  = branch1.map(x => toDnoSalPair(x))
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at map at <console>:33
scala> val pair2  = branch2.map(x => toDnoSalPair(x))
pair2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:33
scala> pair1.collect.foreach(println)
(Marketing,40000)
(Hr,50000)
(Hr,50000)
(Finance,90000)
(Hr,10000)
(Hr,40000)
(Finance,80000)
(Marketing,50000)
scala> pair2.collect.foreach(println)
(Hr,80000)
(Marketing,90000)
(Finance,100000)
(Hr,50000)
(Other,30000)
(Other,30000)
scala>
scala> val cg = pair1.cogroup(pair2)
cg: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[24] at cogroup at <console>:39
scala> cg.collect.foreach(println)
(Hr,(CompactBuffer(50000, 50000, 10000, 40000),CompactBuffer(80000, 50000)))
(Other,(CompactBuffer(),CompactBuffer(30000, 30000)))
(Marketing,(CompactBuffer(40000, 50000),CompactBuffer(90000)))
(Finance,(CompactBuffer(90000, 80000),CompactBuffer(100000)))
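Note how cogroup groups the values of both RDDs by key into a pair of Iterables (CompactBuffers). branch1 has no employees outside departments 11, 12, and 13, so its buffer under the Other key is empty; handling that empty CompactBuffer is the subject of the rest of this post.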
scala>
scala> val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val tot = tot1+tot2
            (dname,tot1,tot2,tot)
           }
scala> res.collect.foreach(println)
(Hr,150000,130000,280000)
(Other,0,60000,60000)
(Marketing,90000,90000,180000)
(Finance,170000,100000,270000)
From the output above, both the sum and the size of an empty CompactBuffer are zero.
But we run into problems with sum/size (the average) and with max and min:


val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val max1 = cb1.max
            val max2 = cb2.max
            (dname,max1,max2)
           }
-- res.collect cannot execute: max on an empty CompactBuffer throws
   java.lang.UnsupportedOperationException.
-- the same happens for min.
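This is easy to reproduce outside Spark: a CompactBuffer is an Iterable, and max/min on any empty Scala collection throw an exception (a quick check, not from the original session):

scala> Seq.empty[Int].max
java.lang.UnsupportedOperationException: empty.max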
 val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val cnt1 = cb1.size
            val cnt2 = cb2.size
             (dname, (tot1,cnt1), (tot2,cnt2))
           }
-- no problem with sum and size on an empty CompactBuffer.
val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val cnt1 = cb1.size
            val cnt2 = cb2.size
            val avg1 = (tot1/cnt1).toInt
            val avg2 = (tot2/cnt2).toInt
            (dname, avg1, avg2)
           }
-- res.collect will fail, because for the Other department the average divides by a zero count.
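Since the totals and counts are Ints, this is integer division, and a zero denominator throws (a quick check, not from the original session):

scala> 0/0
java.lang.ArithmeticException: / by zero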
Solution:
----------
val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val cnt1 = cb1.size
            val cnt2 = cb2.size
            var max1 = 0
            var min1 = 0
            var avg1 = 0
            if (cnt1!=0){
                  avg1 = tot1/cnt1
                  max1 = cb1.max
                  min1 = cb1.min
            }
            var max2 = 0
            var min2 = 0
            var avg2 = 0
            if (cnt2!=0){
                  avg2 = tot2/cnt2
                  max2 = cb2.max
                  min2 = cb2.min
            }
        (dname,(tot1,cnt1,avg1,max1,min1),
          (tot2,cnt2,avg2,max2,min2))
           }
scala> res.collect.foreach(println)
(Hr,(150000,4,37500,50000,10000),(130000,2,65000,80000,50000))
(Other,(0,0,0,0,0),(60000,2,30000,30000,30000))
(Marketing,(90000,2,45000,50000,40000),(90000,1,90000,90000,90000))
(Finance,(170000,2,85000,90000,80000),(100000,1,100000,100000,100000))
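The same guards can be written more compactly with reduceOption, which returns None on an empty collection. This variant is a sketch of an alternative, not part of the original listing:

val res2 = cg.map{ case (dname, (cb1, cb2)) =>
  // (sum, count, avg, max, min), defaulting to 0 for an empty buffer
  def stats(cb: Iterable[Int]) = {
    val tot = cb.sum
    val cnt = cb.size
    val avg = if (cnt != 0) tot/cnt else 0
    val mx  = cb.reduceOption(_ max _).getOrElse(0)
    val mn  = cb.reduceOption(_ min _).getOrElse(0)
    (tot, cnt, avg, mx, mn)
  }
  (dname, stats(cb1), stats(cb2))
}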
Cogroup on more than two RDDs:-
-------------------------------
scala> val p1 = sc.parallelize(List(("m",10000),("f",30000),("m",50000)))
p1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27
scala> val p2 = sc.parallelize(List(("m",10000),("f",30000)))
p2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[31] at parallelize at <console>:27
scala> val p3 = sc.parallelize(List(("m",10000),("m",30000)))
p3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:27
scala> val cg = p1.cogroup(p2,p3)
cg: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[34] at cogroup at <console>:33
scala> cg.collect.foreach(println)
(f,(CompactBuffer(30000),CompactBuffer(30000),CompactBuffer()))
(m,(CompactBuffer(10000, 50000),CompactBuffer(10000),CompactBuffer(10000, 30000)))
scala> val r = cg.map{x =>
     |       val sex = x._1
     |       val tot1 = x._2._1.sum
     |       val tot2 = x._2._2.sum
     |       val tot3 = x._2._3.sum
     |       (sex, tot1, tot2, tot3)
     |   }
r: org.apache.spark.rdd.RDD[(String, Int, Int, Int)] = MapPartitionsRDD[35] at map at <console>:37
scala> r.collect.foreach(println)
(f,30000,30000,0)
(m,60000,10000,40000)
scala>
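The same empty-buffer caution applies here: the third buffer under the f key is empty, so sum and size are safe, but max or min on it would fail as before. A guarded version for the third RDD (a sketch, using the same reduceOption trick):

scala> val rMax = cg.map{x =>
     |       val sex = x._1
     |       // getOrElse(0) covers the empty third buffer for "f"
     |       val max3 = x._2._3.reduceOption(_ max _).getOrElse(0)
     |       (sex, max3)
     |   }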