Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Wednesday, 3 May 2017

Pig : Entire Column Aggregations


 Entire column aggregations.

 select sum(sal) from emp;

grunt> describe emp
emp: {id: int,name: chararray,sal: int,sex: chararray,dno: int}
grunt> esal = foreach emp generate sal;
grunt> rsum = foreach esal generate SUM(sal) as tot;

 -- ABOVE is invalid,
 because Pig aggregate functions can be applied only to inner bags.

solution:
way1:

 grunt> e = foreach emp generate
           'ventech' as org, sal;
grunt> grp1 = group e by org;

grunt> illustrate grp1
---------------------------------------------------------------------------------------------
| emp     | id:int     | name:chararray     | sal:int     | sex:chararray     | dno:int     |
---------------------------------------------------------------------------------------------
|         | 107        | sdkfj              | 80000       | f                 | 13          |
|         | 103        | cccc               | 50000       | m                 | 12          |
---------------------------------------------------------------------------------------------
-------------------------------------------
| e     | org:chararray     | sal:int     |
-------------------------------------------
|       | ventech           | 80000       |
|       | ventech           | 50000       |
-------------------------------------------
-----------------------------------------------------------------------------------------
| grp1     | group:chararray     | e:bag{:tuple(org:chararray,sal:int)}                 |
-----------------------------------------------------------------------------------------
|          | ventech             | {(ventech, 80000), (ventech, 50000)}                 |
-----------------------------------------------------------------------------------------

grunt>

difference between describe and illustrate:

 describe gives only schema of given relation.

 illustrate gives,
   entire hierarchy of data flow along with schema and sample data.

 --used for debugging.

grunt> dump grp1

(ventech,{(ventech,50000),(ventech,80000),(ventech,40000),(ventech,10000),(ventech,90000),(ventech,50000),(ventech,50000),(ventech,40000)})

grunt> esum = foreach grp1 generate
             SUM(e.sal) as tot;

grunt> eall = foreach grp1 generate
      SUM(e.sal) as tot,
       MAX(e.sal) as max;

grunt> dump eall
(410000,90000)

note: pig needs inner bag, to perform aggregations. when we group the data,
   inner bags will be produced.
 but our task does not require any grouping field. but it's mandatory for pig.

 solution:
   provide a Constant as key(grouping column).
   and group it by the key.

 e = foreach emp generate 'ventech' as org, sal;

 here , for all the rows(tuples)
   org value is constant.

 when you group it by 'org' field,
  all tuples will be formed as one inner bag.



way2:

grunt> e = foreach emp generate sal;
grunt> grp2 = group e all;
grunt> describe grp2;
grp2: {group: chararray,e: {(sal: int)}}
grunt> dump grp2

(all,{(50000),(80000),(40000),(10000),(90000),(50000),(50000),(40000)})

grunt> rall = foreach grp2 generate
             SUM(e.sal) as tot,
               MAX(e.sal) as max,
               COUNT(e) as cnt;
grunt> illustrate rall

--------------------------------------------





















Pig : How to perform grouping by Multiple Columns


 how to perform grouping by multiple columns.
-------------------------------------------
 task: multiple grouping with multiple aggregations.

 sql:
  select dno, sex , sum(sal) ,
         count(*), avg(sal), max(sal),
           min(sal) from emp
         group by dno, sex;

grunt> emp = load 'piglab/emp' using PigStorage(',')
>>   as (id:int, name:chararray,  sal:int,
>>     sex:chararray, dno:int);
grunt>

grunt> e = foreach emp generate dno, sex, sal;
grunt> grp = group e by dno, sex;
2017-05-03 18:54:14,501 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1200: <line 7, column 25>  Syntax error, unexpected symbol at or near ';'
Details at logfile: /home/cloudera/pig_1493862740272.log
grunt>

above grouping is invalid.
 pig does not allow a bare comma-separated list of grouping fields;
 multiple fields must be wrapped in parentheses as a tuple.


solution:
  make multiple fields as a tuple.
  and group it by the tuple field.

grunt> grp = group e by (dno,sex);
grunt> describe grp;
grp: {group: (dno: int,sex: chararray),e: {(dno: int,sex: chararray,sal: int)}}
grunt> dump grp
((11,m),{(11,m,50000),(11,m,40000)})
((12,f),{(12,f,50000)})
((12,m),{(12,m,40000),(12,m,10000),(12,m,50000)})
((13,f),{(13,f,80000),(13,f,90000)})

grunt> res = foreach grp generate
>>         group.dno as dno,
>>        group.sex as gender,
>>        SUM(e.sal) as tot,
>>        COUNT(e) as cnt,
>>       AVG(e.sal) as avg,
>>       MAX(e.sal) as max, MIN(e.sal) as min;
grunt>dump res
(11,m,90000,2,45000.0,50000,40000)
(12,f,50000,1,50000.0,50000,50000)
(12,m,100000,3,33333.333333333336,50000,10000)
(13,f,170000,2,85000.0,90000,80000)
-----------------------------------------












