Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Tuesday, 30 August 2016

spark lab1 : Spark Aggregations : map, flatMap, sc.textFile(), reduceByKey(), groupByKey()

spark Lab1:
___________
[cloudera@quickstart ~]$ cat > comment
i love hadoop
i love spark
i love hadoop and spark
[cloudera@quickstart ~]$ hadoop fs -mkdir spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal comment spark

Word Count using spark:

scala> val r1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/spark/comment")

scala> r1.collect.foreach(println)

scala> val r2 = r1.map(x => x.split(" "))

scala> val r3 = r2.flatMap(x => x)

instead of writing r2 and r3.

scala> val words  = r1.flatMap(x =>
     |    x.split(" ") )

scala> val wpair = words.map( x =>
     |    (x,1) )

scala> val wc = wpair.reduceByKey((x,y) => x+y)

scala> wc.collect

scala> val wcres = wc.map( x =>
     |     x._1+","+x._2 )

scala> wcres.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark/results2")


[cloudera@quickstart ~]$ cat emp
101,aa,20000,m,11
102,bb,30000,f,12
103,cc,40000,m,11
104,ddd,50000,f,12
105,ee,60000,m,12
106,dd,90000,f,11
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp spark
[cloudera@quickstart ~]$

scala> val e1 = sc.textFile("/user/cloudera/spark/emp")

scala> val e2 = e1.map(_.split(","))

scala> val epair = e2.map( x=>
     |   (x(3), x(2).toInt ) )

scala> val res = epair.reduceByKey(_+_)
res: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:24

scala> res.collect.foreach(println)
(f,170000)
(m,120000)

scala> val resmax = epair.reduceByKey(
     |    (x,y) => Math.max(x,y))


scala> val resmin = epair.reduceByKey(Math.min(_,_))

scala> resmax.collect.foreach(println)
(f,90000)
(m,60000)

scala> resmin.collect.foreach(println)
(f,30000)
(m,20000)

scala> val grpd = epair.groupByKey()



scala> val resall = grpd.map(x =>
     |  (x._1, x._2.sum,x._2.size,x._2.max,x._2.min,x._2.sum/x._2.size) )
scala> resall.collect.foreach(println)










4 comments: