Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Monday, 8 May 2017

Spark : Joins


[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp spLab/e
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal dept spLab/d
[cloudera@quickstart ~]$ hadoop fs -cat spLab/e
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
109,jj,10000,m,14
110,kkk,20000,f,15
111,dddd,30000,m,15
[cloudera@quickstart ~]$ hadoop fs -cat spLab/d
11,marketing,hyd
12,hr,del
13,fin,del
21,admin,hyd
22,production,del
[cloudera@quickstart ~]$
val emp = sc.textFile("/user/cloudera/spLab/e")
val dept = sc.textFile("/user/cloudera/spLab/d")
val epair = emp.map{x =>
               val w = x.split(",")
                val dno = w(4).toInt
                val sal = w(2).toInt
                (dno, sal)
             }
 epair.collect.foreach(println)
(11,40000)
(12,50000)
(12,50000)
(13,90000)
(12,10000)
(12,40000)
(13,80000)
(11,50000)
(14,10000)
val dpair = dept.map{ x =>
            val w = x.split(",")
            val dno = w(0).toInt
             val loc = w(2)
            (dno, loc)
            }
scala> dpair.collect.foreach(println)
(11,hyd)
(12,del)
(13,del)
(21,hyd)
(22,del)
-- inner join
val ij = epair.join(dpair)
ij.collect.foreach(println)
 ij.collect.foreach(println)
(13,(90000,del))
(13,(80000,del))
(11,(40000,hyd))
(11,(50000,hyd))
(12,(50000,del))
(12,(50000,del))
(12,(10000,del))
(12,(40000,del))
-- left outer join
val lj = epair.leftOuterJoin(dpair)
lj.collect.foreach(println)
scala> lj.collect.foreach(println)
(13,(90000,Some(del)))
(13,(80000,Some(del)))
(15,(20000,None))
(15,(30000,None))
(11,(40000,Some(hyd)))
(11,(50000,Some(hyd)))
(14,(10000,None))
(12,(50000,Some(del)))
(12,(50000,Some(del)))
(12,(10000,Some(del)))
(12,(40000,Some(del)))
-- right outer join
val rj = epair.rightOuterJoin(dpair)
rj.collect.foreach(println)
(13,(Some(90000),del))
(13,(Some(80000),del))
(21,(None,hyd))
(22,(None,del))
(11,(Some(40000),hyd))
(11,(Some(50000),hyd))
(12,(Some(50000),del))
(12,(Some(50000),del))
(12,(Some(10000),del))
(12,(Some(40000),del))
-- full outer join
val fj = epair.fullOuterJoin(dpair)
fj.collect.foreach(println)
(13,(Some(90000),Some(del)))
(13,(Some(80000),Some(del)))
(15,(Some(20000),None))
(15,(Some(30000),None))
(21,(None,Some(hyd)))
(22,(None,Some(del)))
(11,(Some(40000),Some(hyd)))
(11,(Some(50000),Some(hyd)))
(14,(Some(10000),None))
(12,(Some(50000),Some(del)))
(12,(Some(50000),Some(del)))
(12,(Some(10000),Some(del)))
(12,(Some(40000),Some(del)))

location based aggregations:
val locSal = fj.map{ x =>
       val sal = x._2._1
       val loc = x._2._2
       val s = if(sal==None) 0 else sal.get
   val l = if(loc==None) "NoCity" else loc.get
       (l, s)
 }
locSal.collect.foreach(println)
(del,90000)
(del,80000)
(NoCity,20000)
(NoCity,30000)
(hyd,0)
(del,0)
(hyd,40000)
(hyd,50000)
(NoCity,10000)
(del,50000)
(del,50000)
(del,10000)
(del,40000)
val locSummary = locSal.reduceByKey(_+_)
locSummary.collect.foreach(println)
scala> locSummary.collect.foreach(println)
(hyd,90000)
(del,320000)
(NoCity,60000)
-----------------
val stats = fj.map{ x =>
       val sal = x._2._1
       val loc = x._2._2
val stat = if(sal!=None & loc!=None)  "Working" else
     if(sal==None) "BenchProj" else "BenchTeam"
       val s = if(sal==None) 0 else sal.get
      (stat, s)
 }
stats.collect.foreach(println)
(Working,90000)
(Working,80000)
(BenchTeam,20000)
(BenchTeam,30000)
(BenchProj,0)
(BenchProj,0)
(Working,40000)
(Working,50000)
(BenchTeam,10000)
(Working,50000)
(Working,50000)
(Working,10000)
(Working,40000)
val res = stats.reduceByKey(_+_)
res.collect.foreach(println)
(BenchTeam,60000)
(Working,410000)
(BenchProj,0)

     
















4 comments: