Data science Software Course Training in Ameerpet Hyderabad

Monday, 1 May 2017

Spark Grouping Aggregations


Demo: grouping aggregations on structured data.
------------------------------------------------
[cloudera@quickstart ~]$ ls emp
emp
[cloudera@quickstart ~]$ cat emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
[cloudera@quickstart ~]$ hadoop fs -ls spLab
ls: `spLab': No such file or directory
[cloudera@quickstart ~]$ hadoop fs -mkdir spLab
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp spLab


scala> val data = sc.textFile("/user/cloudera/spLab/emp")
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp MapPartitionsRDD[1] at textFile at <console>:27

scala> data.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11

scala>

scala> val arr = data.map(_.split(","))
arr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:29

scala> arr.collect
res1: Array[Array[String]] = Array(Array(101, aaaa, 40000, m, 11), Array(102, bbbbbb, 50000, f, 12), Array(103, cccc, 50000, m, 12), Array(104, dd, 90000, f, 13), Array(105, ee, 10000, m, 12), Array(106, dkd, 40000, m, 12), Array(107, sdkfj, 80000, f, 13), Array(108, iiii, 50000, m, 11))

scala>

scala> val pair1 = arr.map(x => (x(3), x(2).toInt))
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:31

scala> // or

scala> val pair1 = arr.map{ x =>
     |      val sex = x(3)
     |      val sal = x(2).toInt
     |     (sex, sal)
     | }
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:31

scala>

scala> pair1.collect.foreach(println)
(m,40000)
(f,50000)
(m,50000)
(f,90000)
(m,10000)
(m,40000)
(f,80000)
(m,50000)

scala>

scala> // select sex, sum(sal) from emp group by sex

scala> val rsum = pair1.reduceByKey((a,b) => a+b)
rsum: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:33

scala> // or

scala> val rsum = pair1.reduceByKey(_+_)
rsum: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:33

scala> rsum.collect
res3: Array[(String, Int)] = Array((f,220000), (m,190000))

scala>

// select sex, max(sal) from emp group by sex;

scala> val rmax = pair1.reduceByKey(Math.max(_,_))
rmax: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:33

scala> rmax.collect
res4: Array[(String, Int)] = Array((f,90000), (m,50000))

scala>

// select sex, min(sal) from emp group by sex;

scala> val rmin = pair1.reduceByKey(Math.min(_,_))
rmin: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:33

scala> rmin.collect
res5: Array[(String, Int)] = Array((f,50000), (m,10000))

scala>

// select sex, count(*) from emp group by sex

scala> pair1.collect
res6: Array[(String, Int)] = Array((m,40000), (f,50000), (m,50000), (f,90000), (m,10000), (m,40000), (f,80000), (m,50000))

scala> pair1.countByKey
res7: scala.collection.Map[String,Long] = Map(f -> 3, m -> 5)

scala> val pair2 = pair1.map(x => (x._1, 1))
pair2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:33

scala> pair2.collect
res8: Array[(String, Int)] = Array((m,1), (f,1), (m,1), (f,1), (m,1), (m,1), (f,1), (m,1))

scala> val rcnt = pair2.reduceByKey(_+_)
rcnt: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:35

scala> rcnt.collect
res9: Array[(String, Int)] = Array((f,3), (m,5))

scala>

// select sex, avg(sal) from emp group by sex;
scala> rsum.collect.foreach(println)
(f,220000)
(m,190000)

scala> rcnt.collect.foreach(println)
(f,3)
(m,5)

scala> val j = rsum.join(rcnt)
j: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[15] at join at <console>:39

scala> j.collect
res12: Array[(String, (Int, Int))] = Array((f,(220000,3)), (m,(190000,5)))

scala> val ravg = j.map{ x =>
     |             val sex = x._1
     |            val v = x._2
     |            val tot = v._1
     |           val cnt = v._2
     |           val avg = tot/cnt
     |           (sex, avg.toInt)
     |       }
ravg: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:41

scala> ravg.collect
res15: Array[(String, Int)] = Array((f,73333), (m,38000))

scala>
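
The join above works, but the same average can also be computed in a single pass by carrying (sum, count) together through one reduceByKey. A minimal sketch against the same pair1 RDD (an alternative, not part of the original session):

// sketch: average salary per sex in one pass, instead of joining rsum with rcnt
val ravg2 = pair1
  .mapValues(sal => (sal, 1))                            // (sex, (sal, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))     // (sex, (totalSal, count))
  .mapValues { case (tot, cnt) => tot / cnt }            // (sex, avgSal)

ravg2.collect.foreach(println)   // should give (f,73333) and (m,38000), as above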


// select dno, range(sal) from emp group by dno;
// --> range is the difference between max(sal) and min(sal).

scala> val pair3 = arr.map(x => (x(4), x(2).toInt))
pair3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:31

scala> pair3.collect.foreach(println)
(11,40000)
(12,50000)
(12,50000)
(13,90000)
(12,10000)
(12,40000)
(13,80000)
(11,50000)

scala>
scala> val dmax = pair3.reduceByKey(Math.max(_,_))
dmax: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:33

scala> val dmin = pair3.reduceByKey(Math.min(_,_))
dmin: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:33

scala> val dj = dmax.join(dmin)
dj: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[23] at join at <console>:37

scala>  val drange = dj.map{ x =>
     |              val dno = x._1
     |             val max = x._2._1
     |             val min = x._2._2
     |             val r = max-min
     |             (dno, r)
     |       }
drange: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[25] at map at <console>:39

scala> drange.collect.foreach(println)
(12,40000)
(13,10000)
(11,10000)

scala>
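
The max/min/join round trip can likewise be folded into one aggregation. A sketch (not from the original session) using aggregateByKey to carry (max, min) per dno:

// sketch: salary range per dno in a single pass, instead of joining dmax with dmin
val drange2 = pair3
  .aggregateByKey((Int.MinValue, Int.MaxValue))(
    (acc, sal) => (Math.max(acc._1, sal), Math.min(acc._2, sal)),  // fold one salary into (max, min)
    (a, b)     => (Math.max(a._1, b._1), Math.min(a._2, b._2))     // merge two partial (max, min)
  )
  .mapValues { case (max, min) => max - min }

drange2.collect.foreach(println)   // should match the (dno, range) output above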

-------------------------------------

scala> // multiple aggregations.

scala> pair1.collect
res18: Array[(String, Int)] = Array((m,40000), (f,50000), (m,50000), (f,90000), (m,10000), (m,40000), (f,80000), (m,50000))

scala> val grp = pair1.groupByKey()
grp: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[26] at groupByKey at <console>:33

scala> grp.collect
res19: Array[(String, Iterable[Int])] = Array((f,CompactBuffer(50000, 90000, 80000)), (m,CompactBuffer(40000, 50000, 10000, 40000, 50000)))

scala> val r1 = grp.map(x => (x._1, x._2.sum))
r1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:35

scala> r1.collect.foreach(println)
(f,220000)
(m,190000)

scala>

// select sex, sum(sal), count(*), avg(sal), max(sal), min(sal),
//        max(sal)-min(sal) as range
// from emp group by sex;

scala> val rall = grp.map{ x =>
     |      val sex = x._1
     |      val cb = x._2
     |      val tot = cb.sum
     |      val cnt = cb.size
     |      val avg = (tot/cnt).toInt
     |      val max = cb.max
     |      val min = cb.min
     |      val r = max-min
     |      (sex,tot,cnt,avg,max,min,r)
     | }
rall: org.apache.spark.rdd.RDD[(String, Int, Int, Int, Int, Int, Int)] = MapPartitionsRDD[28] at map at <console>:35

scala> rall.collect.foreach(println)
(f,220000,3,73333,90000,50000,40000)
(m,190000,5,38000,50000,10000,40000)

scala>
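
Note that groupByKey pulls every salary for a key into memory on one executor. For larger data the same multi-aggregation can be expressed with aggregateByKey, accumulating (sum, count, max, min) as it goes. A minimal sketch (an alternative, not part of the original session):

// sketch: sum, count, avg, max, min and range per sex in one pass, no groupByKey
val rall2 = pair1
  .aggregateByKey((0, 0, Int.MinValue, Int.MaxValue))(
    (acc, sal) => (acc._1 + sal, acc._2 + 1,
                   Math.max(acc._3, sal), Math.min(acc._4, sal)),   // fold in one salary
    (a, b)     => (a._1 + b._1, a._2 + b._2,
                   Math.max(a._3, b._3), Math.min(a._4, b._4))      // merge partials
  )
  .map { case (sex, (tot, cnt, max, min)) =>
    (sex, tot, cnt, tot / cnt, max, min, max - min)
  }

rall2.collect.foreach(println)   // should match the (sex,tot,cnt,avg,max,min,r) rows above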