Pig : Data types and Operators

 Data types:

  simple data types:
 ---------------------
   int --> 32 bit integer.
   long ---> 64 bit "
   float --> 32 bit float [ not available in
             latest version ]
   double --> 64 bit
 
   boolean --> true/false
 
  -------------------------
 complex data types:
 -------------------
  tuple.
  bag:



 name : chararray [ in older versions (before 0.7),
              max length was 2 GB;
       in later versions max length is 4 GB ]
        ----> variable length.

 age : int
 sal : double.
 sex : chararray

 wife: tuple: (rani,24,hyd)
 children : bag : {(sony,4,m),(tony,2,f)}

 sample tuples of an outer bag.

        profiles
-------------------------------
 (Ravi, 26, M, (rani,24,hyd),{(sony,4,m),     (tony,2,f)})
   :
   :
------------------------------------------
 pig latin statement:
-----------------------
 structure

 <Alias of Relation> = <Operator along with expressions >

  these expressions will be changing from one operator to another.

 1) load
 2) describe
 3) dump
 4) store
 5) foreach
 6) filter
 7) limit
 8) sample
 10) group
 11) cogroup
 12) union
 13) join
 14) left outer join
 15) right outer join
 16) full outer join
 17) cross
 18) pig
 19) exec
 20) run
 21) illustrate

load:
------
  to load data from
   file to pig relation.
   [ logical load ]

  A = load 'file1' using PigStorage(',')
      as (a:int, b:int, c:int);

  here A is alias of relation.
 
  standard: use Capital letters for relation name.

   the input file can be ,
  hdfs / local file.
  that depends on start-up mode of pig .
------------------
 describe :
  --> to get the schema of a relation.

 grunt> describe emp ;

dump:-- to execute data flow
.

  A = load 'file1' ......
  B = foreach A generate ...
  C = foreach B generate ...
  D = group C by ....
  E = foreach D generate ....

 grunt> dump E
 ---> the flow will be executed from root relation. and writes output into console..

store:-- to execute data flow,
    and writes output into file.

  the file can be local/hdfs depends on start up mode.

grunt> store E into '/user/cloudera/myresults';
 ---> myresults will be output directory.
   file s are prefixed with
   'part-m-<number>'  --> output written by mapper
   or 'part-r-<number>' --> output written by reducer.

limit:-
------------
  ---> to get first n number of tuples.
 grunt> X = limit A 3;

  --> to get last n number of tuples.
  two solutions:
  i) udf
 ii) joins ;.

-----------------------------------
 Sample:
 -------
   to get random samples.

   two types of sampling techniques.
 i) sampling without replacement.
   ---> different sample sets
    don't have common elements (tuples)
    solution: Hive Bucketing.

 ii) sampling with replacement:
  ---> different sample sets,
   can have common tuples.

  solution: pig Sampling.

 grunt> s1 = sample  products  0.05;
grunt> s2 = sample products 0.05;
grunt> s3 = sample products 0.05;


 filter:
   to filter tuples based on given criteria.

 males = filter emp by (sex='m');

3 ways to create subsets:
 i)limit ii) sample iii)filter


 foreach:
 --------
     to process a tuple.
 
   i) to filter fields.
  ii) to copy data from one relation to another.
 iii) to change field orders
  iv) to rename the fields.
  v) changing field data types.
  vi) performing transformations.
      with given expressions.
  vii) conditional transformations.

 ETL
   extract transform loading.
 
  extracting from databases.
  performing transformations,
  loading into target systems.
  ---> above is bad approach for bigdata.

ELT is recommended for big data.

 E --> extracting from rdbms, using sqoop
 L --> load into hdfs.
 T --> transform using Pig/hive/mr/spark.








   








































Tuesday, 2 May 2017

Spark : Performing grouping Aggregations based on Multiple Keys and saving results


 // performing  aggregations grouping by

multiple columns;

 sql:
   select dno, sex, sum(sal) from emp
        group by dno, sex;

  scala> val data = sc.textFile

("/user/cloudera/spLab/emp")
data: org.apache.spark.rdd.RDD[String] =

/user/cloudera/spLab/emp MapPartitionsRDD[1] at

textFile at <console>:27

scala> data.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11

scala> val arr = data.map(_.split(","))
arr: org.apache.spark.rdd.RDD[Array[String]] =

MapPartitionsRDD[2] at map at <console>:29

scala> arr.collect
res1: Array[Array[String]] = Array(Array(101,

aaaa, 40000, m, 11), Array(102, bbbbbb, 50000,

f, 12), Array(103, cccc, 50000, m, 12), Array

(104, dd, 90000, f, 13), Array(105, ee, 10000,

m, 12), Array(106, dkd, 40000, m, 12), Array

(107, sdkfj, 80000, f, 13), Array(108, iiii,

50000, m, 11))

