Data science Software Course Training in Ameerpet Hyderabad

Tuesday, 2 May 2017

Spark: Performing Grouped Aggregations on Multiple Keys and Saving the Results


 // performing aggregations grouped by multiple columns

 sql:
   select dno, sex, sum(sal) from emp
   group by dno, sex;

scala> val data = sc.textFile("/user/cloudera/spLab/emp")
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp MapPartitionsRDD[1] at textFile at <console>:27

scala> data.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11

scala> val arr = data.map(_.split(","))
arr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:29

scala> arr.collect
res1: Array[Array[String]] = Array(Array(101, aaaa, 40000, m, 11), Array(102, bbbbbb, 50000, f, 12), Array(103, cccc, 50000, m, 12), Array(104, dd, 90000, f, 13), Array(105, ee, 10000, m, 12), Array(106, dkd, 40000, m, 12), Array(107, sdkfj, 80000, f, 13), Array(108, iiii, 50000, m, 11))

scala>
scala> val pair = arr.map(x => ((x(4), x(3)), x(2).toInt))
pair: org.apache.spark.rdd.RDD[((String, String), Int)] = MapPartitionsRDD[3] at map at <console>:31

scala> pair.collect.foreach(println)
((11,m),40000)
((12,f),50000)
((12,m),50000)
((13,f),90000)
((12,m),10000)
((12,m),40000)
((13,f),80000)
((11,m),50000)

scala>
// or, equivalently:

 val pair = data.map{ x =>
             val w = x.split(",")
             val dno = w(4)
             val sex = w(3)
             val sal = w(2).toInt
             val mykey = (dno,sex)
             val p = (mykey , sal)
             p
           }


scala> val res = pair.reduceByKey(_+_)

scala> res.collect.foreach(println)
((12,f),50000)
((13,f),170000)
((12,m),100000)
((11,m),90000)

scala> val r = res.map(x => (x._1._1, x._1._2, x._2))

scala> r.collect.foreach(println)
(12,f,50000)
(13,f,170000)
(12,m,100000)
(11,m,90000)

-------------------------------------
  Spark's reduceByKey() accepts only a single key for grouping.
  When you want to group by multiple columns, pack those columns
  into a tuple and use that tuple as the key of the pair RDD.

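The composite key works with any reduceByKey combiner, not just sum. A minimal sketch (not part of the original session) using the same pair RDD to get the highest salary per (dno, sex):

 val maxSal = pair.reduceByKey((a, b) => math.max(a, b))
 maxSal.collect.foreach(println)
 // e.g. ((12,m),50000)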
---------------------------------------
sql: --> multi-column grouping with multiple aggregations.

 select dno, sex, sum(sal), count(*),
        avg(sal), max(sal), min(sal) from emp
 group by dno, sex;

scala> val grp = pair.groupByKey()
grp: org.apache.spark.rdd.RDD[((String, String), Iterable[Int])] = ShuffledRDD[7] at groupByKey at <console>:31

scala> grp.collect.foreach(println)
((12,f),CompactBuffer(50000))
((13,f),CompactBuffer(90000, 80000))
((12,m),CompactBuffer(50000, 10000, 40000))
((11,m),CompactBuffer(40000, 50000))

scala> val agr = grp.map{ x =>
            val dno = x._1._1
            val sex = x._1._2
            val cb  = x._2
            val tot = cb.sum
            val cnt = cb.size
            val avg = tot / cnt
            val max = cb.max
            val min = cb.min
            val r = (dno, sex, tot, cnt, avg, max, min)
            r
          }
agr: org.apache.spark.rdd.RDD[(String, String, Int, Int, Int, Int, Int)] = MapPartitionsRDD[8] at map at <console>:37

scala>

scala> agr.collect.foreach(println)
(12,f,50000,1,50000,50000,50000)
(13,f,170000,2,85000,90000,80000)
(12,m,100000,3,33333,50000,10000)
(11,m,90000,2,45000,50000,40000)
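Note that groupByKey buffers every salary for a key in memory before the map runs. As a sketch (not in the original session; agg and agr2 are just illustrative names), the same seven-column result can be computed in one pass with aggregateByKey, which keeps only a running (sum, count, max, min) per key:

 val agg = pair.aggregateByKey((0, 0, Int.MinValue, Int.MaxValue))(
     // fold one salary into the running (sum, count, max, min)
     (acc, sal) => (acc._1 + sal, acc._2 + 1,
                    math.max(acc._3, sal), math.min(acc._4, sal)),
     // merge two partial accumulators from different partitions
     (a, b) => (a._1 + b._1, a._2 + b._2,
                math.max(a._3, b._3), math.min(a._4, b._4))
   )
 val agr2 = agg.map { case ((dno, sex), (tot, cnt, mx, mn)) =>
     (dno, sex, tot, cnt, tot / cnt, mx, mn)
   }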

scala> // to save the results into a file:

scala> agr.saveAsTextFile("/user/cloudera/spLab/res1")

[cloudera@quickstart ~]$ hadoop fs -ls spLab
Found 2 items
-rw-r--r--   1 cloudera cloudera        158 2017-05-01 20:17 spLab/emp
drwxr-xr-x   - cloudera cloudera          0 2017-05-02 20:29 spLab/res1
[cloudera@quickstart ~]$ hadoop fs -ls spLab/res1
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-05-02 20:29 spLab/res1/_SUCCESS
-rw-r--r--   1 cloudera cloudera        134 2017-05-02 20:29 spLab/res1/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat spLab/res1/part-00000
(12,f,50000,1,50000,50000,50000)
(13,f,170000,2,85000,90000,80000)
(12,m,100000,3,33333,50000,10000)
(11,m,90000,2,45000,50000,40000)
[cloudera@quickstart ~]$
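saveAsTextFile writes one part file per RDD partition; this RDD has a single partition, so only part-00000 appears. A sketch (the res1_single path is only an example) for forcing a single output file on a larger RDD:

 agr.coalesce(1).saveAsTextFile("/user/cloudera/spLab/res1_single")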

// Here, the output is written in tuple form, which is not a valid
// input format for hive, an RDBMS, or other systems.
// Before saving the results, the following transformation should be applied.


scala> val r1 = agr.map{ x=>
     |     x._1+","+x._2+","+x._3+","+
     |       x._4+","+x._5+","+x._6+","+x._7
     |    }
r1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:35

scala> r1.collect.foreach(println)
12,f,50000,1,50000,50000,50000
13,f,170000,2,85000,90000,80000
12,m,100000,3,33333,50000,10000
11,m,90000,2,45000,50000,40000

scala>
// or

scala> val r2 = agr.map{ x =>
     |      val dno = x._1
     |      val sex = x._2
     |      val tot = x._3
     |      val cnt = x._4
     |      val avg = x._5
     |      val max = x._6
     |      val min = x._7
     |      Array(dno, sex, tot.toString, cnt.toString,
     |            avg.toString, max.toString, min.toString).mkString("\t")
     | }
r2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:35

scala> r2.collect.foreach(println)
12	f	50000	1	50000	50000	50000
13	f	170000	2	85000	90000	80000
12	m	100000	3	33333	50000	10000
11	m	90000	2	45000	50000	40000

scala>

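 // Save r2 the same way (this call is assumed here; the target path matches the res2 listing below).
 r2.saveAsTextFile("/user/cloudera/spLab/res2")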
[cloudera@quickstart ~]$ hadoop fs -ls spLab/res2
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-05-02 20:44 spLab/res2/_SUCCESS
-rw-r--r--   1 cloudera cloudera        126 2017-05-02 20:44 spLab/res2/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat spLab/res2/part-00000
12	f	50000	1	50000	50000	50000
13	f	170000	2	85000	90000	80000
12	m	100000	3	33333	50000	10000
11	m	90000	2	45000	50000	40000
[cloudera@quickstart ~]$


-- These results can be exported directly into an RDBMS.

[cloudera@quickstart ~]$ mysql -u root -pcloudera

mysql> create database spres;
Query OK, 1 row affected (0.03 sec)

mysql> use spres;
Database changed
mysql> create table summary(dno int, sex char(1),
    ->     tot int, cnt int, avg int, max int, min int);
Query OK, 0 rows affected (0.10 sec)

mysql> select * from summary;
Empty set (0.00 sec)

mysql>

[cloudera@quickstart ~]$ sqoop export --connect jdbc:mysql://localhost/spres \
    --username root --password cloudera --table summary \
    --export-dir '/user/cloudera/spLab/res2/part-00000' \
    --input-fields-terminated-by '\t'


To use the Spark-written results from hive:

hive> create table info(dno int, sex string,
          tot int, cnt int, avg int, max int, min int)
      row format delimited
      fields terminated by '\t';
hive> load data inpath '/user/cloudera/spLab/res2/part-00000' into table info;
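A quick sanity check from the hive shell (output omitted here):

hive> select * from info;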

mysql> select * from summary;
+------+------+--------+------+-------+-------+-------+
| dno  | sex  | tot    | cnt  | avg   | max   | min   |
+------+------+--------+------+-------+-------+-------+
|   12 | m    | 100000 |    3 | 33333 | 50000 | 10000 |
|   11 | m    |  90000 |    2 | 45000 | 50000 | 40000 |
|   12 | f    |  50000 |    1 | 50000 | 50000 | 50000 |
|   13 | f    | 170000 |    2 | 85000 | 90000 | 80000 |
+------+------+--------+------+-------+-------+-------+
4 rows in set (0.03 sec)
