Data science Software Course Training in Ameerpet Hyderabad

Thursday, 4 May 2017

Spark : Union and Distinct



 Unions in Spark:
val l1 = List(10,20,30,40,50)
val l2 = List(100,200,300,400,500)
val r1 = sc.parallelize(l1)
val r2 = sc.parallelize(l2)
val r = r1.union(r2)
scala> r.collect.foreach(println)
10
20
30
40
50
100
200
300
400
500
scala> r.count
res1: Long = 10

Spark's union keeps duplicates (unlike SQL's UNION, which removes them).
Merging can also be done with the ++ operator.
scala> val r3 = r1 ++ r2
r3: org.apache.spark.rdd.RDD[Int] = UnionRDD[3] at $plus$plus at <console>:35
scala> r3.collect
res4: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500)
scala>
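The same behaviour can be checked on plain Scala Lists (a sketch, not Spark code): ++ concatenates and keeps duplicates, just like RDD.union, and .distinct gives SQL-UNION semantics:

```scala
// Plain-Scala analogue of RDD.union: ++ keeps duplicates;
// appending .distinct removes them, matching SQL's UNION.
object UnionDemo {
  def main(args: Array[String]): Unit = {
    val l1 = List(10, 20, 30)
    val l2 = List(20, 30, 40)
    val merged   = l1 ++ l2             // keeps the repeated 20 and 30
    val sqlStyle = (l1 ++ l2).distinct  // duplicates removed
    println(merged)    // List(10, 20, 30, 20, 30, 40)
    println(sqlStyle)  // List(10, 20, 30, 40)
  }
}
```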
Merging more than two sets (rx here is a third RDD; judging from the output below, something like sc.parallelize(List(15, 25, 35))):
scala> val rr = r1.union(r2).union(rx)
rr: org.apache.spark.rdd.RDD[Int] = UnionRDD[6] at union at <console>:37
scala> rr.count
res5: Long = 13
scala> rr.collect
res6: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500, 15, 25, 35)
scala> // or
scala> val rr = r1 ++ r2 ++ rx
rr: org.apache.spark.rdd.RDD[Int] = UnionRDD[8] at $plus$plus at <console>:37
scala> rr.collect
res7: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500, 15, 25, 35)
scala>
--- Eliminating duplicates.
scala> val x = List(10,20,30,40,10,10,20)
x: List[Int] = List(10, 20, 30, 40, 10, 10, 20)
scala> x.distinct
res8: List[Int] = List(10, 20, 30, 40)
scala> val y = sc.parallelize(x)
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:29
scala> r1.collect
res14: Array[Int] = Array(10, 20, 30, 40, 50)
scala> y.collect
res15: Array[Int] = Array(10, 20, 30, 40, 10, 10, 20)
scala> val nodupes = (r1 ++ y).distinct
nodupes: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at distinct at <console>:35
scala> nodupes.collect
res16: Array[Int] = Array(30, 50, 40, 20, 10)
scala>
---------------------------------------
[cloudera@quickstart ~]$ hadoop fs -cat spLab/emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
[cloudera@quickstart ~]$ hadoop fs -cat spLab/emp2
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
[cloudera@quickstart ~]$
scala> val branch1 = sc.textFile("/user/cloudera/spLab/emp")
branch1: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp MapPartitionsRDD[15] at textFile at <console>:27
scala> val branch2 = sc.textFile("/user/cloudera/spLab/emp2")
branch2: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp2 MapPartitionsRDD[17] at textFile at <console>:27
scala> val emp = branch1.union(branch2)
emp: org.apache.spark.rdd.RDD[String] = UnionRDD[18] at union at <console>:31
scala> emp.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
--------------------------------
 distinct:
  eliminates duplicates based on an entire-row match.
 limitation: it cannot eliminate duplicates based on a match of only some column(s).
   solution for this:
     iterate over the CompactBuffer produced by grouping.
   [ we will see this later ]
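As a rough preview of that idea (a sketch on plain Scala collections, with made-up rows): group the rows on the chosen column(s) and keep one row per group. On an RDD the same shape would be rows.map(r => (r.split(",")(0), r)).reduceByKey((a, _) => a).values:

```scala
// Hypothetical sketch: deduplicate employee rows on the id column only
// (column 0), keeping the first row seen for each id.
object DistinctByColumn {
  def main(args: Array[String]): Unit = {
    val rows = List(
      "101,aaaa,40000,m,11",
      "101,aaaa,45000,m,11",   // same id, different salary: a partial duplicate
      "102,bbbbbb,50000,f,12")
    val deduped = rows
      .groupBy(_.split(",")(0))  // key on column 0 (emp id)
      .values.map(_.head)        // keep one row per id
      .toList.sorted
    deduped.foreach(println)
    // 101,aaaa,40000,m,11
    // 102,bbbbbb,50000,f,12
  }
}
```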
Grouping aggregation on the merged set:
scala> val pair = emp.map{ x =>
     |       val w = x.split(",")
     |       val dno = w(4).toInt
     |       val sal = w(2).toInt
     |      (dno, sal)
     | }
pair: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[19] at map at <console>:35
scala> val eres = pair.reduceByKey(_+_)
eres: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[20] at reduceByKey at <console>:37
scala> eres.collect.foreach(println)
(14,60000)
(12,280000)
(13,270000)
(11,180000)
scala>
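The reduceByKey step above can be reproduced on plain Scala collections (a sketch using a few of the branch1 rows; groupBy plus a per-group sum plays the role of reduceByKey(_+_)):

```scala
// Sketch: sum salary (column 2) per dept no (column 4) with plain Scala,
// mirroring pair.reduceByKey(_ + _) on the RDD above.
object SalaryByDept {
  def main(args: Array[String]): Unit = {
    val emp = List(
      "101,aaaa,40000,m,11", "102,bbbbbb,50000,f,12",
      "103,cccc,50000,m,12", "104,dd,90000,f,13")
    val pair = emp.map { x =>
      val w = x.split(",")
      (w(4).toInt, w(2).toInt)   // (dno, sal)
    }
    val eres = pair.groupBy(_._1).map { case (d, xs) => (d, xs.map(_._2).sum) }
    eres.toList.sorted.foreach(println)
    // (11,40000)
    // (12,100000)
    // (13,90000)
  }
}
```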
-- In this output we don't have separate totals for branch1 and branch2.
 Solution for this: COGROUP.
    [ NEXT DOCUMENT ]
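As a rough preview of what cogroup will give us (a plain-Scala sketch with a couple of rows from each branch, not the actual cogroup API): keep the branch1 and branch2 totals side by side per dept instead of one combined sum:

```scala
// Sketch: per-dept totals kept separately for each branch, then
// aligned on dept no -- the shape cogroup produces on RDD pairs.
object PerBranchTotals {
  def main(args: Array[String]): Unit = {
    def totals(rows: List[String]): Map[Int, Int] =
      rows.map(_.split(","))
          .groupBy(_(4).toInt)                          // key: dept no
          .map { case (d, ws) => (d, ws.map(_(2).toInt).sum) }

    val b1 = totals(List("101,aaaa,40000,m,11", "105,ee,10000,m,12"))
    val b2 = totals(List("201,Ravi,80000,m,12"))
    val depts = b1.keySet ++ b2.keySet
    depts.toList.sorted.foreach { d =>
      println((d, b1.getOrElse(d, 0), b2.getOrElse(d, 0)))
    }
    // (11,40000,0)
    // (12,10000,80000)
  }
}
```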