scala>
scala> val pair = arr.map(x =>  ( (x(4),x(3)) ,

x(2).toInt)     )
pair: org.apache.spark.rdd.RDD[((String,

String), Int)] = MapPartitionsRDD[3] at map at

<console>:31

scala> pair.collect.foreach(println)
((11,m),40000)
((12,f),50000)
((12,m),50000)
((13,f),90000)
((12,m),10000)
((12,m),40000)
((13,f),80000)
((11,m),50000)

scala>
//or

 val pair = data.map{ x =>
             val w = x.split(",")
             val dno = w(4)
             val sex = w(3)
             val sal = w(2).toInt
             val mykey = (dno,sex)
             val p = (mykey , sal)
             p
           }


scala> val res = pair.reduceByKey(_+_)

scala> res.collect.foreach(println)
((12,f),50000)
((13,f),170000)
((12,m),100000)
((11,m),90000)

scala> val r = res.map(x =>

(x._1._1,x._1._2,x._2) )

scala> r.collect.foreach(println)
(12,f,50000)
(13,f,170000)
(12,m,100000)
(11,m,90000)

-------------------------------------
  spark reduceByKey() allows only single key for

grouping.
  when you want grouping by multiple columns,
  make multiple columns as a tuple,
   keep the tuple as key in the pair.

---------------------------------------
sql:--> multi grouping and multi aggregations.

 select  dno, sex, sum(sal), count(*),
      avg(sal) , max(sal), min(sal) from emp
        group by dno, sex;

scala> val grp = pair.groupByKey()
grp: org.apache.spark.rdd.RDD[((String, String),

Iterable[Int])] = ShuffledRDD[7] at groupByKey

at <console>:31

scala> grp.collect.foreach(println)
((12,f),CompactBuffer(50000))
((13,f),CompactBuffer(90000, 80000))
((12,m),CompactBuffer(50000, 10000, 40000))
((11,m),CompactBuffer(40000, 50000))

scala> val agr = grp.map{ x =>
             val dno = x._1._1
            val sex = x._1._2
           val cb = x._2
           val tot = cb.sum
           val cnt = cb.size
           val avg = (tot/cnt).toInt
           val max = cb.max
          val min = cb.min
          val r = (dno,sex,tot,cnt,avg,max,min)
        r
      }
agr: org.apache.spark.rdd.RDD[(String, String,

Int, Int, Int, Int, Int)] = MapPartitionsRDD[8]

at map at <console>:37

scala>

scala> agr.collect.foreach(println)
(12,f,50000,1,50000,50000,50000)
(13,f,170000,2,85000,90000,80000)
(12,m,100000,3,33333,50000,10000)
(11,m,90000,2,45000,50000,40000)

scala> // to save results into file.

agr.saveAsTextFile("/user/cloudera/spLab/res1")

[cloudera@quickstart ~]$ hadoop fs -ls spLab
Found 2 items
-rw-r--r--   1 cloudera cloudera        158

2017-05-01 20:17 spLab/emp
drwxr-xr-x   - cloudera cloudera          0

2017-05-02 20:29 spLab/res1
[cloudera@quickstart ~]$ hadoop fs -ls

spLab/res1
Found 2 items
-rw-r--r--   1 cloudera cloudera          0

2017-05-02 20:29 spLab/res1/_SUCCESS
-rw-r--r--   1 cloudera cloudera        134

2017-05-02 20:29 spLab/res1/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat

spLab/res1/part-00000
(12,f,50000,1,50000,50000,50000)
(13,f,170000,2,85000,90000,80000)
(12,m,100000,3,33333,50000,10000)
(11,m,90000,2,45000,50000,40000)
[cloudera@quickstart ~]$

// here, output is written as tuple shape.
// which is not valid format for hive, rdbms, or

other systems.
// before saving results following

transformation  should be done.


 val r1 = agr.map{ x=>
    x._1+","+x._2+","+x._3+","+
      x._4+","+x._5+","+x._6+","+x._7
   }

scala> val r1 = agr.map{ x=>
     |     x._1+","+x._2+","+x._3+","+
     |       x._4+","+x._5+","+x._6+","+x._7
     |    }
r1: org.apache.spark.rdd.RDD[String] =

MapPartitionsRDD[5] at map at <console>:35

scala> r1.collect.foreach(println)
12,f,50000,1,50000,50000,50000
13,f,170000,2,85000,90000,80000
12,m,100000,3,33333,50000,10000
11,m,90000,2,45000,50000,40000

scala>
// or

scala> val r2 = agr.map{ x =>
     |      val dno = x._1
     |      val sex = x._2
     |      val tot = x._3
     |      val cnt = x._4
     |      val avg = x._5
     |      val max = x._6
     |     val min = x._7
     |  Array(dno,sex,tot.toString,cnt.toString,
     |        avg.toString, max.toString,

min.toString).mkString("\t")
     | }
r2: org.apache.spark.rdd.RDD[String] =

MapPartitionsRDD[6] at map at <console>:35

