Data science Software Course Training in Ameerpet Hyderabad

Saturday, 8 April 2017

Spark SQL with JSON and XML processing


   Spark SQL

     Spark SQL is a library
   for processing Spark data objects
   with SQL select statements.

 Spark SQL follows a MySQL-like SQL syntax.
 Spark SQL provides
   two types of contexts:
   i) SQLContext
  ii) HiveContext

import org.apache.spark.sql.SQLContext

    val sqlCon = new SQLContext(sc)

 Using SQLContext,
   we can process Spark objects using select statements.

 Using HiveContext,
   we can integrate Hive with Spark.
 Hive is the data warehouse environment of the Hadoop framework,
   so all the data is stored and managed in Hive tables.
 Using HiveContext we can access the entire Hive environment (Hive tables) from Spark.

  Difference between an hql statement run from Hive
  and an hql statement run from Spark:
 --> if hql is executed from the Hive environment,
  the statement is converted
  into a MapReduce job.
 --> if the same Hive is integrated with Spark,
  and the hql is submitted from Spark,
    it uses the DAG and in-memory computing models,
  which are usually faster than MapReduce.

 import org.apache.spark.sql.hive.HiveContext

 val hc = new HiveContext(sc)
 Example of SQLContext.

 val sqc = new SQLContext(sc)
 import sqc.implicits._   // needed for rdd.toDF

  file name --> file1
  sample --->  100,200,300

 step1) create case class for the data.
   case class  Rec(a:Int, b:Int, c:Int)

 step2) create a function
   to convert a raw line into a case object.
  [function to provide schema]

 def makeRec(line:String)={
        val w = line.split(",")
        val a = w(0).toInt
        val b = w(1).toInt
        val c = w(2).toInt
        val r = Rec(a, b,c)
        r
 }

step3) load data.
   val data = sc.textFile("/user/cloudera/sparklab/file1")


step4) transform each record into case Object

  val recs = data.map(x => makeRec(x))
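
  makeRec throws an exception when a line has too few fields or a non-numeric value, and one bad line is enough to fail the job. A minimal sketch of a more defensive parser in plain Scala follows. The names SafeParseSketch and parseRec are illustrative assumptions, not part of the notes.

```scala
import scala.util.Try

object SafeParseSketch {
  // Same shape as the Rec case class used in the notes.
  case class Rec(a: Int, b: Int, c: Int)

  // Hypothetical helper: returns None instead of throwing on a malformed line.
  def parseRec(line: String): Option[Rec] = {
    val w = line.split(",").map(_.trim)
    if (w.length != 3) None                                   // wrong number of fields
    else Try(Rec(w(0).toInt, w(1).toInt, w(2).toInt)).toOption // non-numeric field -> None
  }
}
```

  With an RDD, something like data.flatMap(x => SafeParseSketch.parseRec(x)) would keep only the lines that parse.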

step5) convert rdd into data frame.
  val df = recs.toDF

step6)  create table instance for the dataframe.

  df.registerTempTable("samp")

step7) apply select statement of sql on temp table.

val  r1 = sqc.sql("select a+b+c as tot from samp")

r1.registerTempTable("samp1")

val r2 =    sqc.sql("select sum(tot) as gtot from samp1")

   Once a "select" statement is applied on a
 temp table, the returned object is a dataframe.

 To apply sql on processed results,
    we need to register the dataframe again
  as a temp table.

 r1.registerTempTable("samp2")
 val r3 = sqc.sql("""select * from samp2
                      where tot>=200""")



 Steps involved in Spark SQL [SQLContext]
   monthly sales report...
schema ---> date, price, qnt, discount

    case class Sales(mon : Int, price:Int,
             qnt :Int, disc: Int)

    def toSales(line: String) = {
       val w = line.split(",")
       val mon = w(0).split("/")(0).toInt   // first "/"-separated part of the date is taken as the month
       val p = w(1).toInt
       val q = w(2).toInt
       val d = w(3).toInt
       val srec = Sales(mon,p,q,d)
       srec
    }

 val data = sc.textFile("/user/cloudera/mydata/sales.txt")
  val strans = data.map(x => toSales(x))

  val sdf = strans.toDF
  sdf.registerTempTable("SalesTrans")


step7) // play with select
  ---> mon, price, qnt, disc

 val res1 = sqlContext.sql("""select mon ,
             sum((price -  price*disc/100)*qnt
                ) as tsales from SalesTrans
              group by mon""")
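
 To see what the monthly query computes, here is a small sketch of the same arithmetic on plain Scala collections: the discounted unit price times the quantity, summed per month. The sample rows and the names MonthlySalesSketch, net and monthlyTotals are made up for illustration. Double arithmetic is used so that price*disc/100 is not truncated the way Int division in Scala would be.

```scala
object MonthlySalesSketch {
  case class Sales(mon: Int, price: Int, qnt: Int, disc: Int)

  // Net amount of one row: unit price after a percentage discount, times quantity.
  def net(s: Sales): Double = (s.price - s.price * s.disc / 100.0) * s.qnt

  // Counterpart of "group by mon" with a sum: month -> total net sales.
  def monthlyTotals(rows: Seq[Sales]): Map[Int, Double] =
    rows.groupBy(_.mon).map { case (m, rs) => m -> rs.map(net).sum }

  // Made-up rows, for illustration only.
  val sample: Seq[Sales] = Seq(
    Sales(1, 100, 2, 10), // (100 - 10) * 2 = 180
    Sales(1, 200, 1, 0),  // (200 - 0)  * 1 = 200
    Sales(2, 50, 4, 50)   // (50 - 25)  * 4 = 100
  )
}
```

 For these rows, monthlyTotals(sample) gives 380.0 for month 1 and 100.0 for month 2.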


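 res1.registerTempTable("tab1")

 val res2 = res1
 res2.registerTempTable("tab2")

 // l is a month, r is the month that follows it
 val res3 = sqlContext.sql("""select l.mon as m1,
     r.mon as m2, l.tsales as t1,
         r.tsales as t2
    from tab1 l join tab2 r
          where (r.mon-l.mon)=1""")
//  11 rows.
 res3.registerTempTable("tab3")

 val res4 = sqlContext.sql("""select
    m1, m2, t1, t2, ((t2-t1)*100)/t1 as sgrowth
    from tab3""")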
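
 The month-over-month growth logic can be checked on plain Scala collections. This sketch assumes each month m1 is paired with the following month m2 = m1 + 1, and that growth is measured relative to the earlier month. GrowthSketch and the sample totals in the usage note are illustrative, not from the notes.

```scala
object GrowthSketch {
  // Input: (month, total sales). Output: (m1, m2, t1, t2, growth %) for consecutive months.
  def growth(totals: Seq[(Int, Double)]): Seq[(Int, Int, Double, Double, Double)] = {
    val byMonth = totals.toMap
    totals.sortBy(_._1).flatMap { case (m1, t1) =>
      // The lookup of month m1 + 1 plays the role of the self-join condition.
      byMonth.get(m1 + 1)
        .map(t2 => (m1, m1 + 1, t1, t2, (t2 - t1) * 100 / t1))
        .toList
    }
  }
}
```

 For example, totals of 200.0, 250.0 and 200.0 for months 1 to 3 give a growth of 25.0 from month 1 to 2 and -20.0 from month 2 to 3. Twelve months of input yield 11 pairs.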

 json data processing with spark sql.

    val df = sqlContext.read.json("/user/cloudera/mydata/json1.json")
  name    age    sex    city
  ravi    25     null   hyd
  rani    null   F      del


 val df2 = sqlContext.read.json("/../json2.json")

name     age      wife                        city
Ravi     25       {"name":"rani","age":23}    HYd


 df2.registerTempTable("Info")

val df3 = sqlContext.sql("""select name,
          wife.name as wname,
           age, wife.age as wage,
          abs(age-wife.age) as diff,
          city from Info""")

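
 The query above flattens a nested record: the fields of wife are pulled up next to the top-level fields. A small sketch of the same flattening with plain Scala case classes, using the Ravi row shown earlier. The names NestedJsonSketch, Spouse, Person, Flat and flatten are illustrative assumptions.

```scala
object NestedJsonSketch {
  case class Spouse(name: String, age: Int)
  case class Person(name: String, age: Int, wife: Spouse, city: String)
  case class Flat(name: String, wname: String, age: Int, wage: Int, diff: Int, city: String)

  // Nested fields are reached with dot notation, like wife.age in the select.
  def flatten(p: Person): Flat =
    Flat(p.name, p.wife.name, p.age, p.wife.age, math.abs(p.age - p.wife.age), p.city)

  // The row shown in the notes.
  val ravi: Person = Person("Ravi", 25, Spouse("rani", 23), "HYd")
}
```

 For the Ravi row, diff comes out as 2.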
 xml data processing with spark sql.

--- Spark SQL does not have built-in libraries
  for xml processing.
   two ways:
     i) 3rd party api [ex: databricks]
    ii) using Hive integration.

 The 2nd is preferred, and is used below.

How to integrate Hive with Spark.
 --- Using HiveContext.

  step1) copy the hive-site.xml file into the
     /usr/lib/spark/conf directory.

What if hive-site.xml is not copied into the
    conf directory of Spark?
--- Spark cannot find
  Hive's metastore location [derby/mysql/oracle ....];
    this info is available in hive-site.xml.


  step2) create the HiveContext object

import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)

  step3)  access Hive Environment from spark

 hc.sql("create database mydb")
 hc.sql("use mydb")
 hc.sql("create table samp(line string)")
 hc.sql("""load data local inpath 'file1'
         into table samp""")
 val df = hc.sql("select * from samp")

 hc.sql("create table raw(line string)")
 hc.sql("""load data local inpath 'xml1.xml'
   into table raw""")
 hc.sql("""create table info(name string,
          age int, sex string)""")
 hc.sql("""insert overwrite table info
    select xpath_string(line,'rec/name'),
       xpath_int(line, 'rec/age'),
      xpath_string(line, 'rec/sex')
      from raw""")
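
 To make the xpath calls concrete, here is a sketch of the same extraction in plain Scala using the JDK's javax.xml.xpath API. The sample line is an assumption about what one record of xml1.xml might look like (the file's contents are not shown in these notes). The names XPathSketch, xpathString and toInfo are illustrative. The fallback to 0 for a missing or non-numeric age is meant to resemble how Hive's xpath_int behaves.

```scala
import java.io.StringReader
import javax.xml.xpath.XPathFactory
import org.xml.sax.InputSource
import scala.util.Try

object XPathSketch {
  case class Info(name: String, age: Int, sex: String)

  // Evaluates an XPath expression against one XML record and returns its text value
  // (an empty string when nothing matches).
  private def xpathString(xml: String, expr: String): String =
    XPathFactory.newInstance().newXPath()
      .evaluate(expr, new InputSource(new StringReader(xml)))

  // One raw line -> one Info row, using the same paths as the insert statement.
  def toInfo(line: String): Info = Info(
    xpathString(line, "rec/name"),
    Try(xpathString(line, "rec/age").trim.toInt).getOrElse(0), // 0 if missing or non-numeric
    xpathString(line, "rec/sex"))

  // Hypothetical record, for illustration only.
  val sampleLine: String = "<rec><name>Ravi</name><age>25</age><sex>M</sex></rec>"
}
```

 Each line of the raw table has to hold one complete XML record for this to work, since every line is parsed on its own.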

hc.sql("create table xraw(line string)")
hc.sql("""load data local inpath 'xml2.xml'
   into table xraw""")
hc.sql("""create table xinfo(fname string ,
   lname string, age int,
    personal_email string,
    official_email string,
    mobile string,
    office_phone string ,
    residence_phone string,
    city string)""")

hc.sql("""insert overwrite table xinfo
  select ...
  from xraw""")

hc.sql("create table sraw(line string)")
hc.sql("""load data local inpath 'xml3.xml'
    into table sraw""")
hc.sql("create table raw2(cid int, pr array<string>)")

hc.sql("""insert overwrite table raw2
   select xpath_int(line, 'tr/cid'),
     ...
     from sraw""")
hc.sql("select * from raw2").show
cid       pr
101       [100,300,300]
102       [400,800]
101       [1000]

hc.sql("select explode(pr) as price from  raw2").show


hc.sql("select cid, explode(pr) as price from  raw2").show

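----> the above is not valid hql in Hive: explode cannot be selected together with other columns. Use lateral view instead.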

hc.sql("create table raw3(cid int, pr int)")
hc.sql("""insert overwrite table raw3
    select cid, mypr from raw2
      lateral view explode(pr) p as mypr""")

hc.sql("select * from raw3").show

cid     pr
101     200
101     300
101     300
102     400
102     800
101     1000

hc.sql("create table summary(cid int, totbill bigint)")

hc.sql("""insert overwrite table summary
   select cid , sum(pr) from raw3
   group by cid""")
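
The lateral view explode step followed by the group by can be pictured with plain Scala collections: explode behaves like a flatMap that repeats cid for every array element, and the summary is a per-key sum. This sketch uses the raw2 rows as printed in the raw2 listing. The names ExplodeSketch, explode and summary are illustrative assumptions.

```scala
object ExplodeSketch {
  // Rows of raw2 as printed in the raw2 listing: (cid, pr).
  val raw2: Seq[(Int, Seq[Int])] = Seq(
    (101, Seq(100, 300, 300)),
    (102, Seq(400, 800)),
    (101, Seq(1000)))

  // Counterpart of lateral view explode(pr): one output row per array element.
  def explode(rows: Seq[(Int, Seq[Int])]): Seq[(Int, Int)] =
    rows.flatMap { case (cid, prs) => prs.map(p => (cid, p)) }

  // Counterpart of "select cid, sum(pr) ... group by cid".
  def summary(rows: Seq[(Int, Int)]): Map[Int, Long] =
    rows.groupBy(_._1).map { case (cid, rs) => cid -> rs.map(_._2.toLong).sum }
}
```

For these rows, explode gives six (cid, price) pairs, and summary gives 1700 for cid 101 and 1200 for cid 102.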













