Demo: grouping aggregations on structured data
----------------------------------------------
[cloudera@quickstart ~]$ ls emp
emp
[cloudera@quickstart ~]$ cat emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
[cloudera@quickstart ~]$ hadoop fs -ls spLab
ls: `spLab': No such file or directory
[cloudera@quickstart ~]$ hadoop fs -mkdir spLab
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp spLab
scala> val data = sc.textFile("/user/cloudera/spLab/emp")
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp MapPartitionsRDD[1] at textFile at <console>:27
scala> data.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
scala>
scala> val arr = data.map(_.split(","))
arr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:29
scala> arr.collect
res1: Array[Array[String]] = Array(Array(101, aaaa, 40000, m, 11), Array(102, bbbbbb, 50000, f, 12), Array(103, cccc, 50000, m, 12), Array(104, dd, 90000, f, 13), Array(105, ee, 10000, m, 12), Array(106, dkd, 40000, m, 12), Array(107, sdkfj, 80000, f, 13), Array(108, iiii, 50000, m, 11))
scala>
scala> val pair1 = arr.map(x => (x(3), x(2).toInt))
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:31
scala> // or
scala> val pair1 = arr.map{ x =>
| val sex = x(3)
| val sal = x(2).toInt
| (sex, sal)
| }
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:31
scala>
scala> pair1.collect.foreach(println)
(m,40000)
(f,50000)
(m,50000)
(f,90000)
(m,10000)
(m,40000)
(f,80000)
(m,50000)
scala>
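Note: both map forms above produce the same (sex, sal) pairs. For readability, the rows could also be parsed into a small case class first; a minimal sketch (the Emp class, its field names, and emps/pairAlt are assumptions, not part of the session above):

case class Emp(eno: Int, name: String, sal: Int, sex: String, dno: String)
// parse each CSV line into an Emp, then pair sex with salary
val emps = data.map(_.split(",")).map(a => Emp(a(0).toInt, a(1), a(2).toInt, a(3), a(4)))
val pairAlt = emps.map(e => (e.sex, e.sal))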
scala> // select sex, sum(sal) from emp group by sex;
scala> val rsum = pair1.reduceByKey((a,b) => a+b)
rsum: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:33
scala> // or
scala> val rsum = pair1.reduceByKey(_+_)
rsum: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:33
scala> rsum.collect
res3: Array[(String, Int)] = Array((f,220000), (m,190000))
scala>
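The same sum can also be written with foldByKey, which is reduceByKey plus an explicit zero value; a sketch (rsumAlt is an assumed name):

val rsumAlt = pair1.foldByKey(0)(_ + _)   // 0 is the per-key starting value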
// select sex, max(sal) from emp group by sex;
scala> val rmax = pair1.reduceByKey(Math.max(_,_))
rmax: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:33
scala> rmax.collect
res4: Array[(String, Int)] = Array((f,90000), (m,50000))
scala>
// select sex, min(sal) from emp group by sex;
scala> val rmin = pair1.reduceByKey(Math.min(_,_))
rmin: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:33
scala> rmin.collect
res5: Array[(String, Int)] = Array((f,50000), (m,10000))
scala>
// select sex, count(*) from emp group by sex;
scala> pair1.collect
res6: Array[(String, Int)] = Array((m,40000), (f,50000), (m,50000), (f,90000), (m,10000), (m,40000), (f,80000), (m,50000))
scala> pair1.countByKey
res7: scala.collection.Map[String,Long] = Map(f -> 3, m -> 5)
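Note: countByKey is an action, so it returns a plain Scala Map to the driver; that is only safe when the number of distinct keys is small. The steps below keep the counts distributed instead. An equivalent one-liner (rcntAlt is an assumed name):

val rcntAlt = pair1.mapValues(_ => 1).reduceByKey(_ + _)   // stays an RDD[(String, Int)], not a local Map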
scala> val pair2 = pair1.map(x => (x._1, 1))
pair2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:33
scala> pair2.collect
res8: Array[(String, Int)] = Array((m,1), (f,1), (m,1), (f,1), (m,1), (m,1), (f,1), (m,1))
scala> val rcnt = pair2.reduceByKey(_+_)
rcnt: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:35
scala> rcnt.collect
res9: Array[(String, Int)] = Array((f,3), (m,5))
scala>
// select sex, avg(sal) from emp group by sex;
scala> rsum.collect.foreach(println)
(f,220000)
(m,190000)
scala> rcnt.collect.foreach(println)
(f,3)
(m,5)
scala> val j = rsum.join(rcnt)
j: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[15] at join at <console>:39
scala> j.collect
res12: Array[(String, (Int, Int))] = Array((f,(220000,3)), (m,(190000,5)))
scala>
scala> val ravg = j.map{ x =>
| val sex = x._1
| val v = x._2
| val tot = v._1
| val cnt = v._2
| val avg = tot/cnt
| (sex, avg.toInt)
| }
ravg: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:41
scala> ravg.collect
res15: Array[(String, Int)] = Array((f,73333), (m,38000))
scala>
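The average above needs three shuffles: one reduceByKey for the sum, one for the count, and the join. The (sum, count) pair can also be gathered in a single pass with aggregateByKey; note that tot/cnt is integer division on Ints, which is why the averages come out truncated. A sketch (sumCnt and ravgAlt are assumed names):

val sumCnt = pair1.aggregateByKey((0, 0))(
  (acc, sal) => (acc._1 + sal, acc._2 + 1),    // fold one salary into the (sum, count) pair
  (a, b)     => (a._1 + b._1, a._2 + b._2))    // merge partial (sum, count) pairs
val ravgAlt = sumCnt.mapValues { case (tot, cnt) => tot / cnt }   // same integer division as above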
// select dno, range(sal) from emp group by dno;
--> range is the difference between max(sal) and min(sal).
scala> val pair3 = arr.map(x => (x(4), x(2).toInt))
pair3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:31
scala> pair3.collect.foreach(println)
(11,40000)
(12,50000)
(12,50000)
(13,90000)
(12,10000)
(12,40000)
(13,80000)
(11,50000)
scala>
scala> val dmax = pair3.reduceByKey(Math.max(_,_))
dmax: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:33
scala> val dmin = pair3.reduceByKey(Math.min(_,_))
dmin: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:33
scala> val dj = dmax.join(dmin)
dj: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[23] at join at <console>:37
scala> val drange = dj.map{ x =>
| val dno = x._1
| val max = x._2._1
| val min = x._2._2
| val r = max-min
| (dno, r)
| }
drange: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[25] at map at <console>:39
scala> drange.collect.foreach(println)
(12,40000)
(13,10000)
(11,10000)
scala>
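Here too, the two reduceByKey jobs plus the join can be collapsed into a single pass that carries (max, min) per key. A sketch with aggregateByKey (dMinMax and drangeAlt are assumed names):

val dMinMax = pair3.aggregateByKey((Int.MinValue, Int.MaxValue))(
  (acc, sal) => (math.max(acc._1, sal), math.min(acc._2, sal)),   // fold one salary in
  (a, b)     => (math.max(a._1, b._1), math.min(a._2, b._2)))     // merge partials
val drangeAlt = dMinMax.mapValues { case (max, min) => max - min }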
-------------------------------------
scala> // multiple aggregations.
scala> pair1.collect
res18: Array[(String, Int)] = Array((m,40000), (f,50000), (m,50000), (f,90000), (m,10000), (m,40000), (f,80000), (m,50000))
scala> val grp = pair1.groupByKey()
grp: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[26] at groupByKey at <console>:33
scala> grp.collect
res19: Array[(String, Iterable[Int])] = Array((f,CompactBuffer(50000, 90000, 80000)), (m,CompactBuffer(40000, 50000, 10000, 40000, 50000)))
scala> val r1 = grp.map(x => (x._1, x._2.sum))
r1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:35
scala> r1.collect.foreach(println)
(f,220000)
(m,190000)
scala>
// select sex, sum(sal), count(*), avg(sal), max(sal), min(sal),
//        max(sal)-min(sal) as range
// from emp group by sex;
scala> val rall = grp.map{ x =>
| val sex = x._1
| val cb = x._2
| val tot = cb.sum
| val cnt = cb.size
| val avg = (tot/cnt).toInt
| val max = cb.max
| val min = cb.min
| val r = max-min
| (sex,tot,cnt,avg,max,min,r)
| }
rall: org.apache.spark.rdd.RDD[(String, Int, Int, Int, Int, Int, Int)] = MapPartitionsRDD[28] at map at <console>:35
scala> rall.collect.foreach(println)
(f,220000,3,73333,90000,50000,40000)
(m,190000,5,38000,50000,10000,40000)
scala>
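groupByKey materializes every value of a key in memory (the CompactBuffers above), which can hurt on large or skewed groups. The same seven columns can be produced without that by carrying (sum, count, max, min) through aggregateByKey; a sketch (stats and rallAlt are assumed names):

val stats = pair1.aggregateByKey((0, 0, Int.MinValue, Int.MaxValue))(
  (t, sal) => (t._1 + sal, t._2 + 1, math.max(t._3, sal), math.min(t._4, sal)),        // fold one salary
  (a, b)   => (a._1 + b._1, a._2 + b._2, math.max(a._3, b._3), math.min(a._4, b._4)))  // merge partials
val rallAlt = stats.map { case (sex, (tot, cnt, max, min)) =>
  (sex, tot, cnt, tot / cnt, max, min, max - min)
}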