scala> r2.collect.foreach(println)
12 f 50000 1 50000 50000

50000
13 f 170000 2 85000 90000

80000
12 m 100000 3 33333 50000

10000
11 m 90000 2 45000 50000

40000

scala>

[cloudera@quickstart ~]$ hadoop fs -ls

spLab/res2
Found 2 items
-rw-r--r--   1 cloudera cloudera          0

2017-05-02 20:44 spLab/res2/_SUCCESS
-rw-r--r--   1 cloudera cloudera        126

2017-05-02 20:44 spLab/res2/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat

spLab/res2/part-00000
12 f 50000 1 50000 50000

50000
13 f 170000 2 85000 90000

80000
12 m 100000 3 33333 50000

10000
11 m 90000 2 45000 50000

40000
[cloudera@quickstart ~]$


-- this results , can be directly exported into

rdbms.

[cloudera@quickstart ~]$ mysql -u root
                              -pcloudera

mysql> create database spres;
Query OK, 1 row affected (0.03 sec)

mysql> use spres;
Database changed
mysql> create table summary(dno int,  sex char

(1),
    ->     tot int  , cnt int, avg int, max int,

min int);
Query OK, 0 rows affected (0.10 sec)

mysql> select * from summary;
Empty set (0.00 sec)

mysql>

[cloudera@quickstart ~]$ sqoop export  --connect

jdbc:mysql://localhost/spres --username root --

password cloudera --table summary  --export-dir

'/user/cloudera/spLab/res2/part-00000' --input-

fields-terminated-by '\t'


to use spark written results by hive.

hive> create table info(dno int, sex string,
    tot int, cnt int, avg int, max int,
        min int)
     row format delimited
      fields terminated by '\t';
hive> load data inpath

'/user/cloudera/spLab/res2/part-00000' into

table info;

mysql> select * from summary;
+------+------+--------+------+-------+-------

+-------+
| dno  | sex  | tot    | cnt  | avg   | max   |

min   |
+------+------+--------+------+-------+-------

+-------+
|   12 | m    | 100000 |    3 | 33333 | 50000 |

10000 |
|   11 | m    |  90000 |    2 | 45000 | 50000 |

40000 |
|   12 | f    |  50000 |    1 | 50000 | 50000 |

50000 |
|   13 | f    | 170000 |    2 | 85000 | 90000 |

80000 |
+------+------+--------+------+-------+-------

+-------+
4 rows in set (0.03 sec)




















































Demo of Pig Grouping Aggregations

Demo of grouping aggregations on structured

data.
-----------------------------------


[cloudera@quickstart ~]$ hadoop fs -mkdir piglab
[cloudera@quickstart ~]$ hadoop fs -

copyFromLocal emp piglab
[cloudera@quickstart ~]$ hadoop fs -ls piglab
Found 1 items
-rw-r--r--   1 cloudera cloudera        158

2017-05-02 18:55 piglab/emp
[cloudera@quickstart ~]$

[cloudera@quickstart ~]$ hadoop fs -cat

piglab/emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
[cloudera@quickstart ~]$

[cloudera@quickstart ~]$ pig
grunt> emp = load 'piglab/emp' using PigStorage

(',')
>>      as (id:int, name:chararray, sal:int,
>>   sex:chararray, dno:int);
grunt>dump emp
(101,aaaa,40000,m,11)
(102,bbbbbb,50000,f,12)
(103,cccc,50000,m,12)
(104,dd,90000,f,13)
(105,ee,10000,m,12)
(106,dkd,40000,m,12)
(107,sdkfj,80000,f,13)
(108,iiii,50000,m,11)

required task: sex based sum aggr on sal.
sql--> select sex,sum(sal) from emp
          group by sex;

grunt> e = foreach emp generate sex, sal;
grunt> describe e;
e: {sex: chararray,sal: int}
grunt> grp = group e by sex;
grunt> describe grp
grp: {group: chararray,e: {(sex: chararray,sal:

int)}}
grunt> dump grp

(f,{(f,80000),(f,90000),(f,50000)})
(m,{(m,50000),(m,40000),(m,10000),(m,50000),

(m,40000)})

grunt> res1 = foreach grp generate
>>           group as sex, SUM(e.sal) as tot;
grunt> dump res1

(f,220000)
(m,190000)

sql--> select sex, avg(sal) from emp group by sex;
grunt> res2 = foreach grp generate
     group as sex, AVG(e.sal) as avg;


sql--> select sex, max(sal) from emp group by sex;

grunt> res3 = foreach grp generate
    group as sex, MAX(e.sal) as max;

sql--> select sex, min(sal) from emp group by sex;

grunt> res4 = foreach grp generate
     group as sex, MIN(e.sal) as min;

sql--> select sex, count(*) from emp group by sex;

grunt> res5 = foreach grp generate
    group as sex, COUNT(e) as cnt;

