// performing aggregations with grouping by multiple columns
sql:
select dno, sex, sum(sal) from emp
group by dno, sex;
scala> val data = sc.textFile("/user/cloudera/spLab/emp")
data: org.apache.spark.rdd.RDD[String] =
/user/cloudera/spLab/emp MapPartitionsRDD[1] at
textFile at <console>:27
scala> data.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
scala> val arr = data.map(_.split(","))
arr: org.apache.spark.rdd.RDD[Array[String]] =
MapPartitionsRDD[2] at map at <console>:29
scala> arr.collect
res1: Array[Array[String]] = Array(Array(101,
aaaa, 40000, m, 11), Array(102, bbbbbb, 50000,
f, 12), Array(103, cccc, 50000, m, 12), Array
(104, dd, 90000, f, 13), Array(105, ee, 10000,
m, 12), Array(106, dkd, 40000, m, 12), Array
(107, sdkfj, 80000, f, 13), Array(108, iiii,
50000, m, 11))
scala>
scala> val pair = arr.map(x => ( (x(4), x(3)), x(2).toInt ) )
pair: org.apache.spark.rdd.RDD[((String,
String), Int)] = MapPartitionsRDD[3] at map at
<console>:31
scala> pair.collect.foreach(println)
((11,m),40000)
((12,f),50000)
((12,m),50000)
((13,f),90000)
((12,m),10000)
((12,m),40000)
((13,f),80000)
((11,m),50000)
scala>
//or
val pair = data.map{ x =>
val w = x.split(",")
val dno = w(4)
val sex = w(3)
val sal = w(2).toInt
val mykey = (dno,sex)
val p = (mykey , sal)
p
}
scala> val res = pair.reduceByKey(_+_)
scala> res.collect.foreach(println)
((12,f),50000)
((13,f),170000)
((12,m),100000)
((11,m),90000)
scala> val r = res.map(x => (x._1._1, x._1._2, x._2) )
scala> r.collect.foreach(println)
(12,f,50000)
(13,f,170000)
(12,m,100000)
(11,m,90000)
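// note: the same flattening can also be written with a pattern match,
// which names the key fields instead of using _1/_2 (an equivalent sketch):
val r = res.map { case ((dno, sex), tot) => (dno, sex, tot) }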
-------------------------------------
spark's reduceByKey() allows only a single key for grouping.
when you want to group by multiple columns,
combine those columns into a tuple
and keep that tuple as the key of the pair.
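// a minimal self-contained sketch of the same idea (the sample rows are
// copied from the emp file above, so it runs without hdfs; the names
// emp/pairs/sums are illustrative only):
val emp = sc.parallelize(Seq(
  "101,aaaa,40000,m,11",
  "102,bbbbbb,50000,f,12",
  "103,cccc,50000,m,12",
  "104,dd,90000,f,13"))

val pairs = emp.map { line =>
  val w = line.split(",")
  ((w(4), w(3)), w(2).toInt)          // composite key (dno, sex), value = sal
}
val sums = pairs.reduceByKey(_ + _).map { case ((dno, sex), tot) => (dno, sex, tot) }

sums.collect.foreach(println)
// expected totals (order may vary): (11,m,40000), (12,f,50000), (12,m,50000), (13,f,90000)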
---------------------------------------
sql:--> multi grouping and multi aggregations.
select dno, sex, sum(sal), count(*),
avg(sal) , max(sal), min(sal) from emp
group by dno, sex;
scala> val grp = pair.groupByKey()
grp: org.apache.spark.rdd.RDD[((String, String),
Iterable[Int])] = ShuffledRDD[7] at groupByKey
at <console>:31
scala> grp.collect.foreach(println)
((12,f),CompactBuffer(50000))
((13,f),CompactBuffer(90000, 80000))
((12,m),CompactBuffer(50000, 10000, 40000))
((11,m),CompactBuffer(40000, 50000))
scala> val agr = grp.map{ x =>
val dno = x._1._1
val sex = x._1._2
val cb = x._2
val tot = cb.sum
val cnt = cb.size
val avg = (tot/cnt).toInt
val max = cb.max
val min = cb.min
val r = (dno,sex,tot,cnt,avg,max,min)
r
}
agr: org.apache.spark.rdd.RDD[(String, String,
Int, Int, Int, Int, Int)] = MapPartitionsRDD[8]
at map at <console>:37
scala>
scala> agr.collect.foreach(println)
(12,f,50000,1,50000,50000,50000)
(13,f,170000,2,85000,90000,80000)
(12,m,100000,3,33333,50000,10000)
(11,m,90000,2,45000,50000,40000)
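// alternative sketch (not in the original session): groupByKey collects every
// salary of a key into memory; aggregateByKey computes the same five measures
// incrementally, using (tot, cnt, max, min) as the accumulator and deriving
// avg at the end. zero and agr2 are illustrative names.
val zero = (0, 0, Int.MinValue, Int.MaxValue)

val agr2 = pair.aggregateByKey(zero)(
  (a, sal) => (a._1 + sal, a._2 + 1, math.max(a._3, sal), math.min(a._4, sal)),
  (a, b)   => (a._1 + b._1, a._2 + b._2, math.max(a._3, b._3), math.min(a._4, b._4))
).map { case ((dno, sex), (tot, cnt, max, min)) => (dno, sex, tot, cnt, tot / cnt, max, min) }

agr2.collect.foreach(println)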
scala> // to save the results into a file
scala> agr.saveAsTextFile("/user/cloudera/spLab/res1")
[cloudera@quickstart ~]$ hadoop fs -ls spLab
Found 2 items
-rw-r--r--   1 cloudera cloudera   158 2017-05-01 20:17 spLab/emp
drwxr-xr-x   - cloudera cloudera     0 2017-05-02 20:29 spLab/res1
[cloudera@quickstart ~]$ hadoop fs -ls spLab/res1
Found 2 items
-rw-r--r--   1 cloudera cloudera     0 2017-05-02 20:29 spLab/res1/_SUCCESS
-rw-r--r--   1 cloudera cloudera   134 2017-05-02 20:29 spLab/res1/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat spLab/res1/part-00000
(12,f,50000,1,50000,50000,50000)
(13,f,170000,2,85000,90000,80000)
(12,m,100000,3,33333,50000,10000)
(11,m,90000,2,45000,50000,40000)
[cloudera@quickstart ~]$
// here, the output is written in tuple shape (with parentheses),
// which is not a valid format for hive, rdbms, or other systems.
// before saving the results, the following transformation should be done.
scala> val r1 = agr.map{ x=>
| x._1+","+x._2+","+x._3+","+
| x._4+","+x._5+","+x._6+","+x._7
| }
r1: org.apache.spark.rdd.RDD[String] =
MapPartitionsRDD[5] at map at <console>:35
scala> r1.collect.foreach(println)
12,f,50000,1,50000,50000,50000
13,f,170000,2,85000,90000,80000
12,m,100000,3,33333,50000,10000
11,m,90000,2,45000,50000,40000
scala>
// or
scala> val r2 = agr.map{ x =>
| val dno = x._1
| val sex = x._2
| val tot = x._3
| val cnt = x._4
| val avg = x._5
| val max = x._6
| val min = x._7
| Array(dno, sex, tot.toString, cnt.toString,
| avg.toString, max.toString, min.toString).mkString("\t")
| }
r2: org.apache.spark.rdd.RDD[String] =
MapPartitionsRDD[6] at map at <console>:35
scala> r2.collect.foreach(println)
12    f    50000     1    50000    50000    50000
13    f    170000    2    85000    90000    80000
12    m    100000    3    33333    50000    10000
11    m    90000     2    45000    50000    40000
scala>
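// shortcut sketch (not in the original session): every record of agr is a
// tuple, and productIterator walks its fields in order, so mkString can build
// the delimited line without naming each column. r3 is an illustrative name.
val r3 = agr.map(_.productIterator.mkString("\t"))
r3.collect.foreach(println)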
scala> r2.saveAsTextFile("/user/cloudera/spLab/res2")

[cloudera@quickstart ~]$ hadoop fs -ls spLab/res2
Found 2 items
-rw-r--r--   1 cloudera cloudera     0 2017-05-02 20:44 spLab/res2/_SUCCESS
-rw-r--r--   1 cloudera cloudera   126 2017-05-02 20:44 spLab/res2/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat spLab/res2/part-00000
12    f    50000     1    50000    50000    50000
13    f    170000    2    85000    90000    80000
12    m    100000    3    33333    50000    10000
11    m    90000     2    45000    50000    40000
[cloudera@quickstart ~]$
-- these results can be directly exported into an rdbms.
[cloudera@quickstart ~]$ mysql -u root -pcloudera
mysql> create database spres;
Query OK, 1 row affected (0.03 sec)
mysql> use spres;
Database changed
mysql> create table summary(dno int, sex char(1),
    -> tot int, cnt int, avg int, max int, min int);
Query OK, 0 rows affected (0.10 sec)
mysql> select * from summary;
Empty set (0.00 sec)
mysql>
[cloudera@quickstart ~]$ sqoop export \
  --connect jdbc:mysql://localhost/spres \
  --username root --password cloudera \
  --table summary \
  --export-dir '/user/cloudera/spLab/res2/part-00000' \
  --input-fields-terminated-by '\t'
-- to use the spark-written results from hive:
hive> create table info(dno int, sex string,
tot int, cnt int, avg int, max int,
min int)
row format delimited
fields terminated by '\t';
hive> load data inpath '/user/cloudera/spLab/res2/part-00000'
      into table info;
mysql> select * from summary;
+------+------+--------+------+-------+-------+-------+
| dno  | sex  | tot    | cnt  | avg   | max   | min   |
+------+------+--------+------+-------+-------+-------+
|   12 | m    | 100000 |    3 | 33333 | 50000 | 10000 |
|   11 | m    |  90000 |    2 | 45000 | 50000 | 40000 |
|   12 | f    |  50000 |    1 | 50000 | 50000 | 50000 |
|   13 | f    | 170000 |    2 | 85000 | 90000 | 80000 |
+------+------+--------+------+-------+-------+-------+
4 rows in set (0.03 sec)
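-------------------------------------
for comparison, the opening sql can also be run as-is through spark sql.
a hedged sketch, assuming the spark-shell's sqlContext (spark 1.x on this
quickstart vm) and the data RDD loaded at the top; Emp and empDF are
illustrative names, not part of the original session:

case class Emp(id: Int, name: String, sal: Int, sex: String, dno: Int)
import sqlContext.implicits._

// build a dataframe from the same rdd and register it as a temp table
val empDF = data.map(_.split(",")).map(w =>
  Emp(w(0).toInt, w(1), w(2).toInt, w(3), w(4).toInt)).toDF()
empDF.registerTempTable("emp")

sqlContext.sql("""select dno, sex, sum(sal), count(*), avg(sal), max(sal), min(sal)
                  from emp group by dno, sex""").show()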