Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Tuesday, 2 May 2017

Spark : Performing grouping Aggregations based on Multiple Keys and saving results


 // Performing aggregations grouped by multiple columns.

 sql:
   select dno, sex, sum(sal) from emp
        group by dno, sex;

  scala> val data = sc.textFile

("/user/cloudera/spLab/emp")
data: org.apache.spark.rdd.RDD[String] =

/user/cloudera/spLab/emp MapPartitionsRDD[1] at

textFile at <console>:27

scala> data.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11

scala> val arr = data.map(_.split(","))
arr: org.apache.spark.rdd.RDD[Array[String]] =

MapPartitionsRDD[2] at map at <console>:29

scala> arr.collect
res1: Array[Array[String]] = Array(Array(101,

aaaa, 40000, m, 11), Array(102, bbbbbb, 50000,

f, 12), Array(103, cccc, 50000, m, 12), Array

(104, dd, 90000, f, 13), Array(105, ee, 10000,

m, 12), Array(106, dkd, 40000, m, 12), Array

(107, sdkfj, 80000, f, 13), Array(108, iiii,

50000, m, 11))

scala>
scala> val pair = arr.map(x =>  ( (x(4),x(3)) ,

x(2).toInt)     )
pair: org.apache.spark.rdd.RDD[((String,

String), Int)] = MapPartitionsRDD[3] at map at

<console>:31

scala> pair.collect.foreach(println)
((11,m),40000)
((12,f),50000)
((12,m),50000)
((13,f),90000)
((12,m),10000)
((12,m),40000)
((13,f),80000)
((11,m),50000)

scala>
//or

 val pair = data.map{ x =>
             val w = x.split(",")
             val dno = w(4)
             val sex = w(3)
             val sal = w(2).toInt
             val mykey = (dno,sex)
             val p = (mykey , sal)
             p
           }


scala> val res = pair.reduceByKey(_+_)

scala> res.collect.foreach(println)
((12,f),50000)
((13,f),170000)
((12,m),100000)
((11,m),90000)

scala> val r = res.map(x =>

(x._1._1,x._1._2,x._2) )

scala> r.collect.foreach(println)
(12,f,50000)
(13,f,170000)
(12,m,100000)
(11,m,90000)

-------------------------------------
  Spark's reduceByKey() groups on a single key only.
  To group by multiple columns, combine those columns into a tuple
  and use that tuple as the key of the pair.

---------------------------------------
sql:--> multi grouping and multi aggregations.

 select  dno, sex, sum(sal), count(*),
      avg(sal) , max(sal), min(sal) from emp
        group by dno, sex;

scala> val grp = pair.groupByKey()
grp: org.apache.spark.rdd.RDD[((String, String),

Iterable[Int])] = ShuffledRDD[7] at groupByKey

at <console>:31

scala> grp.collect.foreach(println)
((12,f),CompactBuffer(50000))
((13,f),CompactBuffer(90000, 80000))
((12,m),CompactBuffer(50000, 10000, 40000))
((11,m),CompactBuffer(40000, 50000))

scala> val agr = grp.map{ x =>
             val dno = x._1._1
            val sex = x._1._2
           val cb = x._2
           val tot = cb.sum
           val cnt = cb.size
           val avg = (tot/cnt).toInt
           val max = cb.max
          val min = cb.min
          val r = (dno,sex,tot,cnt,avg,max,min)
        r
      }
agr: org.apache.spark.rdd.RDD[(String, String,

Int, Int, Int, Int, Int)] = MapPartitionsRDD[8]

at map at <console>:37

scala>

scala> agr.collect.foreach(println)
(12,f,50000,1,50000,50000,50000)
(13,f,170000,2,85000,90000,80000)
(12,m,100000,3,33333,50000,10000)
(11,m,90000,2,45000,50000,40000)

scala> // to save results into file.

agr.saveAsTextFile("/user/cloudera/spLab/res1")

[cloudera@quickstart ~]$ hadoop fs -ls spLab
Found 2 items
-rw-r--r--   1 cloudera cloudera        158

2017-05-01 20:17 spLab/emp
drwxr-xr-x   - cloudera cloudera          0

2017-05-02 20:29 spLab/res1
[cloudera@quickstart ~]$ hadoop fs -ls

spLab/res1
Found 2 items
-rw-r--r--   1 cloudera cloudera          0

2017-05-02 20:29 spLab/res1/_SUCCESS
-rw-r--r--   1 cloudera cloudera        134