sql--> select sex, sum(sal) as tot,
       count(*) as cnt, avg(sal) as avg,
        max(sal) as max, min(sal) as min
    from emp group by sex;

grunt> res6 = foreach grp generate
   group as sex, SUM(e.sal) as tot,
       COUNT(e) as cnt, AVG(e.sal) as avg,
        MAX(e.sal) as max,
          MIN(e.sal) as min;
grunt> dump res6
(f,220000,3,73333.33333333333,90000,50000)
(m,190000,5,38000.0,50000,10000)

grunt> store res6 into 'piglab/results1';

grunt> ls piglab
hdfs://quickstart.cloudera:8020/user/cloudera/piglab/emp<r 1> 158
hdfs://quickstart.cloudera:8020/user/cloudera/piglab/results1 <dir>
grunt> ls piglab/results1
hdfs://quickstart.cloudera:8020/user/cloudera/piglab/results1/_SUCCESS<r 1> 0
hdfs://quickstart.cloudera:8020/user/cloudera/piglab/results1/part-r-00000<r 1> 72
grunt> cat piglab/results1/part-r-00000
f 220000 3 73333.33333333333 90000 50000
m 190000 5 38000.0 50000 10000
grunt>

grunt> store res6 into 'piglab/results2'
>>    using PigStorage(',');

grunt> cat piglab/results2/part-r-00000
f,220000,3,73333.33333333333,90000,50000
m,190000,5,38000.0,50000,10000
grunt>























Monday, 1 May 2017

Spark Grouping Aggregations


