scala> val l = List(10,20,30,40,50,50,56,67)
l: List[Int] = List(10, 20, 30, 40, 50, 50, 56, 67)
scala> val r2 = r.collect.reverse.take(3)
r2: Array[Int] = Array(67, 56, 50)
scala> val r2 = sc.parallelize(r.collect.reverse.take(3))
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:31
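Note: r is assumed here to be the RDD built earlier from the list l (for example, val r = sc.parallelize(l)). A small alternative sketch that avoids collecting the whole RDD: since the values are already in ascending order, top(3) returns the three largest elements, which are the same three values.

val top3 = r.top(3)              // Array(67, 56, 50), computed without a full collect
val r2 = sc.parallelize(top3)    // re-distribute only if an RDD is needed again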
-------------------------------
Handling CSV files [ first line is the header ]
[cloudera@quickstart ~]$ gedit prods
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal prods spLab
scala> val raw = sc.textFile("/user/cloudera/spLab/prods")
raw: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/prods MapPartitionsRDD[11] at textFile at <console>:27
scala> raw.collect.foreach(println)
"pid","name","price"
p1,Tv,50000
p2,Lap,70000
p3,Ipod,8000
p4,Mobile,9000
scala> raw.count
res18: Long = 5
raw.count is 5 because the header line is counted too. To eliminate the first element, slice can be used.
scala> l
res19: List[Int] = List(10, 20, 30, 40, 50, 50, 56, 67)
scala> l.slice(2,5)
res20: List[Int] = List(30, 40, 50)
scala> l.slice(1,l.size)
res21: List[Int] = List(20, 30, 40, 50, 50, 56, 67)
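For removing only the first element, drop(1) gives the same result as slice(1, l.size) (a minimal sketch):

val rest = l.drop(1)    // List(20, 30, 40, 50, 50, 56, 67)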
Way1:
------
scala> raw.collect
res29: Array[String] = Array("pid","name","price", p1,Tv,50000, p2,Lap,70000, p3,Ipod,8000, p4,Mobile,9000)
scala> val data = sc.parallelize(raw.collect.slice(1,raw.collect.size))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:29
scala> data.collect.foreach(println)
p1,Tv,50000
p2,Lap,70000
p3,Ipod,8000
p4,Mobile,9000
scala>
Here slice is not available on an RDD,
so the data has to be collected to the local client first, and slice applied there.
If the RDD volume is large, the client cannot collect it and the flow will fail.
Way2:
------
val data = raw.filter(line => !line.contains("pid"))
data.persist
-- advantage: no need to collect data into the client [local]
-- disadvantage: to eliminate 1 row, all rows are scanned.
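One more option, not from the session above, just a sketch: assuming the header sits at the start of the first partition (which is the case when textFile reads this small single file), mapPartitionsWithIndex can drop the header from partition 0 only, so the contains("pid") test is not applied to every row.

val data = raw.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter   // skip the first line of partition 0, keep all other partitions as-is
}
data.persist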
-----------------------------------------