2017-05-02 20:29 spLab/res1/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat

spLab/res1/part-00000
(12,f,50000,1,50000,50000,50000)
(13,f,170000,2,85000,90000,80000)
(12,m,100000,3,33333,50000,10000)
(11,m,90000,2,45000,50000,40000)
[cloudera@quickstart ~]$

// Here the output is written in tuple form (each line wrapped in parentheses),
// which is not a valid input format for Hive, an RDBMS, or other systems.
// Before saving the results, the following transformation should be applied.


 val r1 = agr.map{ x=>
    x._1+","+x._2+","+x._3+","+
      x._4+","+x._5+","+x._6+","+x._7
   }

scala> val r1 = agr.map{ x=>
     |     x._1+","+x._2+","+x._3+","+
     |       x._4+","+x._5+","+x._6+","+x._7
     |    }
r1: org.apache.spark.rdd.RDD[String] =

MapPartitionsRDD[5] at map at <console>:35

scala> r1.collect.foreach(println)
12,f,50000,1,50000,50000,50000
13,f,170000,2,85000,90000,80000
12,m,100000,3,33333,50000,10000
11,m,90000,2,45000,50000,40000

scala>
// or

scala> val r2 = agr.map{ x =>
     |      val dno = x._1
     |      val sex = x._2
     |      val tot = x._3
     |      val cnt = x._4
     |      val avg = x._5
     |      val max = x._6
     |     val min = x._7
     |  Array(dno,sex,tot.toString,cnt.toString,
     |        avg.toString, max.toString,

min.toString).mkString("\t")
     | }
r2: org.apache.spark.rdd.RDD[String] =

MapPartitionsRDD[6] at map at <console>:35

scala> r2.collect.foreach(println)
12 f 50000 1 50000 50000

50000
13 f 170000 2 85000 90000

80000
12 m 100000 3 33333 50000

10000
11 m 90000 2 45000 50000

40000

scala> r2.saveAsTextFile("/user/cloudera/spLab/res2")

[cloudera@quickstart ~]$ hadoop fs -ls

spLab/res2
Found 2 items
-rw-r--r--   1 cloudera cloudera          0

2017-05-02 20:44 spLab/res2/_SUCCESS
-rw-r--r--   1 cloudera cloudera        126

2017-05-02 20:44 spLab/res2/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat

spLab/res2/part-00000
12 f 50000 1 50000 50000

50000
13 f 170000 2 85000 90000

80000
12 m 100000 3 33333 50000

10000
11 m 90000 2 45000 50000

40000
[cloudera@quickstart ~]$


-- These results can be exported directly into an RDBMS.

[cloudera@quickstart ~]$ mysql -u root
                              -pcloudera

mysql> create database spres;
Query OK, 1 row affected (0.03 sec)

mysql> use spres;
Database changed
mysql> create table summary(dno int,  sex char

(1),
    ->     tot int  , cnt int, avg int, max int,

min int);
Query OK, 0 rows affected (0.10 sec)

mysql> select * from summary;
Empty set (0.00 sec)

mysql>

[cloudera@quickstart ~]$ sqoop export --connect jdbc:mysql://localhost/spres \
    --username root --password cloudera --table summary \
    --export-dir '/user/cloudera/spLab/res2/part-00000' \
    --input-fields-terminated-by '\t'


To use the results written by Spark in Hive:

hive> create table info(dno int, sex string,
    tot int, cnt int, avg int, max int,
        min int)
     row format delimited
      fields terminated by '\t';
hive> load data inpath '/user/cloudera/spLab/res2/part-00000' into table info;

mysql> select * from summary;
+------+------+--------+------+-------+-------

+-------+
| dno  | sex  | tot    | cnt  | avg   | max   |

min   |
+------+------+--------+------+-------+-------

+-------+
|   12 | m    | 100000 |    3 | 33333 | 50000 |

10000 |
|   11 | m    |  90000 |    2 | 45000 | 50000 |

40000 |
|   12 | f    |  50000 |    1 | 50000 | 50000 |

50000 |
|   13 | f    | 170000 |    2 | 85000 | 90000 |

80000 |
+------+------+--------+------+-------+-------

+-------+
4 rows in set (0.03 sec)




















































2 comments:

  1. thank for you sharing your blog ,excellent blog good idea big data hadoop
    Hadoop Training In Hyderabad

    ReplyDelete
  2. Thanks alot for sharing blog..
    Its really helping us.Excellent teaching skills Mr.SriRam.

    ReplyDelete