Co-Grouping using Spark:
------------------------
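The examples below assume two employee RDDs, branch1 and branch2, with records in the layout id,name,salary,sex,dno. A minimal setup sketch, assuming the records live in two comma-separated text files (the paths are hypothetical):

val branch1 = sc.textFile("/user/cloudera/branch1.txt")  // hypothetical path
val branch2 = sc.textFile("/user/cloudera/branch2.txt")  // hypothetical path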
scala> branch1.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
scala> branch2.collect.foreach(println)
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
scala> def toDnoSalPair(line: String) = {
         val w = line.split(",")
         val dno = w(4).toInt
         val dname = dno match {
           case 11 => "Marketing"
           case 12 => "Hr"
           case 13 => "Finance"
           case _  => "Other"
         }
         val sal = w(2).toInt
         (dname, sal)
       }
toDnoSalPair: (line: String)(String, Int)
scala> toDnoSalPair("101,aaaaa,60000,m,12")
res22: (String, Int) = (Hr,60000)
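Any department number other than 11, 12, or 13 falls into the catch-all case; for example (a derived check, not part of the original session):

toDnoSalPair("205,Mani,30000,m,14")   // returns (Other,30000)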
scala>
scala> val pair1 = branch1.map(x => toDnoSalPair(x))
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at map at <console>:33
scala> val pair2 = branch2.map(x => toDnoSalPair(x))
pair2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:33
scala> pair1.collect.foreach(println)
(Marketing,40000)
(Hr,50000)
(Hr,50000)
(Finance,90000)
(Hr,10000)
(Hr,40000)
(Finance,80000)
(Marketing,50000)
scala> pair2.collect.foreach(println)
(Hr,80000)
(Marketing,90000)
(Finance,100000)
(Hr,50000)
(Other,30000)
(Other,30000)
scala>
scala> val cg = pair1.cogroup(pair2)
cg: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[24] at cogroup at <console>:39
scala> cg.collect.foreach(println)
(Hr,(CompactBuffer(50000, 50000, 10000, 40000),CompactBuffer(80000, 50000)))
(Other,(CompactBuffer(),CompactBuffer(30000, 30000)))
(Marketing,(CompactBuffer(40000, 50000),CompactBuffer(90000)))
(Finance,(CompactBuffer(90000, 80000),CompactBuffer(100000)))
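Note that cogroup acts like a full outer join on the key: every department appearing in either RDD shows up in the result, and a side with no matching records contributes an empty CompactBuffer (as with Other above, which exists only in branch2).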
scala>
scala> val res = cg.map{ x =>
         val dname = x._1
         val cb1 = x._2._1
         val cb2 = x._2._2
         val tot1 = cb1.sum
         val tot2 = cb2.sum
         val tot = tot1 + tot2
         (dname, tot1, tot2, tot)
       }
scala> res.collect.foreach(println)
(Hr,150000,130000,280000)
(Other,0,60000,60000)
(Marketing,90000,90000,180000)
(Finance,170000,100000,270000)
As the output above shows, both the sum and the size of an empty CompactBuffer are zero.
But we run into problems with the average (sum/size) and with max and min.
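This behavior can be checked locally (a sketch using a plain empty collection standing in for an empty CompactBuffer):

val empty = Iterable.empty[Int]
empty.sum    // 0
empty.size   // 0
// empty.max would throw java.lang.UnsupportedOperationException: empty.max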
val res = cg.map{ x =>
  val dname = x._1
  val cb1 = x._2._1
  val cb2 = x._2._2
  val max1 = cb1.max
  val max2 = cb2.max
  (dname, max1, max2)
}
-- res.collect cannot execute: calling max on an empty CompactBuffer throws an exception.
-- the same happens with min.
val res = cg.map{ x =>
  val dname = x._1
  val cb1 = x._2._1
  val cb2 = x._2._2
  val tot1 = cb1.sum
  val tot2 = cb2.sum
  val cnt1 = cb1.size
  val cnt2 = cb2.size
  (dname, (tot1,cnt1), (tot2,cnt2))
}
-- no problem with sum and size on an empty CompactBuffer.
val res = cg.map{ x =>
  val dname = x._1
  val cb1 = x._2._1
  val cb2 = x._2._2
  val tot1 = cb1.sum
  val tot2 = cb2.sum
  val cnt1 = cb1.size
  val cnt2 = cb2.size
  val avg1 = (tot1/cnt1).toInt
  val avg2 = (tot2/cnt2).toInt
  (dname, avg1, avg2)
}
res.collect will fail, because for an empty CompactBuffer the count is zero, so the average divides by zero.
Solution:
----------
val res = cg.map{ x =>
  val dname = x._1
  val cb1 = x._2._1
  val cb2 = x._2._2
  val tot1 = cb1.sum
  val tot2 = cb2.sum
  val cnt1 = cb1.size
  val cnt2 = cb2.size
  var max1 = 0
  var min1 = 0
  var avg1 = 0
  if (cnt1 != 0) {
    avg1 = tot1/cnt1
    max1 = cb1.max
    min1 = cb1.min
  }
  var max2 = 0
  var min2 = 0
  var avg2 = 0
  if (cnt2 != 0) {
    avg2 = tot2/cnt2
    max2 = cb2.max
    min2 = cb2.min
  }
  (dname, (tot1,cnt1,avg1,max1,min1),
          (tot2,cnt2,avg2,max2,min2))
}
scala> res.collect.foreach(println)
(Hr,(150000,4,37500,50000,10000),(130000,2,65000,80000,50000))
(Other,(0,0,0,0,0),(60000,2,30000,30000,30000))
(Marketing,(90000,2,45000,50000,40000),(90000,1,90000,90000,90000))
(Finance,(170000,2,85000,90000,80000),(100000,1,100000,100000,100000))
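As an aside (not from the original notes), the mutable-variable guards can also be written with Scala's reduceOption, which returns None on an empty collection instead of throwing; shown for the branch1 side only:

val res = cg.map{ x =>
  val dname = x._1
  val cb1 = x._2._1
  // reduceOption yields None for an empty CompactBuffer, so no exception
  val max1 = cb1.reduceOption(_ max _).getOrElse(0)
  val min1 = cb1.reduceOption(_ min _).getOrElse(0)
  val avg1 = if (cb1.nonEmpty) cb1.sum / cb1.size else 0
  (dname, (avg1, max1, min1))
}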
-----------------------------
Cogroup on more than two RDDs:
scala> val p1 = sc.parallelize(List(("m",10000),("f",30000),("m",50000)))
p1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27
scala> val p2 = sc.parallelize(List(("m",10000),("f",30000)))
p2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[31] at parallelize at <console>:27
scala> val p3 = sc.parallelize(List(("m",10000),("m",30000)))
p3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:27
scala> val cg = p1.cogroup(p2,p3)
cg: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[34] at cogroup at <console>:33
scala> cg.collect.foreach(println)
(f,(CompactBuffer(30000),CompactBuffer(30000),CompactBuffer()))
(m,(CompactBuffer(10000, 50000),CompactBuffer(10000),CompactBuffer(10000, 30000)))
scala> val r = cg.map{x =>
| val sex = x._1
| val tot1 = x._2._1.sum
| val tot2 = x._2._2.sum
| val tot3 = x._2._3.sum
| (sex, tot1, tot2, tot3)
| }
r: org.apache.spark.rdd.RDD[(String, Int, Int, Int)] = MapPartitionsRDD[35] at map at <console>:37
scala> r.collect.foreach(println)
(f,30000,30000,0)
(m,60000,10000,40000)
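cogroup also has an overload that takes three other RDDs (four grouped in total). A hypothetical extension of the example above:

val p4 = sc.parallelize(List(("f",20000)))   // hypothetical fourth RDD
val cg4 = p1.cogroup(p2, p3, p4)
// cg4: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int], Iterable[Int]))]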
scala>