Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Friday, 19 August 2016

Spark Sql and Hql

[cloudera@quickstart ~]$ sudo find / -name 'hive-site.xml'

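[cloudera@quickstart ~]$ sudo chmod -R 777 /usr/lib/spark/conf
[cloudera@quickstart ~]$ cp /etc/hive/conf.dist/hive-site.xml /usr/lib/spark/conf

(Placing hive-site.xml in Spark's conf directory lets HiveContext use the same Hive metastore as Hive itself.)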
_____________________________________
From hive-site.xml -->

hive.metastore.warehouse.dir

From Spark 2.0.0 onwards the above option is deprecated;
use the following option instead:

------>   spark.sql.warehouse.dir
_____________________________________________

____  [ tested on Cloudera 5.8, Spark version 1.6.0 ]

[cloudera@quickstart ~]$ ls /usr/lib/hue/apps/beeswax/data/sample_07.csv

[cloudera@quickstart ~]$ head -n 2 /usr/lib/hue/apps/beeswax/data/sample_07.csv
_____________________
val hq = new org.apache.spark.sql.hive.HiveContext(sc)

hq.sql("create database sparkdb")

hq.sql("CREATE TABLE sample_07 (code string, description string, total_emp int, salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")

[cloudera@quickstart ~]$ hadoop fs -mkdir sparks

[cloudera@quickstart ~]$ hadoop fs -copyFromLocal /usr/lib/hue/apps/beeswax/data/sample_07.csv sparks
[cloudera@quickstart ~]$ hadoop fs -ls sparks

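hq.sql("LOAD DATA INPATH '/user/cloudera/sparks/sample_07.csv' OVERWRITE INTO TABLE sample_07")

(LOAD DATA INPATH moves the HDFS file into the table's warehouse directory, so sample_07.csv is no longer under /user/cloudera/sparks afterwards.)

val df = hq.sql("SELECT * from sample_07")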

__________________________________________
scala> df.filter(df("salary") > 150000).show()
+-------+--------------------+---------+------+
|   code|         description|total_emp|salary|
+-------+--------------------+---------+------+
|11-1011|    Chief executives|   299160|151370|
|29-1022|Oral and maxillof...|     5040|178440|
|29-1023|       Orthodontists|     5350|185340|
|29-1024|     Prosthodontists|      380|169360|
|29-1061|   Anesthesiologists|    31030|192780|
|29-1062|Family and genera...|   113250|153640|
|29-1063| Internists, general|    46260|167270|
|29-1064|Obstetricians and...|    21340|183600|
|29-1067|            Surgeons|    50260|191410|
|29-1069|Physicians and su...|   237400|155150|
+-------+--------------------+---------+------+
____________________________________________


val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._


[cloudera@quickstart ~]$ gedit json1
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal json1 sparks
[cloudera@quickstart ~]$ hadoop fs -cat sparks/json1
{"name":"Ravi","age":23,"sex":"M"}
{"name":"Rani","age":24,"sex":"F"}
{"name":"Mani","sex":"M"}
{"name":"Vani","age":34}
{"name":"Veni","age":29,"sex":"F"}
[cloudera@quickstart ~]$ 

scala> val df = sqlContext.read.json("/user/cloudera/sparks/json1")

scala> df.show()
+----+----+----+
| age|name| sex|
+----+----+----+
|  23|Ravi|   M|
|  24|Rani|   F|
|null|Mani|   M|
|  34|Vani|null|
|  29|Veni|   F|
+----+----+----+

scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)

scala> df.select("name").show()
+----+
|name|
+----+
|Ravi|
|Rani|
|Mani|
|Vani|
|Veni|
+----+


scala> df.select("age").show()
+----+
| age|
+----+
|  23|
|  24|
|null|
|  34|
|  29|
+----+


scala> 



scala> df.select("name","age").show()
+----+----+
|name| age|
+----+----+
|Ravi|  23|
|Rani|  24|
|Mani|null|
|Vani|  34|
|Veni|  29|
+----+----+


scala> df.select("name","sex").show()
+----+----+
|name| sex|
+----+----+
|Ravi|   M|
|Rani|   F|
|Mani|   M|
|Vani|null|
|Veni|   F|
+----+----+


scala> 


scala> df.select(df("name"), df("age") + 10).show()
+----+----------+
|name|(age + 10)|
+----+----------+
|Ravi|        33|
|Rani|        34|
|Mani|      null|
|Vani|        44|
|Veni|        39|
+----+----------+

scala> df.filter(df("age")<34).show()
+---+----+---+
|age|name|sex|
+---+----+---+
| 23|Ravi|  M|
| 24|Rani|  F|
| 29|Veni|  F|
+---+----+---+

scala> df.filter(df("age") >= 5 && df("age") < 30).show()
+---+----+---+
|age|name|sex|
+---+----+---+
| 23|Ravi|  M|
| 24|Rani|  F|
| 29|Veni|  F|
+---+----+---+

scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  34|    1|
|null|    1|
|  23|    1|
|  24|    1|
|  29|    1|
+----+-----+


scala> df.groupBy("sex").count().show()
+----+-----+
| sex|count|
+----+-----+
|   F|    2|
|   M|    2|
|null|    1|
+----+-----+


scala> 
scala> df.registerTempTable("df")

scala> sqlContext.sql("select * from df").collect.foreach(println)

[23,Ravi,M]
[24,Rani,F]
[null,Mani,M]
[34,Vani,null]
[29,Veni,F]

scala> val mm = sqlContext.sql("select * from df")
mm: org.apache.spark.sql.DataFrame = [age: bigint, name: string, sex: string]

scala> mm.registerTempTable("mm")

scala> sqlContext.sql("select * from mm").collect.foreach(println)
[23,Ravi,M]
[24,Rani,F]
[null,Mani,M]
[34,Vani,null]
[29,Veni,F]

scala> mm.show()
+----+----+----+
| age|name| sex|
+----+----+----+
|  23|Ravi|   M|
|  24|Rani|   F|
|null|Mani|   M|
|  34|Vani|null|
|  29|Veni|   F|
+----+----+----+


scala> val x = mm
x: org.apache.spark.sql.DataFrame = [age: bigint, name: string, sex: string]

scala> 

scala> val aggr1 = df.groupBy("sex").agg(max("age"), min("age"))
aggr1: org.apache.spark.sql.DataFrame = [sex: string, max(age): bigint, min(age): bigint]

scala> aggr1.collect.foreach(println)
[F,29,24]
[M,23,23]
[null,34,34]

scala> aggr1.show()
+----+--------+--------+
| sex|max(age)|min(age)|
+----+--------+--------+
|   F|      29|      24|
|   M|      23|      23|
|null|      34|      34|
+----+--------+--------+


scala> 

____________________

  ex:
[cloudera@quickstart ~]$ cat > emp1
101,aaa,30000,m,11
102,bbbb,40000,f,12
103,cc,60000,m,12
104,dd,80000,f,11
105,cccc,90000,m,12
[cloudera@quickstart ~]$ cat > emp2
201,dk,90000,m,11
202,mm,100000,f,12
203,mmmx,80000,m,12
204,vbvb,70000,f,11
[cloudera@quickstart ~]$ hadoop fs -mkdir sparklab
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp1 sparklab
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp2 sparklab
[cloudera@quickstart ~]$ 

scala> val emp1 = sc.textFile("/user/cloudera/sparklab/emp1")


scala> val emp2 = sc.textFile("/user/cloudera/sparklab/emp2")


scala> case class Emp(id:Int, name:String, 
     |    sal:Int, sex:String, dno:Int)


scala> def toEmp(x:String) = {
     |   val  w = x.split(",")
     |   Emp(w(0).toInt, 
     |        w(1), w(2).toInt, 
     |       w(3), w(4).toInt)
     | }
toEmp: (x: String)Emp

scala> val e1 = emp1.map(x => toEmp(x))
e1: org.apache.spark.rdd.RDD[Emp] = MapPartitionsRDD[43] at map at <console>:37

scala> val e2 = emp2.map(x => toEmp(x))
e2: org.apache.spark.rdd.RDD[Emp] = MapPartitionsRDD[44] at map at <console>:37

scala> 

scala> val df1 = e1.toDF
df1: org.apache.spark.sql.DataFrame = [id: int, name: string, sal: int, sex: string, dno: int]

scala> val df2 = e2.toDF
df2: org.apache.spark.sql.DataFrame = [id: int, name: string, sal: int, sex: string, dno: int]

scala> 

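(toDF does not give a DataFrame a table name, so register df1 and df2 before querying them with SQL.
Otherwise the query fails with "Table not found". Likewise, register the union as "df"; without that,
"from df" would still resolve to the JSON table registered earlier, which has no sal column.)

scala> df1.registerTempTable("df1")

scala> df2.registerTempTable("df2")

scala> val df = sqlContext.sql("select * from df1 union all select * from df2")

scala> df.registerTempTable("df")

scala> val res = sqlContext.sql("select sex, sum(sal) as tot, count(*) as cnt from df group by sex")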

scala> 
scala> val wrres = res.map(x => x(0) + "," + x(1) + "," + x(2))

scala> wrres.saveAsTextFile("/user/cloudera/mytemp")

scala> hq.sql("create database park")
scala> hq.sql("use park")
scala> hq.sql("create table urres(sex string, tot int, cnt int) row format delimited fields terminated by ','")

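scala> hq.sql("load data inpath '/user/cloudera/mytemp' into table urres")

(saveAsTextFile writes one part-NNNNN file per partition of wrres. The group-by result usually has many
partitions (spark.sql.shuffle.partitions, 200 by default), so loading only part-00000 can silently miss
rows. Loading the whole directory moves every part file into the table.)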

scala> val hiveres = hq.sql("select * from urres")

scala> hiveres.show()
_____________________________________



































3 comments: