-----------
Spark SQL
---------------
Spark SQL is a library
for processing Spark data objects
using SQL select statements.
Spark SQL syntax is close to MySQL-style SQL; through HiveContext it also accepts HiveQL.
==============================================
Spark SQL provides
two types of contexts:
i) SQLContext
ii) HiveContext
import org.apache.spark.sql.SQLContext
val sqlCon = new SQLContext(sc)
Using SQLContext,
we can process Spark objects using select statements.
Using HiveContext,
we can integrate Hive with Spark.
Hive is the data warehouse environment of the Hadoop framework,
so the data is stored and managed in Hive tables.
Using HiveContext, we can access the entire Hive environment (Hive tables) from Spark.
Difference between an HQL statement run from Hive
and an HQL statement run from Spark:
--> if the HQL is executed from the Hive environment,
the statement is converted
into a MapReduce job.
---> if the same Hive is integrated with Spark
and the HQL is submitted from Spark,
it uses Spark's DAG and in-memory computing models,
which are typically faster than MapReduce.
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
-----------------------------
Example of SQLContext.
val sqc = new SQLContext(sc)
file name --> file1
sample ---> 100,200,300
300,400,400
:
:
step1)
create a case class for the data.
case class Rec(a: Int, b: Int, c: Int)
step2) create a function
to convert a raw line into a case object.
[the function provides the schema]
def makeRec(line: String) = {
  val w = line.split(",")
  val a = w(0).toInt
  val b = w(1).toInt
  val c = w(2).toInt
  val r = Rec(a, b, c)
  r
}
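makeRec throws on a malformed line (a missing field or a non-numeric value), and inside a Spark job that failure can abort the whole stage. Below is a minimal sketch of a more forgiving variant, assuming that bad lines should simply be skipped. The names SafeParse and parseRec are hypothetical and not part of the original notes.

```scala
import scala.util.Try

object SafeParse {
  case class Rec(a: Int, b: Int, c: Int)

  // Returns Some(Rec) for a well-formed "a,b,c" line, None otherwise.
  def parseRec(line: String): Option[Rec] = {
    val w = line.split(",").map(_.trim)
    if (w.length != 3) None
    else Try(Rec(w(0).toInt, w(1).toInt, w(2).toInt)).toOption
  }
}
```

With an RDD of lines, something like data.flatMap(x => SafeParse.parseRec(x)) should keep only the records that parsed cleanly.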
--------
step3) load the data.
val data = sc.textFile("/user/cloudera/sparklab/file1")
100,200,300
300,400,400
:
:
step4) transform each record into a case object.
val recs = data.map(x => makeRec(x))
step5) convert the RDD into a DataFrame.
import sqc.implicits._   // brings toDF into scope for this context
val df = recs.toDF
step6) register the DataFrame as a temp table.
df.registerTempTable("samp")
step7) apply a SQL select statement on the temp table.
val r1 = sqc.sql("select a+b+c as tot from samp")
r1.show
------
tot
----
600
1100
r1.registerTempTable("samp1")
val r2 = sqc.sql("select sum(tot) as gtot from samp1")
Once a select statement is applied on a
temp table, the returned object is a DataFrame.
To apply SQL on the processed results,
we need to register that DataFrame
as a temp table again.
r1.registerTempTable("samp2")
val r3 = sqc.sql("select * from samp2 where tot>=200")
-----------------------------------
sales
--------------------
:
12/27/2016,10000,3,10
:
:
-------------------------
Steps involved in Spark SQL [SQLContext]
----------------------------
monthly sales report...
schema ---> date, price, qnt, discount
step1)
case class Sales(mon: Int, price: Int,
                 qnt: Int, disc: Int)
step2)
def toSales(line: String) = {
  val w = line.split(",")
  val mon = w(0).split("/")(0).toInt   // month part of the MM/dd/yyyy date
  val p = w(1).toInt
  val q = w(2).toInt
  val d = w(3).toInt
  val srec = Sales(mon, p, q, d)
  srec
}
step3)
val data = sc.textFile("/user/cloudera/mydata/sales.txt")
step4)
val strans = data.map(x => toSales(x))
step5)
val sdf = strans.toDF
sdf.show
step6)
sdf.registerTempTable("SalesTrans")
step7) // play with select
---> mon, price, qnt, disc
val res1 = sqlContext.sql("""select mon,
    sum((price - price*disc/100)*qnt) as tsales
  from SalesTrans
  group by mon""")
res1.show
res1.printSchema
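To check the arithmetic of the query by hand, the same calculation can be sketched with plain Scala collections, without Spark. The names SalesCheck, net and monthly are hypothetical. The sketch divides by 100.0 because Int / Int truncates in plain Scala, and it assumes that a fractional discount amount is what the report intends.

```scala
object SalesCheck {
  case class Sales(mon: Int, price: Int, qnt: Int, disc: Int)

  // Net amount for one transaction: unit price less the percentage discount, times quantity.
  def net(s: Sales): Double = (s.price - s.price * s.disc / 100.0) * s.qnt

  // Total net sales per month, mirroring "group by mon" with sum(...).
  def monthly(rows: Seq[Sales]): Map[Int, Double] =
    rows.groupBy(_.mon).map { case (m, rs) => (m, rs.map(net).sum) }
}
```

For the sample line 12/27/2016,10000,3,10 the discount is 1000 per unit, so the net amount is (10000 - 1000) * 3 = 27000.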
-----------------------------------------
val res2 = res1
res1.registerTempTable("tab1")
res2.registerTempTable("tab2")
val res3 = sqlContext.sql("""select l.mon as m1,
  r.mon as m2, l.tsales as t1,
  r.tsales as t2
  from tab1 l join tab2 r
  on (r.mon - l.mon) = 1""")
// 11 rows for 12 months of data: each month m1 paired with the following month m2.
res3.registerTempTable("tab3")
------------------------------
val res4 = sqlContext.sql("""select
  m1, m2, t1, t2, ((t2-t1)*100)/t1 as sgrowth
  from tab3""")
res4.show()
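The growth column can be sanity-checked outside Spark. Below is a minimal sketch, assuming t1 is the earlier month's total and t2 the following month's total. The names Growth, pct and monthOverMonth are hypothetical.

```scala
object Growth {
  // Percentage change from the earlier total t1 to the later total t2.
  def pct(t1: Double, t2: Double): Double = (t2 - t1) * 100 / t1

  // Pair each month with the following month, as the self-join does,
  // and skip months whose successor has no data.
  def monthOverMonth(totals: Map[Int, Double]): Seq[(Int, Int, Double)] =
    totals.keys.toSeq.sorted.flatMap { m =>
      totals.get(m + 1).map(next => (m, m + 1, pct(totals(m), next)))
    }
}
```

For example, totals of 200 in one month and 250 in the next give a growth of (250 - 200) * 100 / 200 = 25 percent.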
------------------------------------------
json1.json
--------------------------
{"name":"Ravi","age":25,"city":"Hyd"}
{"name":"Rani","sex":"F","city":"Del"}
:
:
---------------------------------------
val df = sqlContext.read.json("/user/cloudera/mydata/json1.json")
df.show
------------------------
name   age    sex    city
----------------------------------------
Ravi   25     null   Hyd
Rani   null   F      Del
:
:
--------------------------------------
json2.json
------------------
{"name":"Ravi","age":25,
"wife":{"name":"Rani","age":23},"city":"Hyd"}
:
:
val df2 = sqlContext.read.json("/../json2.json")
df2.show
-----------------------
name   age   wife                        city
Ravi   25    {"name":"Rani","age":23}    Hyd
:
---------------------
df2.registerTempTable("Info")
val df3 = sqlContext.sql("""select name,
  wife.name as wname,
  age, wife.age as wage,
  abs(age-wife.age) as diff,
  city from Info""")
----------------------------------------
XML data processing with Spark SQL.
--- Spark SQL does not have built-in libraries
for XML processing.
Two ways:
i) a 3rd-party API [e.g. the Databricks spark-xml package]
ii) using Hive integration.
The 2nd is the approach preferred here.
How to integrate Hive with Spark.
--- Using HiveContext.
step1)
copy the hive-site.xml file into the
/usr/lib/spark/conf directory.
What if hive-site.xml is not copied into
the conf directory of Spark?
--- Spark cannot find
Hive's metastore location [derby/mysql/oracle ....];
this information is available in hive-site.xml.
step2)
create a HiveContext object
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
step3) access Hive Environment from spark
hc.sql("create database mydb")
hc.sql("use mydb")
hc.sql("create table samp(line string)")
hc.sql("""load data local inpath 'file1'
  into table samp""")
val df = hc.sql("select * from samp")
-------------------------------------
xml1.xml
----------------------------------
<rec><name>Ravi</name><age>25</age></rec>
<rec><name>Rani</name><sex>F</sex></rec>
:
:
------------------------------------------
hc.sql("create table raw(line string)")
hc.sql("""load data local inpath 'xml1.xml'
  into table raw""")
hc.sql("""create table info(name string,
  age int, sex string)""")
hc.sql("""insert overwrite table info
  select xpath_string(line, 'rec/name'),
    xpath_int(line, 'rec/age'),
    xpath_string(line, 'rec/sex')
  from raw""")
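To see what a path such as 'rec/name' selects from one record, the JDK's own XPath API can be used from Scala. This is only an illustrative sketch: XPathSketch and xpathString are hypothetical names, and the code is not Hive's implementation of xpath_string.

```scala
import java.io.ByteArrayInputStream
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.xpath.XPathFactory

object XPathSketch {
  // Evaluate an XPath expression against one XML record and return its text value.
  // With the JDK API, an expression that matches nothing yields the empty string.
  def xpathString(xml: String, path: String): String = {
    val doc = DocumentBuilderFactory.newInstance()
      .newDocumentBuilder()
      .parse(new ByteArrayInputStream(xml.getBytes("UTF-8")))
    XPathFactory.newInstance().newXPath().evaluate(path, doc)
  }
}
```

For the first sample record, 'rec/name' gives Ravi, while 'rec/sex' matches nothing and gives an empty string. Whether Hive stores an empty string or a null for such missing tags is worth checking in the info table after the insert.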
----------------------------------------
xml2.xml
------------
<rec><name><fname>Ravi</fname><lname>kumar</lname></name><age>24</age><contact><email><personal>ravi@gmail.com</personal><official>ravi@ventech.com</official></email><phone><mobile>12345</mobile><office>123900</office><residence>127845</residence></phone></contact><city>Hyd</city></rec>
hc.sql("create table xraw(line string)")
hc.sql("""load data local inpath 'xml2.xml'
  into table xraw""")
hc.sql("""create table xinfo(fname string,
  lname string, age int,
  personal_email string,
  official_email string,
  mobile string,
  office_phone string,
  residence_phone string,
  city string)""")
hc.sql("""insert overwrite table xinfo
  select
    xpath_string(line, 'rec/name/fname'),
    xpath_string(line, 'rec/name/lname'),
    xpath_int(line, 'rec/age'),
    xpath_string(line, 'rec/contact/email/personal'),
    xpath_string(line, 'rec/contact/email/official'),
    xpath_string(line, 'rec/contact/phone/mobile'),
    xpath_string(line, 'rec/contact/phone/office'),
    xpath_string(line, 'rec/contact/phone/residence'),
    xpath_string(line, 'rec/city')
  from xraw""")
-------------------------
xml3.xml
----------------
<tr><cid>101</cid><pr>200</pr><pr>300</pr><pr>300</pr></tr>
<tr><cid>102</cid><pr>400</pr><pr>800</pr></tr>
<tr><cid>101</cid><pr>1000</pr></tr>
--------------------------------
hc.sql("create table sraw(line string)")
hc.sql("""load data local inpath 'xml3.xml'
  into table sraw""")
hc.sql("create table raw2(cid int, pr array<string>)")
hc.sql("""insert overwrite table raw2
  select xpath_int(line, 'tr/cid'),
    xpath(line, 'tr/pr/text()')
  from sraw""")
hc.sql("select * from raw2").show
-------------------------------
cid pr
101 [200,300,300]
102 [400,800]
101 [1000]
hc.sql("select explode(pr) as price from raw2").show
200
300
300
400
800
1000
hc.sql("select cid, explode(pr) as price from raw2").show
----> the above is invalid in HiveQL: explode cannot be combined with other columns in the select list. Use a lateral view instead.
hc.sql("create table raw3(cid int, pr int)")
hc.sql("""insert overwrite table raw3
  select cid, mypr from raw2
  lateral view explode(pr) p as mypr""")
hc.sql("select * from raw3").show
cid pr
101 200
101 300
101 300
102 400
102 800
101 1000
hc.sql("create table summary(cid int, totbill bigint)")
hc.sql("""insert overwrite table summary
  select cid, sum(pr) from raw3
  group by cid""")
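The explode-then-aggregate steps above can be sketched with plain Scala collections, which makes the expected contents of raw3 and summary easy to verify by hand. The object name ExplodeSketch is hypothetical, and the data is typed in from the xml3.xml sample rather than read from Hive.

```scala
object ExplodeSketch {
  // One (cid, prices) pair per <tr> record, as in table raw2.
  val raw2: Seq[(Int, Seq[Int])] = Seq(
    (101, Seq(200, 300, 300)),
    (102, Seq(400, 800)),
    (101, Seq(1000))
  )

  // Counterpart of "lateral view explode": one (cid, price) row per array element.
  val raw3: Seq[(Int, Int)] =
    raw2.flatMap { case (cid, prs) => prs.map(p => (cid, p)) }

  // Counterpart of "group by cid" with sum(pr).
  val summary: Map[Int, Long] =
    raw3.groupBy(_._1).map { case (cid, rows) => (cid, rows.map(_._2.toLong).sum) }
}
```

For the sample data, customer 101 totals 200 + 300 + 300 + 1000 = 1800 and customer 102 totals 400 + 800 = 1200.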
--------------------