Data science Software Course Training in Ameerpet Hyderabad


Wednesday 3 May 2017

Spark: Handling CSV Files and Removing Headers

scala> val l = List(10, 20, 30, 40, 50, 50, 56, 67)

scala> val r = sc.parallelize(l)

scala> val r2 = r.collect.reverse.take(3)
r2: Array[Int] = Array(67, 56, 50)

scala> val r2 = sc.parallelize(r.collect.reverse.take(3))
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:31
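
Note: the RDD API also provides top(n), which computes the largest n elements on the cluster (in descending order), so the collect-and-reverse round trip above can be avoided. A minimal sketch:

scala> val top3 = r.top(3)            // Array(67, 56, 50), computed cluster-side
scala> val r2 = sc.parallelize(top3)  // back to an RDD if needed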


-------------------------------
Handling CSV files [ the first line is the header ]

[cloudera@quickstart ~]$ gedit prods
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal prods spLab

scala> val raw  = sc.textFile("/user/cloudera/spLab/prods")
raw: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/prods MapPartitionsRDD[11] at textFile at <console>:27

scala> raw.collect.foreach(println)
"pid","name","price"
p1,Tv,50000
p2,Lap,70000
p3,Ipod,8000
p4,Mobile,9000

scala> raw.count
res18: Long = 5



The count is 5 because the header line is included. To eliminate the first element, slice is used:

 scala> l
res19: List[Int] = List(10, 20, 30, 40, 50, 50, 56, 67)

scala> l.slice(2,5)
res20: List[Int] = List(30, 40, 50)

scala> l.slice(1,l.size)
res21: List[Int] = List(20, 30, 40, 50, 50, 56, 67)
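
drop(1) gives the same result as slice(1, l.size) without needing the size:

scala> l.drop(1)   // List(20, 30, 40, 50, 50, 56, 67)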

Way 1:

scala> raw.collect
res29: Array[String] = Array("pid","name","price", p1,Tv,50000, p2,Lap,70000, p3,Ipod,8000, p4,Mobile,9000)

scala> val data = sc.parallelize(raw.collect.slice(1,raw.collect.size))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:29

scala> data.collect.foreach(println)
p1,Tv,50000
p2,Lap,70000
p3,Ipod,8000
p4,Mobile,9000


 Here slice is not available on an RDD, so the data has to be collected
 to the client (local) first and slice applied there.
 If the RDD is large, the client cannot collect it and the job will fail.

Way 2:
------

 val data = raw.filter(line =>
     !line.contains("pid"))

 data.persist
--adv : no need to collect data into the client [local]

--disadv : to eliminate 1 row, all rows are scanned
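
Way 3 (a sketch, not part of the class notes): mapPartitionsWithIndex can drop just the first record of partition 0. This avoids both the collect of Way 1 and the full-row matching of Way 2 (which would also drop any data row that happens to contain the string "pid"). It assumes the header is the first record of the first partition, which holds for a freshly loaded text file:

val data = raw.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter   // header lives only in partition 0
}

data.collect.foreach(println)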

-----------------------------------------