demo grouping aggregations on structured data.
----------------------------------------------
[cloudera@quickstart ~]$ ls emp
emp
[cloudera@quickstart ~]$ cat emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
[cloudera@quickstart ~]$ hadoop fs -ls spLab
ls: `spLab': No such file or directory
[cloudera@quickstart ~]$ hadoop fs -mkdir spLab
[cloudera@quickstart ~]$ hadoop fs -

copyFromLocal emp spLab


scala> val data = sc.textFile

("/user/cloudera/spLab/emp")
data: org.apache.spark.rdd.RDD[String] =

/user/cloudera/spLab/emp MapPartitionsRDD[1] at

textFile at <console>:27

scala> data.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11

scala>

scala> val arr = data.map(_.split(","))
arr: org.apache.spark.rdd.RDD[Array[String]] =

MapPartitionsRDD[2] at map at <console>:29

scala> arr.collect
res1: Array[Array[String]] = Array(Array(101,

aaaa, 40000, m, 11), Array(102, bbbbbb, 50000,

f, 12), Array(103, cccc, 50000, m, 12), Array

(104, dd, 90000, f, 13), Array(105, ee, 10000,

m, 12), Array(106, dkd, 40000, m, 12), Array

(107, sdkfj, 80000, f, 13), Array(108, iiii,

50000, m, 11))

scala>

scala> val pair1 = arr.map(x => (x(3), x

(2).toInt) )
pair1: org.apache.spark.rdd.RDD[(String, Int)] =

MapPartitionsRDD[3] at map at <console>:31

scala> // or

scala> val pair1 = arr.map{ x =>
     |      val sex = x(3)
     |      val sal = x(2).toInt
     |     (sex, sal)
     | }
pair1: org.apache.spark.rdd.RDD[(String, Int)] =

MapPartitionsRDD[4] at map at <console>:31

scala>

scala> pair1.collect.foreach(println)
(m,40000)
(f,50000)
(m,50000)
(f,90000)
(m,10000)
(m,40000)
(f,80000)
(m,50000)

scala>

scala> // select sex, sum(sal) from emp group by

sex

scala> val rsum = pair1.reduceByKey((a,b) => a

+b)
rsum: org.apache.spark.rdd.RDD[(String, Int)] =

ShuffledRDD[5] at reduceByKey at <console>:33

scala> // or

scala> val rsum = pair1.reduceByKey(_+_)
rsum: org.apache.spark.rdd.RDD[(String, Int)] =

ShuffledRDD[6] at reduceByKey at <console>:33

scala> rsum.collect
res3: Array[(String, Int)] = Array((f,220000),

(m,190000))

scala>

// select sex, max(sal) from emp group by sex;

scala> val rmax = pair1.reduceByKey(Math.max

(_,_))
rmax: org.apache.spark.rdd.RDD[(String, Int)] =

ShuffledRDD[7] at reduceByKey at <console>:33

scala> rmax.collect
res4: Array[(String, Int)] = Array((f,90000),

(m,50000))

scala>

// select sex, min(sal) from emp group by sex;

scala> val rmin = pair1.reduceByKey(Math.min

(_,_))
rmin: org.apache.spark.rdd.RDD[(String, Int)] =

ShuffledRDD[8] at reduceByKey at <console>:33

scala> rmin.collect
res5: Array[(String, Int)] = Array((f,50000),

(m,10000))

scala>

// select sex, count(*) from emp
  group by sex

scala> pair1.collect
res6: Array[(String, Int)] = Array((m,40000),

(f,50000), (m,50000), (f,90000), (m,10000),

(m,40000), (f,80000), (m,50000))

scala> pair1.countByKey
res7: scala.collection.Map[String,Long] = Map(f

-> 3, m -> 5)

scala> val pair2 = pair1.map(x => (x._1 ,    1)

)
pair2: org.apache.spark.rdd.RDD[(String, Int)] =

MapPartitionsRDD[11] at map at <console>:33

scala> pair2.collect
res8: Array[(String, Int)] = Array((m,1), (f,1),

(m,1), (f,1), (m,1), (m,1), (f,1), (m,1))

scala> val rcnt = pair2.reduceByKey(_+_)
rcnt: org.apache.spark.rdd.RDD[(String, Int)] =

ShuffledRDD[12] at reduceByKey at <console>:35

scala> rcnt.collect
res9: Array[(String, Int)] = Array((f,3), (m,5))

scala>

// select sex, avg(sal) from emp group by sex;
scala> rsum.collect.foreach(println)
(f,220000)
(m,190000)

scala> rcnt.collect.foreach(println)
(f,3)
(m,5)

scala> val j = rsum.join(rcnt)
j: org.apache.spark.rdd.RDD[(String, (Int,

Int))] = MapPartitionsRDD[15] at join at

<console>:39

scala> j.collect
res12: Array[(String, (Int, Int))] = Array((f,

(220000,3)), (m,(190000,5)))

scala>

scala> j.collect
res13: Array[(String, (Int, Int))] = Array((f,

(220000,3)), (m,(190000,5)))
scala> val ravg = j.map{ x =>
     |             val sex = x._1
     |            val v = x._2
     |            val tot = v._1
     |           val cnt = v._2
     |           val avg = tot/cnt
     |           (sex, avg.toInt)
     |       }
ravg: org.apache.spark.rdd.RDD[(String, Int)] =

MapPartitionsRDD[17] at map at <console>:41

scala> ravg.collect
res15: Array[(String, Int)] = Array((f,73333),

(m,38000))

scala>


// select dno, range(sal) from emp
    group by dno;

  --> range is a difference between max and min.

scala> val pair3 = arr.map(x => ( x(4), x

(2).toInt ) )
pair3: org.apache.spark.rdd.RDD[(String, Int)] =

MapPartitionsRDD[18] at map at <console>:31

scala> pair3.collect.foreach(println)
(11,40000)
(12,50000)
(12,50000)
(13,90000)
(12,10000)
(12,40000)
(13,80000)
(11,50000)

scala>
scala> val dmax = pair3.reduceByKey(Math.max

(_,_))
dmax: org.apache.spark.rdd.RDD[(String, Int)] =

ShuffledRDD[19] at reduceByKey at <console>:33

scala> val dmin = pair3.reduceByKey(Math.min

(_,_))
dmin: org.apache.spark.rdd.RDD[(String, Int)] =

ShuffledRDD[20] at reduceByKey at <console>:33

scala> val dj = dmax.join(dmin)
dj: org.apache.spark.rdd.RDD[(String, (Int,

Int))] = MapPartitionsRDD[23] at join at

<console>:37

scala>  val drange = dj.map{ x =>
     |              val dno = x._1
     |             val max = x._2._1
     |             val min = x._2._2
     |             val r = max-min
     |             (dno, r)
     |       }
drange: org.apache.spark.rdd.RDD[(String, Int)]

= MapPartitionsRDD[25] at map at <console>:39

scala> drange.collect.foreach(println)
(12,40000)
(13,10000)
(11,10000)

scala>

-------------------------------------

scala> // multiple aggregations.

scala> pair1.collect
res18: Array[(String, Int)] = Array((m,40000),

(f,50000), (m,50000), (f,90000), (m,10000),

(m,40000), (f,80000), (m,50000))

scala> val grp = pair1.groupByKey()
grp: org.apache.spark.rdd.RDD[(String, Iterable

[Int])] = ShuffledRDD[26] at groupByKey at

<console>:33

scala> grp.collect
res19: Array[(String, Iterable[Int])] = Array

((f,CompactBuffer(50000, 90000, 80000)),

(m,CompactBuffer(40000, 50000, 10000, 40000,

50000)))

scala> val r1 = grp.map(x => (x._1 , x._2.sum )

)
r1: org.apache.spark.rdd.RDD[(String, Int)] =

MapPartitionsRDD[27] at map at <console>:35

scala> r1.collect.foreach(println)
(f,220000)
(m,190000)

scala>

// select sex, sum(sal), count(*) ,
        avg(sal) , max(sal), min(sal),
              max(sal)-min(sal) as range
    from emp group by sex;

scala> val rall = grp.map{ x =>
     |      val sex = x._1
     |      val cb = x._2
     |      val tot = cb.sum
     |      val cnt = cb.size
     |      val avg = (tot/cnt).toInt
     |      val max = cb.max
     |      val min = cb.min
     |      val r = max-min
     |      (sex,tot,cnt,avg,max,min,r)
     | }
rall: org.apache.spark.rdd.RDD[(String, Int,

Int, Int, Int, Int, Int)] = MapPartitionsRDD[28]

at map at <console>:35

scala> rall.collect.foreach(println)
(f,220000,3,73333,90000,50000,40000)
(m,190000,5,38000,50000,10000,40000)

scala>



Saturday, 8 April 2017

spark sql with json and xml processing

-----------


   Spark Sql
---------------

  [ ]
     Spark sql is a library,
   to process spark data objects,
   using sql select statements.

 Spark sql follows mysql based sql syntaxes.
==============================================
 Spark sql provides,
   two types of contexts.
  i) sqlContext
 ii) HiveContext.

import org.apache.spark.sql.SQLContext

    val sqlCon = new SQLContext(sc)

using sqlContext ,
   we can process spark objects using select statements.

 Using HiveContext,
   we can integrate , Hive with Spark.
 Hive, is data warehouse environment in hadoop framework,
   So all data is stored and managed in Hive tables.
  using HiveContext we can access the entire hive environment (hive tables) from Spark.

  difference between, hql statement from Hive,
  and hql statement from Spark.
--> if hql is executed from Hive Environment,
  the statement to process, will be converted
 as a MapReduce job.
 ---> if same hive is integrated with spark,
  and hql is submitted from spark,
    it uses, DAG and Inmemory computing models.
  which is much faster than MapReduce.

 import org.apache.spark.sql.hive.HiveContext

 val hc = new HiveContext(sc)
-----------------------------
 Example of sqlContext.

 val sqc = new SQLContext(sc)

  file name --> file1
  sample --->  100,200,300
               300,400,400
                :
                :
 step1)
  create case class for the data.
 
   case class  Rec(a:Int, b:Int, c:Int)

 step2) create a function ,
   to convert raw line into case object.
  [function to provide schema ]

 def makeRec(line:String)={
        val w = line.split(",")
        val a = w(0).toInt
        val b = w(1).toInt
        val c = w(2).toInt
        val r = Rec(a, b,c)
         r
    }
--------
step3) load data.
   val data = sc.textFile("/user/cloudera/sparklab/file1")

   100,200,300
   2000,340,456
    :
    :


step4) transform each record into case Object

  val recs = data.map(x => makeRec(x))

step5) convert rdd into data frame.
 
  val df = recs.toDF

step6)  create table instance for the dataframe.

   df.registerTempTable("samp")

step7) apply select statement of sql on temp table.

val  r1 = sqc.sql("select a+b+c as tot from samp")

    r1
  ------
   tot
   ----
   600
   900

  r1.registerTempTable("samp1")

val r2 =    sqc.sql("select sum(tot) as gtot from samp1")


   once "select" statement is applied on
 temp table, returned object will be dataframe.


 to apply sql on processed results,
    again we need to register the dataframe
  as temp table.

 r1.registerTempTable("samp2")
 
 val r2 = sqc.sql("select * from samp2
                      where tot>=200")

-----------------------------------

         sales
   --------------------
     :
   12/27/2016,10000,3,10
     :
     :

  -------------------------
 Steps involved in Spark Sql. [sqlContext]
----------------------------
   monthly sales report...
schema ---> date, price, qnt, discount

  step1)
    case class Sales(mon : Int, price:Int,
             qnt :Int, disc: Int)

  step2)
    def toSales(line: String) = {
       val w = line.split(",")
       val mon = w(0).split("/")(0).toInt
       val p = w(1).toInt
       val q = w(2).toInt
       val d = w(3).toInt
       val srec = Sales(mon,p,q,d)
       srec
   }
step3)
 val data = sc.textFile("/user/cloudera/mydata/sales.txt")
step4)
  val strans = data.map(x => toSales(x))
step5)

  val sdf = strans.toDF
  sdf.show

step6)

  sdf.registerTempTable("SalesTrans")

step7) // play with select
  ---> mon, price, qnt, disc

 val res1 = sqlContext.sql("select mon ,
             sum(
             (price -  price*disc/100)*qnt
                ) as tsales from SalesTrans
              group by mon")

 res1.show
 res1.printSchema

-----------------------------------------

 val res2 = res1

 res1.registerTempTable("tab1")
 res2.registerTempTable("tab2")

 val res3 = sqlContext.sql("select l.mon as m1,
     r.mon as m2, l.tsales as t1,
         r.tsales as t2
    from tab1 l join tab2 r
          where (l.mon-r.mon)==1")
//  11 rows.
 res3.registerTempTable("tab3")

------------------------------
 val res4 = sqlContext.sql("select
    m1, m2, t1, t2, ((t2-t1)*100)/t1 as sgrowth
    from tab3")

  res4.show()
------------------------------------------
  json1.json
--------------------------
 {"name":"Ravi","age":25,"city":"Hyd"}
 {"name":"Rani","sex":"F","city":"Del"}
   :
   :
---------------------------------------

    val df = sqlContext.read.json("/user/cloudera/mydata/json1.json")

   df.show
 ------------------------
  name    age    sex    city
 ----------------------------------------
 ravi     25     null hyd
 rani  null   F del
    :
    :
--------------------------------------

 json2.json
 ------------------
 {"name":"Ravi","age":25,
    "wife":{"name":"Rani","age":23},"city":"Hyd"}}
   :
   :

 val df2 = sqlContext.read.json("/../json2.json")

  df2
-----------------------
name     age      wife                  city
Ravi 25    {"name":"rani","age":23}  HYd
    :
---------------------

 df2.registerTempTable("Info")

val df3 = sqlContext.sql("select name,
           wife.name as wname,
           age, wife.age as wage,
          abs(age-wife.age) as diff,
          city from Info")
----------------------------------------
 xml data processing with spark sql.

---spark sql does not have, direct libraries
  for xml processing.
   two ways.
     i) 3 rd party api [ ex: databricks]
    ii) using Hive Integration.

 2nd is best.

How to integrate Hive with spark .
 ---Using HiveContext.


step1)
   copy hive-site.xml file into,
     /usr/lib/spark/conf directory.

what if , hive-site.xml is not copied into
    conf directory of spark?
--- spark can not understand,
  hive's metastore location [derby/mysql/oracle ....]
    this info is available with hive-site.xml .


 step2)

  create hive Context object

import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)

  step3)  access Hive Environment from spark

 hc.sql("create database mydb")
 hc.sql("use mydb")
 hc.sql("create table samp(line string)")
 hc.sql("load data local inpath 'file1'
         into table samp")
 val df = hc.sql("select * from samp")
-------------------------------------
   xml1.xml
----------------------------------
<rec><name>Ravi</name><age>25</age></rec>
<rec><name>Rani</name><sex>F</sex></rec>
 :
 :
------------------------------------------

 hc.sql("create table raw(line string)")
 hc.sql("load data local inpath 'xml1.xml'
   into table raw")
 hc.sql("create table info(name string,
          age int, sex string)")
 hc.sql("insert overwrite table info
    select xpath_string(line,'rec/name'),
       xpath_int(line, 'rec/age'),
      xpath_string(line, 'rec/sex')
      from raw")
----------------------------------------
 xml2.xml
------------
<rec><name><fname>Ravi</fname><lname>kumar</lname><age>24</age><contact><email><personal>ravi@gmail.com</personal><official>ravi@ventech.com</official></email><phone><mobile>12345</mobile><office>123900</office><residence>127845</residence></phone></contact><city>Hyd</city></rec>

hc.sql("create table xraw(line string)")
hc.sql("load data local inpath 'xml2.xml'
   into table xraw")
hc.sql("create table xinfo(fname string ,
   lname string, age int,
    personal_email string,
    official_email string,
    mobile String,
    office_phone string ,
    residence_phone string,
    city string)")

hc.sql("insert overwrite table xinfo
  select
     xpath_string(line,'rec/name/fname'),
     xpath_string(line,'rec/name/lname'),
     xpath_int(line,'rec/age'),
     xpath_string(line,'rec/contact/email/personal'),
  xpath_string(line,'rec/contact/email/official'),
xpath_string(line,'rec/contact/phone/mobile'),
xpath_string(line,'rec/contact/phone/office'),
xpath_string(line,'rec/contact/phone/residence'),
xpath_string(line,'rec/city')
  from xraw")
-------------------------
 xml3.xml
----------------
<tr><cid>101</cid><pr>200</pr><pr>300</pr><pr>300</pr></tr>
<tr><cid>102</cid><pr>400</pr><pr>800</pr></tr>
<tr><cid>101</cid><pr>1000</pr></tr>
--------------------------------

hc.sql("create table sraw(line string)")
hc.sql("load data local inpath 'xml3.xml'
    into table sraw")
hc.sql("create table raw2(cid int, pr array<String>)")

hc.sql("insert overwrite table raw2
   select xpath_int(line, 'tr/cid'),
      xpath(line,'tr/pr/text()')
     from sraw")
hc.sql("select * from raw2").show
-------------------------------
cid           pr
101       [200,300,300]
102       [400,800]
101       [1000]

hc.sql("select explode(pr) as price from  raw2").show

    200
    300
    300
    400
    800
   1000

hc.sql("select cid, explode(pr) as price from  raw2").show

----> above is invalid.

hc.sql("create table raw3(cid int, pr int)")
hc.sql("Insert overwrite table raw3
    select cid, mypr from raw2
      lateral view explode(pr) p as mypr")

hc.sql("select * from raw3").show

cid     pr
101 200
101 300
101     300
102     400
102     800
101    1000

hc.sql("create table summary(cid int, totbill bigint)")

hc.sql("insert overwrite table summary
   select cid , sum(pr) from raw3
   group by cid")

--------------------