[cloudera@quickstart ~]$ sudo find / -name 'hive-site.xml'
[cloudera@quickstart ~]$ sudo chmod -R 777 /usr/lib/spark/conf
[cloudera@quickstart ~]$ cp /etc/hive/conf.dist/hive-site.xml /usr/lib/spark/conf
_____________________________________
from hive-site.xml --> hive.metastore.warehouse.dir

from Spark 2.0.0 onwards the above option is deprecated;
use the following option instead:
------> spark.sql.warehouse.dir
_________________________________________________
[ tested on Cloudera 5.8, Spark version 1.6.0 ]
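note: a rough Spark 2.x equivalent (not used in this 1.6 lab) -- the app name and warehouse path below are only example values:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-warehouse-demo")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")  // replaces hive.metastore.warehouse.dir
  .enableHiveSupport()
  .getOrCreate()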
[cloudera@quickstart ~]$ ls /usr/lib/hue/apps/beeswax/data/sample_07.csv
[cloudera@quickstart ~]$ head -n 2 /usr/lib/hue/apps/beeswax/data/sample_07.csv
_____________________
val hq = new org.apache.spark.sql.hive.HiveContext(sc)
hq.sql("create database sparkdb")
hq.sql("CREATE TABLE sample_07 (code string, description string, total_emp int, salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
[cloudera@quickstart ~]$ hadoop fs -mkdir sparks
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal /usr/lib/hue/apps/beeswax/data/sample_07.csv sparks
[cloudera@quickstart ~]$ hadoop fs -ls sparks

hq.sql("LOAD DATA INPATH '/user/cloudera/sparks/sample_07.csv' OVERWRITE INTO TABLE sample_07")
val df = hq.sql("SELECT * from sample_07")
__________________________________________
scala> df.filter(df("salary") > 150000).show
()
+-------+--------------------+---------
+------+
| code| description|total_emp|
salary|
+-------+--------------------+---------
+------+
|11-1011| Chief executives| 299160|
151370|
|29-1022|Oral and maxillof...| 5040|
178440|
|29-1023| Orthodontists| 5350|
185340|
|29-1024| Prosthodontists| 380|
169360|
|29-1061| Anesthesiologists| 31030|
192780|
|29-1062|Family and genera...| 113250|
153640|
|29-1063| Internists, general| 46260|
167270|
|29-1064|Obstetricians and...| 21340|
183600|
|29-1067| Surgeons| 50260|
191410|
|29-1069|Physicians and su...| 237400|
155150|
+-------+--------------------+---------
+------+
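the same filter can also be written as plain SQL through the HiveContext (equivalent to the DataFrame filter above; df2 is just an illustrative name):

val df2 = hq.sql("SELECT * FROM sample_07 WHERE salary > 150000")
df2.show()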
____________________________________________
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

[cloudera@quickstart ~]$ gedit json1
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal json1 sparks
[cloudera@quickstart ~]$ hadoop fs -cat sparks/json1
{"name":"Ravi","age":23,"sex":"M"}
{"name":"Rani","age":24,"sex":"F"}
{"name":"Mani","sex":"M"}
{"name":"Vani","age":34}
{"name":"Veni","age":29,"sex":"F"}
[cloudera@quickstart ~]$
scala> val df = sqlContext.read.json("/user/cloudera/sparks/json1")
scala> df.show()
+----+----+----+
| age|name| sex|
+----+----+----+
| 23|Ravi| M|
| 24|Rani| F|
|null|Mani| M|
| 34|Vani|null|
| 29|Veni| F|
+----+----+----+
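Mani and Vani come out with nulls because their JSON records omit age/sex; the DataFrame na functions can drop or replace them -- a small sketch, the fill values are arbitrary:

df.na.drop().show()                                  // keep only complete rows
df.na.fill(Map("age" -> 0, "sex" -> "U")).show()     // substitute defaults for nulls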
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
|-- sex: string (nullable = true)
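read.json infers age as long by default; passing an explicit schema pins the types -- a sketch, dfTyped is just an illustrative name:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val schema = StructType(Seq(
  StructField("name", StringType, true),
  StructField("age",  IntegerType, true),
  StructField("sex",  StringType, true)))
val dfTyped = sqlContext.read.schema(schema).json("/user/cloudera/sparks/json1")
dfTyped.printSchema()      // age now comes out as integer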
scala> df.select("name").show()
+----+
|name|
+----+
|Ravi|
|Rani|
|Mani|
|Vani|
|Veni|
+----+
scala> df.select("age").show()
+----+
| age|
+----+
| 23|
| 24|
|null|
| 34|
| 29|
+----+
scala>
scala> df.select("name","age").show()
+----+----+
|name| age|
+----+----+
|Ravi| 23|
|Rani| 24|
|Mani|null|
|Vani| 34|
|Veni| 29|
+----+----+
scala> df.select("name","sex").show()
+----+----+
|name| sex|
+----+----+
|Ravi| M|
|Rani| F|
|Mani| M|
|Vani|null|
|Veni| F|
+----+----+
scala>
scala> df.select(df("name"),df
("age")+10).show()
+----+----------+
|name|(age + 10)|
+----+----------+
|Ravi| 33|
|Rani| 34|
|Mani| null|
|Vani| 44|
|Veni| 39|
+----+----------+
scala> df.filter(df("age")<34).show()
+---+----+---+
|age|name|sex|
+---+----+---+
| 23|Ravi| M|
| 24|Rani| F|
| 29|Veni| F|
+---+----+---+
scala> df.filter(df("age")>=5 && df("age")
<30).show()
+---+----+---+
|age|name|sex|
+---+----+---+
| 23|Ravi| M|
| 24|Rani| F|
| 29|Veni| F|
+---+----+---+
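filter also accepts a SQL-style condition string, so the same range check can be written as:

df.filter("age >= 5 AND age < 30").show()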
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 34| 1|
|null| 1|
| 23| 1|
| 24| 1|
| 29| 1|
+----+-----+
scala> df.groupBy("sex").count().show()
+----+-----+
| sex|count|
+----+-----+
| F| 2|
| M| 2|
|null| 1|
+----+-----+
scala>
scala> df.registerTempTable("df")
scala> sqlContext.sql("select * from
df").collect.foreach(println)
[23,Ravi,M]
[24,Rani,F]
[null,Mani,M]
[34,Vani,null]
[29,Veni,F]
scala> val mm = sqlContext.sql("select * from df")
mm: org.apache.spark.sql.DataFrame = [age: bigint, name: string, sex: string]
scala> mm.registerTempTable("mm")
scala> sqlContext.sql("select * from
mm").collect.foreach(println)
[23,Ravi,M]
[24,Rani,F]
[null,Mani,M]
[34,Vani,null]
[29,Veni,F]
scala> mm.show()
+----+----+----+
| age|name| sex|
+----+----+----+
| 23|Ravi| M|
| 24|Rani| F|
|null|Mani| M|
| 34|Vani|null|
| 29|Veni| F|
+----+----+----+
scala> val x = mm
x: org.apache.spark.sql.DataFrame = [age: bigint, name: string, sex: string]
scala>
scala> val aggr1 = df.groupBy("sex").agg(max("age"), min("age"))
aggr1: org.apache.spark.sql.DataFrame = [sex: string, max(age): bigint, min(age): bigint]
scala> aggr1.collect.foreach(println)
[F,29,24]
[M,23,23]
[null,34,34]
scala> aggr1.show()
+----+--------+--------+
| sex|max(age)|min(age)|
+----+--------+--------+
| F| 29| 24|
| M| 23| 23|
|null| 34| 34|
+----+--------+--------+
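the aggregate columns can be given friendlier names with alias (assumes max/min come from org.apache.spark.sql.functions; aggr2 is just an illustrative name):

import org.apache.spark.sql.functions.{max, min}

val aggr2 = df.groupBy("sex").agg(max("age").alias("max_age"), min("age").alias("min_age"))
aggr2.show()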
scala>
____________________
ex:
[cloudera@quickstart ~]$ cat > emp1
101,aaa,30000,m,11
102,bbbb,40000,f,12
103,cc,60000,m,12
104,dd,80000,f,11
105,cccc,90000,m,12
[cloudera@quickstart ~]$ cat > emp2
201,dk,90000,m,11
202,mm,100000,f,12
203,mmmx,80000,m,12
204,vbvb,70000,f,11
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp1 sparklab
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp2 sparklab
[cloudera@quickstart ~]$
scala> val emp1 = sc.textFile("/user/cloudera/sparklab/emp1")
scala> val emp2 = sc.textFile("/user/cloudera/sparklab/emp2")
scala> case class Emp(id:Int, name:String,
| sal:Int, sex:String, dno:Int)
scala> def toEmp(x:String) = {
| val w = x.split(",")
| Emp(w(0).toInt,
| w(1), w(2).toInt,
| w(3), w(4).toInt)
| }
toEmp: (x: String)Emp
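toEmp throws if a line is malformed or short; a defensive variant using Try skips bad records instead (a sketch; e1safe is just an illustrative name):

import scala.util.Try

val e1safe = emp1.flatMap(x => Try(toEmp(x)).toOption)   // drops lines that fail to parse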
scala> val e1 = emp1.map(x => toEmp(x))
e1: org.apache.spark.rdd.RDD[Emp] = MapPartitionsRDD[43] at map at <console>:37
scala> val e2 = emp2.map(x => toEmp(x))
e2: org.apache.spark.rdd.RDD[Emp] = MapPartitionsRDD[44] at map at <console>:37
scala>
scala> val df1 = e1.toDF
df1: org.apache.spark.sql.DataFrame = [id: int, name: string, sal: int, sex: string, dno: int]
scala> val df2 = e2.toDF
df2: org.apache.spark.sql.DataFrame = [id: int, name: string, sal: int, sex: string, dno: int]
scala>
scala> df1.registerTempTable("df1")
scala> df2.registerTempTable("df2")
scala> val df = sqlContext.sql("select * from df1 union all select * from df2")
scala> df.registerTempTable("df")
scala> val res = sqlContext.sql("select sex, sum(sal) as tot, count(*) as cnt from df group by sex")
scala>
scala> val wrres = res.map(x => x(0)+","+x(1)+","+x(2))
scala> wrres.saveAsTextFile("/user/cloudera/mytemp")
scala> hq.sql("create database park")
scala> hq.sql("use park")
scala> hq.sql("create table urres(sex string,
tot int, cnt int)
row format delimited
fields terminated by ',' ")
scala> hq.sql("load data inpath
'/user/cloudera/mytemp/part-00000' into table
urres ")
scala> val hiveres = hq.sql("select * from
urres")
scala> hiveres.show()
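note: if the whole example is run through hq (the HiveContext) instead of sqlContext, the detour via saveAsTextFile + LOAD DATA can be skipped and the result written straight into the metastore -- a sketch, urres2 is just an illustrative table name:

// assumes df1/df2/df/res were built via hq rather than sqlContext
hq.sql("use park")
res.write.mode("overwrite").saveAsTable("urres2")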
_____________________________________