Linear Regression using RDD

November 10, 2018    Spark

Regression Analysis (RDD)

  • Regression analysis with Spark, using an RDD as the input.


Code example (see the accompanying notebook)

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.optimization.SquaredL2Updater


  • Load the dataset as an RDD with the SparkContext (sc).
val trainingLines = sc.textFile("data/regression.txt")
trainingLines: org.apache.spark.rdd.RDD[String] = data/regression.txt MapPartitionsRDD[1] at textFile at <console>:35
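
In spark-shell or a notebook, sc is provided automatically. In a standalone application you would construct it yourself; a minimal sketch (the app name is an arbitrary placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Local SparkContext for a standalone run; "local[*]" uses all available cores
val conf = new SparkConf().setAppName("linear-regression-rdd").setMaster("local[*]")
val sc = new SparkContext(conf)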


  • Column 1: y (target)
  • Column 2: x (feature)
trainingLines.take(10).foreach(println)
-1.74,1.66
1.24,-1.18
0.29,-0.40
-0.13,0.09
-0.39,0.38
-1.79,1.73
0.71,-0.77
1.39,-1.48
1.15,-1.43
0.13,-0.07


  • Load a separate test set to evaluate the model, read in the same way as the training data.
// same as training
val testingLines = sc.textFile("data/regression-test.txt")
testingLines: org.apache.spark.rdd.RDD[String] = data/regression-test.txt MapPartitionsRDD[24] at textFile at <console>:36


  • LabeledPoint.parse: separates the target from the features (a hand-rolled equivalent is sketched after the code below)
    • $(y,[X])$
val trainingData = trainingLines.map(LabeledPoint.parse).cache()
val testData = testingLines.map(LabeledPoint.parse)
trainingData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[25] at map at <console>:39
testData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[26] at map at <console>:40
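
For this simple "y,x" layout, a manual parser equivalent to LabeledPoint.parse might look like the following sketch (parsed is a hypothetical name):

// Manual equivalent: first column is the label, second the single feature
val parsed = trainingLines.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).trim.toDouble))
}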


testData.take(10).foreach(println)
(-1.74,[1.66])
(1.24,[-1.18])
(0.29,[-0.4])
(-0.13,[0.09])
(-0.39,[0.38])
(-1.79,[1.73])
(0.71,[-0.77])
(1.39,[-1.48])
(1.15,[-1.43])
(0.13,[-0.07])


// x: the feature vectors
testData.map(_.features).take(10).foreach(println)
[1.66]
[-1.18]
[-0.4]
[0.09]
[0.38]
[1.73]
[-0.77]
[-1.48]
[-1.43]
[-0.07]


// y: the target labels
testData.map(_.label).take(10).foreach(println)
-1.74
1.24
0.29
-0.13
-0.39
-1.79
0.71
1.39
1.15
0.13


  • Create a linear regression model trained with SGD (stochastic gradient descent).
val algorithm = new LinearRegressionWithSGD()

algorithm.optimizer
  .setNumIterations(100)
  .setStepSize(1.0) // learning rate
  .setUpdater(new SquaredL2Updater())
  .setRegParam(0.01)
algorithm: org.apache.spark.mllib.regression.LinearRegressionWithSGD = org.apache.spark.mllib.regression.LinearRegressionWithSGD@1e7da171
res9: algorithm.optimizer.type = org.apache.spark.mllib.optimization.GradientDescent@481acdc2
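
With the SquaredL2Updater and regParam $\lambda = 0.01$, SGD minimizes (up to MLlib's internal constant factors) the L2-regularized least-squares objective, i.e. ridge regression:

$\min_{w} \frac{1}{n}\sum_{i=1}^{n} (w^{\top}x_i - y_i)^2 + \lambda \lVert w \rVert_2^2$

setStepSize controls the SGD learning rate and setNumIterations the number of gradient steps.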


  • Training
val model = algorithm.run(trainingData)
model: org.apache.spark.mllib.regression.LinearRegressionModel = org.apache.spark.mllib.regression.LinearRegressionModel: intercept = 0.0, numFeatures = 1
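
The reported intercept = 0.0 is expected: the mllib GeneralizedLinearAlgorithm does not fit an intercept term by default. If the data were not centered, you could enable it before training:

// Fit an intercept term as well (off by default)
algorithm.setIntercept(true)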


  • Inference
val predictions = model.predict(testData.map(x => x.features))
predictions: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[38] at mapPartitions at GeneralizedLinearAlgorithm.scala:70


predictions.take(10).foreach(println)
-1.636276212468243
1.1631361028388716
0.3942834246911429
-0.08871377055550715
-0.37456925345658576
-1.705275811789193
0.7589955925304501
1.4588486713572286
1.4095632432708358
0.06899959932095001
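
model.predict also accepts a single Vector, which is convenient for spot checks (the input value here is arbitrary):

// Predict y for a single feature vector, e.g. x = 1.0
model.predict(Vectors.dense(1.0))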


val predictionAndLabel = predictions.zip(testData.map(_.label))
predictionAndLabel: org.apache.spark.rdd.RDD[(Double, Double)] = ZippedPartitionsRDD2[43] at zip at <console>:48


  • (prediction, actual label)
predictionAndLabel.take(10).foreach(println)
(-1.636276212468243,-1.74)
(1.1631361028388716,1.24)
(0.3942834246911429,0.29)
(-0.08871377055550715,-0.13)
(-0.37456925345658576,-0.39)
(-1.705275811789193,-1.79)
(0.7589955925304501,0.71)
(1.4588486713572286,1.39)
(1.4095632432708358,1.15)
(0.06899959932095001,0.13)
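
A common way to quantify the fit is the mean squared error over these pairs; a minimal sketch:

// MSE over (prediction, label) pairs
val mse = predictionAndLabel.map { case (p, y) => (p - y) * (p - y) }.mean()
println(s"test MSE = $mse")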
