Data science Software Course Training in Ameerpet Hyderabad

Monday 1 May 2017

Spark Grouping Aggregations


Demo: grouping aggregations on structured data
----------------------------------------------
[cloudera@quickstart ~]$ ls emp
emp
[cloudera@quickstart ~]$ cat emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
[cloudera@quickstart ~]$ hadoop fs -ls spLab
ls: `spLab': No such file or directory
[cloudera@quickstart ~]$ hadoop fs -mkdir spLab
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp spLab


scala> val data = sc.textFile("/user/cloudera/spLab/emp")
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp MapPartitionsRDD[1] at textFile at <console>:27

scala> data.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11

scala>

scala> val arr = data.map(_.split(","))
arr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:29

scala> arr.collect
res1: Array[Array[String]] = Array(Array(101, aaaa, 40000, m, 11), Array(102, bbbbbb, 50000, f, 12), Array(103, cccc, 50000, m, 12), Array(104, dd, 90000, f, 13), Array(105, ee, 10000, m, 12), Array(106, dkd, 40000, m, 12), Array(107, sdkfj, 80000, f, 13), Array(108, iiii, 50000, m, 11))

scala>
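A small optional sketch (not part of the session above): instead of indexing the split array positionally (x(2) for salary, x(3) for sex, x(4) for dno), each row can be mapped to a case class so the fields get names. The Emp class and its field names are my own choice, purely for illustration.

case class Emp(id: Int, name: String, sal: Int, sex: String, dno: String)
val emps = arr.map(a => Emp(a(0).toInt, a(1), a(2).toInt, a(3), a(4)))
// the (sex, sal) pair RDD built below would then be:
// val pair1 = emps.map(e => (e.sex, e.sal))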

scala> val pair1 = arr.map(x => (x(3), x(2).toInt))
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:31

scala> // or

scala> val pair1 = arr.map{ x =>
     |      val sex = x(3)
     |      val sal = x(2).toInt
     |     (sex, sal)
     | }
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:31

scala>

scala> pair1.collect.foreach(println)
(m,40000)
(f,50000)
(m,50000)
(f,90000)
(m,10000)
(m,40000)
(f,80000)
(m,50000)

scala>

scala> // select sex, sum(sal) from emp group by sex

scala> val rsum = pair1.reduceByKey((a,b) => a+b)
rsum: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:33

scala> // or

scala> val rsum = pair1.reduceByKey(_+_)
rsum: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:33

scala> rsum.collect
res3: Array[(String, Int)] = Array((f,220000), (m,190000))

scala>
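A small aside (not run in the session): foldByKey with a zero value of 0 produces the same per-key sum as reduceByKey(_+_).

val rsum2 = pair1.foldByKey(0)(_ + _)
rsum2.collect    // expected: Array((f,220000), (m,190000))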

// select sex, max(sal) from emp group by sex;

scala> val rmax = pair1.reduceByKey(Math.max(_,_))
rmax: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:33

scala> rmax.collect
res4: Array[(String, Int)] = Array((f,90000), (m,50000))

scala>

// select sex, min(sal) from emp group by sex;

scala> val rmin = pair1.reduceByKey(Math.min(_,_))
rmin: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:33

scala> rmin.collect
res5: Array[(String, Int)] = Array((f,50000), (m,10000))

scala>

// select sex, count(*) from emp group by sex

scala> pair1.collect
res6: Array[(String, Int)] = Array((m,40000), (f,50000), (m,50000), (f,90000), (m,10000), (m,40000), (f,80000), (m,50000))

scala> pair1.countByKey
res7: scala.collection.Map[String,Long] = Map(f -> 3, m -> 5)

scala> val pair2 = pair1.map(x => (x._1, 1))
pair2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:33

scala> pair2.collect
res8: Array[(String, Int)] = Array((m,1), (f,1), (m,1), (f,1), (m,1), (m,1), (f,1), (m,1))

scala> val rcnt = pair2.reduceByKey(_+_)
rcnt: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:35

scala> rcnt.collect
res9: Array[(String, Int)] = Array((f,3), (m,5))

scala>

// select sex, avg(sal) from emp group by sex;
scala> rsum.collect.foreach(println)
(f,220000)
(m,190000)

scala> rcnt.collect.foreach(println)
(f,3)
(m,5)

scala> val j = rsum.join(rcnt)
j: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[15] at join at <console>:39

scala> j.collect
res12: Array[(String, (Int, Int))] = Array((f,(220000,3)), (m,(190000,5)))

scala>

scala> val ravg = j.map{ x =>
     |             val sex = x._1
     |            val v = x._2
     |            val tot = v._1
     |           val cnt = v._2
     |           val avg = tot/cnt
     |           (sex, avg.toInt)
     |       }
ravg: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:41

scala> ravg.collect
res15: Array[(String, Int)] = Array((f,73333), (m,38000))

scala>
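An alternative sketch (not run in the session above): the same per-sex average can be computed in a single pass with aggregateByKey, accumulating a (sum, count) pair per key instead of joining the rsum and rcnt RDDs.

val sumCnt = pair1.aggregateByKey((0, 0))(
  (acc, sal) => (acc._1 + sal, acc._2 + 1),   // fold one salary into the partition-local (sum, count)
  (a, b)     => (a._1 + b._1, a._2 + b._2)    // merge (sum, count) pairs across partitions
)
val ravg2 = sumCnt.mapValues{ case (tot, cnt) => tot / cnt }
ravg2.collect.foreach(println)
// expected: (f,73333) and (m,38000), matching ravg above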


// select dno, range(sal) from emp group by dno;

  --> range is the difference between max and min.

scala> val pair3 = arr.map(x => (x(4), x(2).toInt))
pair3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:31

scala> pair3.collect.foreach(println)
(11,40000)
(12,50000)
(12,50000)
(13,90000)
(12,10000)
(12,40000)
(13,80000)
(11,50000)

scala>
scala> val dmax = pair3.reduceByKey(Math.max(_,_))
dmax: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:33

scala> val dmin = pair3.reduceByKey(Math.min(_,_))
dmin: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:33

scala> val dj = dmax.join(dmin)
dj: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[23] at join at <console>:37

scala>  val drange = dj.map{ x =>
     |              val dno = x._1
     |             val max = x._2._1
     |             val min = x._2._2
     |             val r = max-min
     |             (dno, r)
     |       }
drange: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[25] at map at <console>:39

scala> drange.collect.foreach(println)
(12,40000)
(13,10000)
(11,10000)

scala>
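An alternative sketch (not run above): the per-dno range can also be computed without the join, by carrying a (max, min) pair through a single reduceByKey.

val drange2 = pair3.mapValues(s => (s, s))
                   .reduceByKey((a, b) => (Math.max(a._1, b._1), Math.min(a._2, b._2)))
                   .mapValues{ case (mx, mn) => mx - mn }
drange2.collect.foreach(println)
// expected: (12,40000), (13,10000), (11,10000), matching drange above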

-------------------------------------

scala> // multiple aggregations.

scala> pair1.collect
res18: Array[(String, Int)] = Array((m,40000), (f,50000), (m,50000), (f,90000), (m,10000), (m,40000), (f,80000), (m,50000))

scala> val grp = pair1.groupByKey()
grp: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[26] at groupByKey at <console>:33

scala> grp.collect
res19: Array[(String, Iterable[Int])] = Array((f,CompactBuffer(50000, 90000, 80000)), (m,CompactBuffer(40000, 50000, 10000, 40000, 50000)))

scala> val r1 = grp.map(x => (x._1, x._2.sum))
r1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:35

scala> r1.collect.foreach(println)
(f,220000)
(m,190000)

scala>

// select sex, sum(sal), count(*), avg(sal), max(sal), min(sal),
//        max(sal) - min(sal) as range
// from emp group by sex;

scala> val rall = grp.map{ x =>
     |      val sex = x._1
     |      val cb = x._2
     |      val tot = cb.sum
     |      val cnt = cb.size
     |      val avg = (tot/cnt).toInt
     |      val max = cb.max
     |      val min = cb.min
     |      val r = max-min
     |      (sex,tot,cnt,avg,max,min,r)
     | }
rall: org.apache.spark.rdd.RDD[(String, Int, Int, Int, Int, Int, Int)] = MapPartitionsRDD[28] at map at <console>:35

scala> rall.collect.foreach(println)
(f,220000,3,73333,90000,50000,40000)
(m,190000,5,38000,50000,10000,40000)

scala>
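For comparison, a sketch of the same multi-aggregation through the DataFrame API (assumption: the sqlContext available in this spark-shell, e.g. Spark 1.6 on the CDH quickstart VM; the column names are my own choice). Note that groupByKey above materialises every salary of a group in memory, whereas both aggregateByKey and the DataFrame aggregation combine values map-side.

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val empDF = arr.map(a => (a(0).toInt, a(1), a(2).toInt, a(3), a(4)))
               .toDF("id", "name", "sal", "sex", "dno")

empDF.groupBy("sex")
     .agg(sum("sal"), count("sal"), avg("sal"), max("sal"), min("sal"),
          (max("sal") - min("sal")).as("range"))
     .show()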


