Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Thursday, 4 May 2017

Spark : CoGroup And Handling Empty Compact Buffers



Co Grouping using Spark:-
-------------------------
scala> branch1.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
scala> branch2.collect.foreach(println)
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
scala> def toDnoSalPair(line:String)  = {
            val w = line.split(",")
            val dno = w(4).toInt
            val dname = dno match{
              case 11 => "Marketing"
              case 12 => "Hr"
             case 13 => "Finance"
             case _ => "Other"
            }
           val sal = w(2).toInt
          (dname, sal)
      }
toDnoSalPair: (line: String)(String, Int)
scala> toDnoSalPair("101,aaaaa,60000,m,12")
res22: (String, Int) = (Hr,60000)
scala>
scala> val pair1  = branch1.map(x => toDnoSalPair(x))
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at map at <console>:33
scala> val pair2  = branch2.map(x => toDnoSalPair(x))
pair2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:33
scala> pair1.collect.foreach(println)
(Marketing,40000)
(Hr,50000)
(Hr,50000)
(Finance,90000)
(Hr,10000)
(Hr,40000)
(Finance,80000)
(Marketing,50000)
scala> pair2.collect.foreach(println)
(Hr,80000)
(Marketing,90000)
(Finance,100000)
(Hr,50000)
(Other,30000)
(Other,30000)
scala>
scala> val cg = pair1.cogroup(pair2)
cg: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[24] at cogroup at <console>:39
scala> cg.collect.foreach(println)
(Hr,(CompactBuffer(50000, 50000, 10000, 40000),CompactBuffer(80000, 50000)))
(Other,(CompactBuffer(),CompactBuffer(30000, 30000)))
(Marketing,(CompactBuffer(40000, 50000),CompactBuffer(90000)))
(Finance,(CompactBuffer(90000, 80000),CompactBuffer(100000)))
scala>
scala> val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val tot = tot1+tot2
            (dname,tot1,tot2,tot)
           }
scala> res.collect.foreach(println)
(Hr,150000,130000,280000)
(Other,0,60000,60000)
(Marketing,90000,90000,180000)
(Finance,170000,100000,270000)
from above , sum of empty compact buffer ,
        size of empty compact buffer are zero.
 but we get problem with
    sum/size and  max , min


val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val max1 = cb1.max
            val max2 = cb2.max
            (dname,max1,max2)
           }
-- res.collect , can not execute.
   problem with max on empty compact buffer.
 -- same we get for min.
 val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val cnt1 = cb1.size
            val cnt2 = cb2.size
             (dname, (tot1,cnt1), (tot2,cnt2))
           }
-- no problem with sum and size on empty compact buffer.
val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val cnt1 = cb1.size
            val cnt2 = cb2.size
            val avg1 = (tot1/cnt1).toInt
            val avg2 = (tot2/cnt2).toInt
            (dname, avg1, avg2)
           
           }
res.collect  will be failed.
  bcoz, for avg in denominator zero is applied.
Solution:
----------
val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val cnt1 = cb1.size
            val cnt2 = cb2.size
            var max1 = 0
            var min1 = 0
            var avg1 = 0
            if (cnt1!=0){
                  avg1 = tot1/cnt1
                  max1 = cb1.max
                  min1 = cb1.min
            }
            var max2 = 0
            var min2 = 0
            var avg2 = 0
            if (cnt2!=0){
                  avg2 = tot2/cnt2
                  max2 = cb2.max
                  min2 = cb2.min
            }
       (dname,(tot1,cnt1,avg1,max1,min1),
          (tot2,cnt2,avg2,max2,min2))             }
scala> res.collect.foreach(println)
(Hr,(150000,4,37500,50000,10000),(130000,2,65000,80000,50000))
(Other,(0,0,0,0,0),(60000,2,30000,30000,30000))
(Marketing,(90000,2,45000,50000,40000),(90000,1,90000,90000,90000))
(Finance,(170000,2,85000,90000,80000),(100000,1,100000,100000,100000))
-----------------------------
 Cogroup on more than two
scala> val p1 = sc.parallelize(List(("m",10000),("f",30000),("m",50000)))
p1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27
scala> val p2 = sc.parallelize(List(("m",10000),("f",30000)))
p2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[31] at parallelize at <console>:27
scala> val p3 = sc.parallelize(List(("m",10000),("m",30000)))
p3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:27
scala> val cg = p1.cogroup(p2,p3)
cg: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[34] at cogroup at <console>:33
scala> cg.collect.foreach(println)
(f,(CompactBuffer(30000),CompactBuffer(30000),CompactBuffer()))
(m,(CompactBuffer(10000, 50000),CompactBuffer(10000),CompactBuffer(10000, 30000)))
scala> val r = cg.map{x =>
     |       val sex = x._1
     |       val tot1 = x._2._1.sum
     |       val tot2 = x._2._2.sum
     |       val tot3 = x._2._3.sum
     |       (sex, tot1, tot2, tot3)
     |   }
r: org.apache.spark.rdd.RDD[(String, Int, Int, Int)] = MapPartitionsRDD[35] at map at <console>:37
scala> r.collect.foreach(println)
(f,30000,30000,0)
(m,60000,10000,40000)
scala>


















           







3 comments: