Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Thursday, 4 May 2017

Spark : Union and Distinct



 Unions in spark.
val l1 = List(10,20,30,40,50)
val l2 = List(100,200,300,400,500)
val r1 = sc.parallelize(l1)
val r2 = sc.parallelize(l2)
val r = r1.union(r2)
scala> r.collect.foreach(println)
[Stage 0:>                                                          (0 + 0                                                                          10  
20
30
40
50
100
200
300
400
500
scala> r.count
res1: Long = 10

spark union allows duplicates.
Using ++ operatory, merging can be done.
scala> val r3 = r1 ++ r2
r3: org.apache.spark.rdd.RDD[Int] = UnionRDD[3] at $plus$plus at <console>:35
scala> r3.collect
res4: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500)
scala>
meging more than two sets.
                     ^
scala> val rr = r1.union(r2).union(rx)
rr: org.apache.spark.rdd.RDD[Int] = UnionRDD[6] at union at <console>:37
scala> rr.count
res5: Long = 13
scala> rr.collect
res6: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500, 15, 25, 35)
scala>// or
scala> val rr = r1 ++ r2 ++ rx
rr: org.apache.spark.rdd.RDD[Int] = UnionRDD[8] at $plus$plus at <console>:37
scala> rr.collect
res7: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500, 15, 25, 35)
scala>
--- eleminate duplicates.
scala> val x = List(10,20,30,40,10,10,20)
x: List[Int] = List(10, 20, 30, 40, 10, 10, 20)
scala> x.distinct
res8: List[Int] = List(10, 20, 30, 40)
scala> val y = sc.parallelize(x)
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:29
scala> r1.collect
res14: Array[Int] = Array(10, 20, 30, 40, 50)
scala> y.collect
res15: Array[Int] = Array(10, 20, 30, 40, 10, 10, 20)
scala> val nodupes = (r1 ++ y).distinct
nodupes: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at distinct at <console>:35
scala> nodupes.collect
[Stage 10:>                                                         (0 + 0                                                                          res16: Array[Int] = Array(30, 50, 40, 20, 10)
scala>
---------------------------------------
[cloudera@quickstart ~]$ hadoop fs -cat spLab/emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
[cloudera@quickstart ~]$ hadoop fs -cat spLab/emp2
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
[cloudera@quickstart ~]$
scala> val branch1 = sc.textFile("/user/cloudera/spLab/emp")
branch1: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp MapPartitionsRDD[15] at textFile at <console>:27
scala> val branch2 = sc.textFile("/user/cloudera/spLab/emp2")
branch2: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp2 MapPartitionsRDD[17] at textFile at <console>:27
scala> val emp = branch1.union(branch2)
emp: org.apache.spark.rdd.RDD[String] = UnionRDD[18] at union at <console>:31
scala> emp.collect.foreach(println)
scala> emp.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
--------------------------------
 distinct:
  to eleminated duplicates
  based on entire row match.
 limitations: can not eleminated based on some  column(s) match.
   for this solution:
     by iterating compactBuffer.
   [ later we will see ]
grouping aggregation on merged set.
scala> val pair = emp.map{ x =>
     |       val w = x.split(",")
     |       val dno = w(4).toInt
     |       val sal = w(2).toInt
     |      (dno, sal)
     | }
pair: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[19] at map at <console>:35
scala> val eres = pair.reduceByKey(_+_)
eres: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[20] at reduceByKey at <console>:37
scala> eres.collect.foreach(println)
(14,60000)
(12,280000)
(13,270000)
(11,180000)
scala>
-- in this output we dont have seperate total for branch1 and branch2.
 for this solution: COGROUP.
    [NEXT DOCUMENT ]



























1 comment: