[cloudera@quickstart ~]$ gedit prof
[cloudera@quickstart ~]$ hadoop fs -mkdir mlib
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal prof mlib
[cloudera@quickstart ~]$
scala> val data = sc.textFile
("/user/cloudera/mlib/prof")
data: org.apache.spark.rdd.RDD[String] =
/user/cloudera/mlib/prof MapPartitionsRDD[1]
at textFile at <console>:27
scala> data.collect.take(3).foreach(println)
"a","w","h","c"
25,80,5.9,120
23,55,5.7,90
scala> val ndata = data.filter{x =>
| !(x.split(",")(0).contains("a"))
| }
scala> ndata.collect.foreach(println)
25,80,5.9,120
23,55,5.7,90
23,89,6.0,130
26,80,5.9,120
23,55,5.7,90
23,69,6.0,130
28,81,5.9,120
23,55,5.9,190
23,81,6.0,130
29,87,5.9,120
23,55,5.7,190
23,89,5.0,130
scala>
scala>
| import
org.apache.spark.mllib.regression.LabeledPoint
import
org.apache.spark.mllib.regression.LabeledPoint
scala> import
org.apache.spark.mllib.regression.LinearRegres
sionModel
import
org.apache.spark.mllib.regression.LinearRegres
sionModel
scala> import
org.apache.spark.mllib.regression.LinearRegres
sionWithSGD
import
org.apache.spark.mllib.regression.LinearRegres
sionWithSGD
scala> import
org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors
-------------
scala> def toLabel(line:String) = {
|
| val w = line.split(",")
| val lbl = w(3).toDouble
| val f = w.take(3).map(x =>
x.toDouble)
| LabeledPoint(lbl,
Vectors.dense(f))
| }
toLabel: (line: String)
org.apache.spark.mllib.regression.LabeledPoint
scala>
scala> toLabel("23,78,5.9,120")
res8:
org.apache.spark.mllib.regression.LabeledPoint
= (120.0,[23.0,78.0,5.9])
scala>
// Convert every header-free CSV row in ndata into a LabeledPoint via toLabel.
val trainset = ndata.map(x => toLabel(x))
scala> val trainset = ndata.map(x => toLabel
(x))
trainset: org.apache.spark.rdd.RDD
[org.apache.spark.mllib.regression.LabeledPoin
t] = MapPartitionsRDD[3] at map at
<console>:37
scala>
scala> trainset.collect.foreach(println)
(120.0,[25.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,6.0])
(120.0,[26.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,69.0,6.0])
(120.0,[28.0,81.0,5.9])
(190.0,[23.0,55.0,5.9])
(130.0,[23.0,81.0,6.0])
(120.0,[29.0,87.0,5.9])
(190.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,5.0])
scala>
// Train a linear regression model with stochastic gradient descent and
// pair every training label with the model's prediction for that row.
val numIterations = 100
// The features are unscaled (values up to ~90), and with the default step
// size the run recorded in these notes diverged (predictions around -7E301).
// A much smaller step keeps the updates bounded; tune it together with
// numIterations, or standardize the features before training.
val stepSize = 0.0001
// Keep each call and its argument list on one line: a line break before "("
// ends the statement, so the arguments would not be applied to the method.
val model = LinearRegressionWithSGD.train(trainset, numIterations, stepSize)
val valuesAndPreds = trainset.map { point =>
  (point.label, model.predict(point.features))
}
// valuesAndPreds contains (y, ycap) pairs:
// y is the actual label and ycap is the predicted label.
// [ "label" means the response variable. ]
scala> valuesAndPreds.collect.foreach(println)
(120.0,-7.150280334821135E301)
(90.0,-5.078652953403039E301)
(130.0,-7.824818198048878E301)
(120.0,-7.176538392878548E301)
(90.0,-5.078652953403039E301)
(130.0,-6.210523036282773E301)
(120.0,-7.309769267081678E301)
(190.0,-5.07989526649868E301)
(130.0,-7.179100133342436E301)
(120.0,-7.820315873668922E301)
(190.0,-5.078652953403039E301)
(130.0,-7.818606632570674E301)
// Mean squared error over the (actual, predicted) pairs.
// The arithmetic stays in Double: converting to Int clamps large predictions
// to the Int range and lets the subtraction and the square overflow,
// which produces a meaningless error value.
val mse = valuesAndPreds.map { case (y, ycap) =>
  val e = y - ycap
  e * e
}.mean()
// Continue the trials by increasing the number of iterations
// until you reach convergence.
// [[ Convergence: the mse no longer changes between trials. ]]
// Tag each (actual, predicted) pair "Pass" when the prediction is within
// 20% of the actual value and "Fail" otherwise, then count both outcomes.
val tolerance = 0.20
val acc = valuesAndPreds.map { case (y, ycap) =>
  // Relative error is computed in Double so that no precision is lost to
  // integer truncation or Int overflow; a zero label cannot be divided by,
  // so it only passes on an exact match.
  val withinTolerance =
    if (y == 0.0) ycap == 0.0
    else math.abs(y - ycap) / math.abs(y) <= tolerance
  val stat = if (withinTolerance) "Pass" else "Fail"
  (stat, 1)
}
// accres holds one (status, count) pair per distinct status.
val accres = acc.reduceByKey(_ + _)
---------------------------------
If the accuracy is satisfactory,
apply the model to the predictables (live data):
model.predict(<dense vector>)
The dense vector should contain only the features
(no label), in the same order used for training.
-----------------------------------
[cloudera@quickstart ~]$ hadoop fs -mkdir mlib
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal prof mlib
[cloudera@quickstart ~]$
scala> val data = sc.textFile
("/user/cloudera/mlib/prof")
data: org.apache.spark.rdd.RDD[String] =
/user/cloudera/mlib/prof MapPartitionsRDD[1]
at textFile at <console>:27
scala> data.collect.take(3).foreach(println)
"a","w","h","c"
25,80,5.9,120
23,55,5.7,90
scala> val ndata = data.filter{x =>
| !(x.split(",")(0).contains("a"))
| }
scala> ndata.collect.foreach(println)
25,80,5.9,120
23,55,5.7,90
23,89,6.0,130
26,80,5.9,120
23,55,5.7,90
23,69,6.0,130
28,81,5.9,120
23,55,5.9,190
23,81,6.0,130
29,87,5.9,120
23,55,5.7,190
23,89,5.0,130
scala>
scala>
| import
org.apache.spark.mllib.regression.LabeledPoint
import
org.apache.spark.mllib.regression.LabeledPoint
scala> import
org.apache.spark.mllib.regression.LinearRegres
sionModel
import
org.apache.spark.mllib.regression.LinearRegres
sionModel
scala> import
org.apache.spark.mllib.regression.LinearRegres
sionWithSGD
import
org.apache.spark.mllib.regression.LinearRegres
sionWithSGD
scala> import
org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors
-------------
scala> def toLabel(line:String) = {
|
| val w = line.split(",")
| val lbl = w(3).toDouble
| val f = w.take(3).map(x =>
x.toDouble)
| LabeledPoint(lbl,
Vectors.dense(f))
| }
toLabel: (line: String)
org.apache.spark.mllib.regression.LabeledPoint
scala>
scala> toLabel("23,78,5.9,120")
res8:
org.apache.spark.mllib.regression.LabeledPoint
= (120.0,[23.0,78.0,5.9])
scala>
// Convert every header-free CSV row in ndata into a LabeledPoint via toLabel.
val trainset = ndata.map(x => toLabel(x))
scala> val trainset = ndata.map(x => toLabel
(x))
trainset: org.apache.spark.rdd.RDD
[org.apache.spark.mllib.regression.LabeledPoin
t] = MapPartitionsRDD[3] at map at
<console>:37
scala>
scala> trainset.collect.foreach(println)
(120.0,[25.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,6.0])
(120.0,[26.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,69.0,6.0])
(120.0,[28.0,81.0,5.9])
(190.0,[23.0,55.0,5.9])
(130.0,[23.0,81.0,6.0])
(120.0,[29.0,87.0,5.9])
(190.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,5.0])
scala>
// Train a linear regression model with stochastic gradient descent and
// pair every training label with the model's prediction for that row.
val numIterations = 100
// The features are unscaled (values up to ~90), and with the default step
// size the run recorded in these notes diverged (predictions around -7E301).
// A much smaller step keeps the updates bounded; tune it together with
// numIterations, or standardize the features before training.
val stepSize = 0.0001
// Keep each call and its argument list on one line: a line break before "("
// ends the statement, so the arguments would not be applied to the method.
val model = LinearRegressionWithSGD.train(trainset, numIterations, stepSize)
val valuesAndPreds = trainset.map { point =>
  (point.label, model.predict(point.features))
}
// valuesAndPreds contains (y, ycap) pairs:
// y is the actual label and ycap is the predicted label.
// [ "label" means the response variable. ]
scala> valuesAndPreds.collect.foreach(println)
(120.0,-7.150280334821135E301)
(90.0,-5.078652953403039E301)
(130.0,-7.824818198048878E301)
(120.0,-7.176538392878548E301)
(90.0,-5.078652953403039E301)
(130.0,-6.210523036282773E301)
(120.0,-7.309769267081678E301)
(190.0,-5.07989526649868E301)
(130.0,-7.179100133342436E301)
(120.0,-7.820315873668922E301)
(190.0,-5.078652953403039E301)
(130.0,-7.818606632570674E301)
// Mean squared error over the (actual, predicted) pairs.
// The arithmetic stays in Double: converting to Int clamps large predictions
// to the Int range and lets the subtraction and the square overflow,
// which produces a meaningless error value.
val mse = valuesAndPreds.map { case (y, ycap) =>
  val e = y - ycap
  e * e
}.mean()
// Continue the trials by increasing the number of iterations
// until you reach convergence.
// [[ Convergence: the mse no longer changes between trials. ]]
// Tag each (actual, predicted) pair "Pass" when the prediction is within
// 20% of the actual value and "Fail" otherwise, then count both outcomes.
val tolerance = 0.20
val acc = valuesAndPreds.map { case (y, ycap) =>
  // Relative error is computed in Double so that no precision is lost to
  // integer truncation or Int overflow; a zero label cannot be divided by,
  // so it only passes on an exact match.
  val withinTolerance =
    if (y == 0.0) ycap == 0.0
    else math.abs(y - ycap) / math.abs(y) <= tolerance
  val stat = if (withinTolerance) "Pass" else "Fail"
  (stat, 1)
}
// accres holds one (status, count) pair per distinct status.
val accres = acc.reduceByKey(_ + _)
---------------------------------
If the accuracy is satisfactory,
apply the model to the predictables (live data):
model.predict(<dense vector>)
The dense vector should contain only the features
(no label), in the same order used for training.
-----------------------------------