[cloudera@quickstart ~]$ gedit prof
[cloudera@quickstart ~]$ hadoop fs -mkdir mlib
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal prof mlib
[cloudera@quickstart ~]$
scala> val data = sc.textFile("/user/cloudera/mlib/prof")
data: org.apache.spark.rdd.RDD[String] =
/user/cloudera/mlib/prof MapPartitionsRDD[1]
at textFile at <console>:27
scala> data.collect.take(3).foreach(println)
"a","w","h","c"
25,80,5.9,120
23,55,5.7,90
scala> val ndata = data.filter{x =>
| !(x.split(",")(0).contains("a"))
| }
scala> ndata.collect.foreach(println)
25,80,5.9,120
23,55,5.7,90
23,89,6.0,130
26,80,5.9,120
23,55,5.7,90
23,69,6.0,130
28,81,5.9,120
23,55,5.9,190
23,81,6.0,130
29,87,5.9,120
23,55,5.7,190
23,89,5.0,130
scala>
scala>
scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.mllib.regression.LinearRegressionModel
scala> import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors
-------------
scala> def toLabel(line:String) = {
| // Turn one comma-separated record into a LabeledPoint: the first
| // three fields are the features, the fourth field is the label.
| val fields = line.split(",")
| val label = fields(3).toDouble
| val features = fields.slice(0, 3).map(_.toDouble)
| LabeledPoint(label, Vectors.dense(features))
| }
toLabel: (line: String)
org.apache.spark.mllib.regression.LabeledPoint
scala>
scala> toLabel("23,78,5.9,120")
res8:
org.apache.spark.mllib.regression.LabeledPoint
= (120.0,[23.0,78.0,5.9])
scala>
val trainset = ndata.map(x => toLabel(x))
scala> val trainset = ndata.map(x => toLabel
(x))
trainset: org.apache.spark.rdd.RDD
[org.apache.spark.mllib.regression.LabeledPoin
t] = MapPartitionsRDD[3] at map at
<console>:37
scala>
scala> trainset.collect.foreach(println)
(120.0,[25.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,6.0])
(120.0,[26.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,69.0,6.0])
(120.0,[28.0,81.0,5.9])
(190.0,[23.0,55.0,5.9])
(130.0,[23.0,81.0,6.0])
(120.0,[29.0,87.0,5.9])
(190.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,5.0])
scala>
// Number of SGD iterations to run.
val numIterations = 100
// Explicit step size for gradient descent. The two-argument train overload
// uses a step size of 1.0, which makes SGD diverge on these unscaled
// features; the valuesAndPreds output recorded in this transcript
// (predictions around -7E301) came from a run with that default.
// 0.0001 is a starting point: tune it, or scale the features first.
val stepSize = 0.0001
// The call must stay on one line: a line break before "(" makes Scala treat
// the argument list as a separate statement.
val model = LinearRegressionWithSGD.train(trainset, numIterations, stepSize)
// Pair each actual label with the model's prediction for the same features.
val valuesAndPreds = trainset.map { x =>
  val prediction = model.predict(x.features)
  (x.label, prediction)
}
// valuesAndPreds above contains (y, ycap) pairs:
// y is the actual label and ycap is the predicted label.
// [ "label" means the response variable ]
scala> valuesAndPreds.collect.foreach(println)
(120.0,-7.150280334821135E301)
(90.0,-5.078652953403039E301)
(130.0,-7.824818198048878E301)
(120.0,-7.176538392878548E301)
(90.0,-5.078652953403039E301)
(130.0,-6.210523036282773E301)
(120.0,-7.309769267081678E301)
(190.0,-5.07989526649868E301)
(130.0,-7.179100133342436E301)
(120.0,-7.820315873668922E301)
(190.0,-5.078652953403039E301)
(130.0,-7.818606632570674E301)
// Mean squared error between the actual labels (y) and predictions (ycap).
// Kept in Double on purpose: converting to Int first saturates very large
// predictions at Int.MinValue/Int.MaxValue and lets both y - ycap and e * e
// overflow silently, which yields a meaningless error value.
val mse = valuesAndPreds.map { case (y, ycap) =>
  val e = y - ycap
  e * e
}.mean
Continue the trials, increasing the
number of iterations until you reach
convergence [[ i.e. until the mse no longer changes ]].
// Tag each prediction "Pass" when it lies within 20% of the actual label,
// otherwise "Fail", emitting (tag, 1) so the tags can be counted.
val acc = valuesAndPreds.map { case (y, ycap) =>
  // Relative error in percent, computed in Double: Int conversion would
  // saturate huge predictions and overflow the subtraction/multiplication,
  // and integer division truncates the percentage. With a zero label the
  // division gives Infinity or NaN, which fails both comparisons below and
  // is tagged "Fail" instead of throwing ArithmeticException.
  val dist = ((y - ycap) * 100) / y
  val stat = if (dist >= -20 && dist <= 20) "Pass" else "Fail"
  (stat, 1)
}
// Total number of "Pass" and "Fail" predictions.
val accres = acc.reduceByKey(_ + _)
---------------------------------
If the accuracy is satisfactory,
apply the model to the predictables (live
data):
model.predict(<dense vector>)
The dense vector should contain
only the features.
-----------------------------------
[cloudera@quickstart ~]$ hadoop fs -mkdir mlib
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal prof mlib
[cloudera@quickstart ~]$
scala> val data = sc.textFile("/user/cloudera/mlib/prof")
data: org.apache.spark.rdd.RDD[String] =
/user/cloudera/mlib/prof MapPartitionsRDD[1]
at textFile at <console>:27
scala> data.collect.take(3).foreach(println)
"a","w","h","c"
25,80,5.9,120
23,55,5.7,90
scala> val ndata = data.filter{x =>
| !(x.split(",")(0).contains("a"))
| }
scala> ndata.collect.foreach(println)
25,80,5.9,120
23,55,5.7,90
23,89,6.0,130
26,80,5.9,120
23,55,5.7,90
23,69,6.0,130
28,81,5.9,120
23,55,5.9,190
23,81,6.0,130
29,87,5.9,120
23,55,5.7,190
23,89,5.0,130
scala>
scala>
scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.mllib.regression.LinearRegressionModel
scala> import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors
-------------
scala> def toLabel(line:String) = {
| // Turn one comma-separated record into a LabeledPoint: the first
| // three fields are the features, the fourth field is the label.
| val fields = line.split(",")
| val label = fields(3).toDouble
| val features = fields.slice(0, 3).map(_.toDouble)
| LabeledPoint(label, Vectors.dense(features))
| }
toLabel: (line: String)
org.apache.spark.mllib.regression.LabeledPoint
scala>
scala> toLabel("23,78,5.9,120")
res8:
org.apache.spark.mllib.regression.LabeledPoint
= (120.0,[23.0,78.0,5.9])
scala>
val trainset = ndata.map(x => toLabel(x))
scala> val trainset = ndata.map(x => toLabel
(x))
trainset: org.apache.spark.rdd.RDD
[org.apache.spark.mllib.regression.LabeledPoin
t] = MapPartitionsRDD[3] at map at
<console>:37
scala>
scala> trainset.collect.foreach(println)
(120.0,[25.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,6.0])
(120.0,[26.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,69.0,6.0])
(120.0,[28.0,81.0,5.9])
(190.0,[23.0,55.0,5.9])
(130.0,[23.0,81.0,6.0])
(120.0,[29.0,87.0,5.9])
(190.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,5.0])
scala>
// Number of SGD iterations to run.
val numIterations = 100
// Explicit step size for gradient descent. The two-argument train overload
// uses a step size of 1.0, which makes SGD diverge on these unscaled
// features; the valuesAndPreds output recorded in this transcript
// (predictions around -7E301) came from a run with that default.
// 0.0001 is a starting point: tune it, or scale the features first.
val stepSize = 0.0001
// The call must stay on one line: a line break before "(" makes Scala treat
// the argument list as a separate statement.
val model = LinearRegressionWithSGD.train(trainset, numIterations, stepSize)
// Pair each actual label with the model's prediction for the same features.
val valuesAndPreds = trainset.map { x =>
  val prediction = model.predict(x.features)
  (x.label, prediction)
}
// valuesAndPreds above contains (y, ycap) pairs:
// y is the actual label and ycap is the predicted label.
// [ "label" means the response variable ]
scala> valuesAndPreds.collect.foreach(println)
(120.0,-7.150280334821135E301)
(90.0,-5.078652953403039E301)
(130.0,-7.824818198048878E301)
(120.0,-7.176538392878548E301)
(90.0,-5.078652953403039E301)
(130.0,-6.210523036282773E301)
(120.0,-7.309769267081678E301)
(190.0,-5.07989526649868E301)
(130.0,-7.179100133342436E301)
(120.0,-7.820315873668922E301)
(190.0,-5.078652953403039E301)
(130.0,-7.818606632570674E301)
// Mean squared error between the actual labels (y) and predictions (ycap).
// Kept in Double on purpose: converting to Int first saturates very large
// predictions at Int.MinValue/Int.MaxValue and lets both y - ycap and e * e
// overflow silently, which yields a meaningless error value.
val mse = valuesAndPreds.map { case (y, ycap) =>
  val e = y - ycap
  e * e
}.mean
Continue the trials, increasing the
number of iterations until you reach
convergence [[ i.e. until the mse no longer changes ]].
// Tag each prediction "Pass" when it lies within 20% of the actual label,
// otherwise "Fail", emitting (tag, 1) so the tags can be counted.
val acc = valuesAndPreds.map { case (y, ycap) =>
  // Relative error in percent, computed in Double: Int conversion would
  // saturate huge predictions and overflow the subtraction/multiplication,
  // and integer division truncates the percentage. With a zero label the
  // division gives Infinity or NaN, which fails both comparisons below and
  // is tagged "Fail" instead of throwing ArithmeticException.
  val dist = ((y - ycap) * 100) / y
  val stat = if (dist >= -20 && dist <= 20) "Pass" else "Fail"
  (stat, 1)
}
// Total number of "Pass" and "Fail" predictions.
val accres = acc.reduceByKey(_ + _)
---------------------------------
If the accuracy is satisfactory,
apply the model to the predictables (live
data):
model.predict(<dense vector>)
The dense vector should contain
only the features.
-----------------------------------
Thank you sir
ReplyDeleteThanks you sir
ReplyDeleteCan we do performance testing of bigdata part using Rest api
ReplyDeleteI really appreciate information shared above. It’s of great help. If someone want to learn Online (Virtual) instructor lead live training in Apache spark mlib, kindly contact us http://www.maxmunus.com/contact
ReplyDeleteMaxMunus Offer World Class Virtual Instructor led training on Apache spark mlib. We have industry expert trainer. We provide Training Material and Software Support. MaxMunus has successfully conducted 100000+ trainings in India, USA, UK, Australlia, Switzerland, Qatar, Saudi Arabia, Bangladesh, Bahrain and UAE etc.
For Free Demo Contact us:
Name : Arunkumar U
Email : arun@maxmunus.com
Skype id: training_maxmunus
Contact No.-+91-9738507310
Company Website –http://www.maxmunus.com
Thanks for sharing very useful information
ReplyDeleteHadoop Online Training