Filter (multiple zip, case)

October 28, 2018    Spark

Temperature Examples

  • filter: mainly used to remove data from an RDD
  • Functions used
    • zip: how to build three-element tuples (a, b, c) out of pairs
    • case: mainly used to reshape values via pattern matching (see the warm-up sketch after this list)
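
The warm-up sketch below uses plain Scala collections and toy data (not the notebook's CSV) to show the same building blocks in isolation: two zips produce a nested pair, and a case pattern flattens it into a triple.

val ids    = List("ITE00100554", "EZE00100082")
val types  = List("TMAX", "TMIN")
val values = List(18.5f, 7.7f)

// zipping twice yields ((id, type), value); the case pattern flattens it
val triples = ids.zip(types).zip(values).map { case ((id, t), v) => (id, t, v) }
// triples: List((ITE00100554,TMAX,18.5), (EZE00100082,TMIN,7.7))

// filter keeps only the elements that satisfy the predicate
val mins = triples.filter { case (_, t, _) => t == "TMIN" }
// mins: List((EZE00100082,TMIN,7.7))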


Code example (see the notebook code)

import org.apache.spark._
Intitializing Scala interpreter ...

Spark Web UI available at http://163.152.---.---:----
SparkContext available as 'sc' (version = 2.3.1, master = local[*], app id = local-1553043368515)
SparkSession available as 'spark'

import org.apache.spark._
val lines = sc.textFile("./data/1800.csv")
lines: org.apache.spark.rdd.RDD[String] = ./data/1800.csv MapPartitionsRDD[1] at textFile at <console>:28


  • Column 1: Weather station ID
  • Column 2: Date
  • Column 3: Temperature type
  • Column 4: Temperature value (tenths of a degree Celsius)
lines.take(10).foreach(println)
ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,
ITE00100554,18000102,TMAX,-60,,I,E,
ITE00100554,18000102,TMIN,-125,,,E,
GM000010962,18000102,PRCP,0,,,E,
EZE00100082,18000102,TMAX,-44,,,E,
EZE00100082,18000102,TMIN,-130,,,E,


val stationID = lines.map(line => line.split(",")(0))
stationID: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at map at <console>:30


stationID.take(10).foreach(println)
ITE00100554
ITE00100554
GM000010962
EZE00100082
EZE00100082
ITE00100554
ITE00100554
GM000010962
EZE00100082
EZE00100082


val entryType = lines.map(line => line.split(",")(2))
entryType: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:30


entryType.take(10).foreach(println)
TMAX
TMIN
PRCP
TMAX
TMIN
TMAX
TMIN
PRCP
TMAX
TMIN


val temperature = lines.map(line => line.split(",")(3).toFloat)
temperature: org.apache.spark.rdd.RDD[Float] = MapPartitionsRDD[8] at map at <console>:30


temperature.take(10).foreach(println)
-75.0
-148.0
0.0
-86.0
-135.0
-60.0
-125.0
0.0
-44.0
-130.0


val temperature = lines.map(line => line.split(",")(3).toFloat*0.1f*(9.0f/5.0f)+32.0f)
temperature: org.apache.spark.rdd.RDD[Float] = MapPartitionsRDD[9] at map at <console>:30


temperature.take(10).foreach(println)
18.5
5.3600006
32.0
16.52
7.700001
21.2
9.5
32.0
24.08
8.6
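
  • Sanity check on the first record: -75 is in tenths of a degree Celsius, so -75 × 0.1 = -7.5 °C, and -7.5 × 9/5 + 32 = 18.5 °F, matching the first output line above.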


  • Zipping two RDDs into pairs
val tmp = stationID.zip(entryType)
tmp: org.apache.spark.rdd.RDD[(String, String)] = ZippedPartitionsRDD2[86] at zip at <console>:34


tmp.collect()
res94: Array[(String, String)] = Array((ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP)...
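
  • Note: RDD.zip requires the two RDDs to have the same number of partitions and the same number of elements in each partition; that holds here because stationID and entryType were each derived one-to-one from lines.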


  • Zipping multiple RDDs into triples
val tmp = stationID.zip(entryType).zip(temperature).map({ 
  case ((a,b), c) => (a,b,c)
})
tmp: org.apache.spark.rdd.RDD[(String, String, Float)] = MapPartitionsRDD[89] at map at <console>:36


tmp.collect()
res95: Array[(String, String, Float)] = Array((ITE00100554,TMAX,18.5), (ITE00100554,TMIN,5.3600006), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,16.52), (EZE00100082,TMIN,7.700001), (ITE00100554,TMAX,21.2), (ITE00100554,TMIN,9.5), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.08), (EZE00100082,TMIN,8.6), (ITE00100554,TMAX,27.86), (ITE00100554,TMIN,23.720001), (GM000010962,PRCP,32.72), (EZE00100082,TMAX,30.2), (EZE00100082,TMIN,18.86), (ITE00100554,TMAX,32.0), (ITE00100554,TMIN,29.66), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,22.1), (EZE00100082,TMIN,18.68), (ITE00100554,TMAX,33.8), (ITE00100554,TMIN,30.92), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.8), (EZE00100082,TMIN,21.56), (ITE00100554,TMAX,34.34), (ITE00100554,TMIN,34.34), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.98), (E...


  • The code above can also be written without dots and parentheses, using Scala's infix method notation.
val tmp = stationID zip entryType zip temperature map { 
  case ((a,b), c) => (a,b,c)
}
tmp: org.apache.spark.rdd.RDD[(String, String, Float)] = MapPartitionsRDD[104] at map at <console>:36


tmp.collect()
res99: Array[(String, String, Float)] = Array((ITE00100554,TMAX,18.5), (ITE00100554,TMIN,5.3600006), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,16.52), (EZE00100082,TMIN,7.700001), (ITE00100554,TMAX,21.2), (ITE00100554,TMIN,9.5), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.08), (EZE00100082,TMIN,8.6), (ITE00100554,TMAX,27.86), (ITE00100554,TMIN,23.720001), (GM000010962,PRCP,32.72), (EZE00100082,TMAX,30.2), (EZE00100082,TMIN,18.86), (ITE00100554,TMAX,32.0), (ITE00100554,TMIN,29.66), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,22.1), (EZE00100082,TMIN,18.68), (ITE00100554,TMAX,33.8), (ITE00100554,TMIN,30.92), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.8), (EZE00100082,TMIN,21.56), (ITE00100554,TMAX,34.34), (ITE00100554,TMIN,34.34), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.98), (E...


  • Packaged as a function that parses each line into a (stationID, entryType, temperature) tuple
def parseLine(line:String) ={
    val fields = line.split(",")
    val stationID = fields(0)
    val entryType = fields(2)
    val temperature = fields(3).toFloat*0.1f*(9.0f/5.0f) +32.0f  // tenths of a degree Celsius to Fahrenheit
    (stationID, entryType, temperature)
}
parseLine: (line: String)(String, String, Float)


val parsedLines = lines.map(parseLine)
parsedLines: org.apache.spark.rdd.RDD[(String, String, Float)] = MapPartitionsRDD[19] at map at <console>:40


parsedLines.take(10).foreach(println)
(ITE00100554,TMAX,18.5)
(ITE00100554,TMIN,5.3600006)
(GM000010962,PRCP,32.0)
(EZE00100082,TMAX,16.52)
(EZE00100082,TMIN,7.700001)
(ITE00100554,TMAX,21.2)
(ITE00100554,TMIN,9.5)
(GM000010962,PRCP,32.0)
(EZE00100082,TMAX,24.08)
(EZE00100082,TMIN,8.6)


  • Filtering: if we only want to keep the minimum-temperature (TMIN) records, we can write:
val minTemps = parsedLines.filter(x => x._2 == "TMIN")
minTemps: org.apache.spark.rdd.RDD[(String, String, Float)] = MapPartitionsRDD[20] at filter at <console>:42


minTemps.take(10).foreach(println)
(ITE00100554,TMIN,5.3600006)
(EZE00100082,TMIN,7.700001)
(ITE00100554,TMIN,9.5)
(EZE00100082,TMIN,8.6)
(ITE00100554,TMIN,23.720001)
(EZE00100082,TMIN,18.86)
(ITE00100554,TMIN,29.66)
(EZE00100082,TMIN,18.68)
(ITE00100554,TMIN,30.92)
(EZE00100082,TMIN,21.56)


val stationTemps = minTemps.map(x => (x._1, x._3))  // x._3 is already a Float, so no .toFloat is needed
stationTemps: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[23] at map at <console>:44


stationTemps.take(10).foreach(println)
(ITE00100554,5.3600006)
(EZE00100082,7.700001)
(ITE00100554,9.5)
(EZE00100082,8.6)
(ITE00100554,23.720001)
(EZE00100082,18.86)
(ITE00100554,29.66)
(EZE00100082,18.68)
(ITE00100554,30.92)
(EZE00100082,21.56)
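
The last two steps can also be written with case patterns instead of positional accessors like x._2; a minimal sketch (stationTempsAlt is just an illustrative name):

val stationTempsAlt = parsedLines
  .filter { case (_, entryType, _) => entryType == "TMIN" }
  .map { case (station, _, temp) => (station, temp) }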


  • For each key, find the minimum by comparing its values pairwise
// first step (x,y) => min(5.36, 7.7)
val minTempsByStation = stationTemps.reduceByKey((x,y) => math.min(x,y))
minTempsByStation: org.apache.spark.rdd.RDD[(String, Float)] = ShuffledRDD[26] at reduceByKey at <console>:47
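
Conceptually, reduceByKey groups the values by key and folds them pairwise with the supplied function. For a single key this is equivalent to the collection-level reduce sketched below (values taken from the TMIN output above):

// e.g. for ITE00100554: min(min(min(5.36, 9.5), 23.72), ...) = 5.36
val iteMins = Seq(5.3600006f, 9.5f, 23.720001f, 29.66f, 30.92f)
iteMins.reduce((x, y) => math.min(x, y))  // 5.3600006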


val results = minTempsByStation.collect()
results: Array[(String, Float)] = Array((EZE00100082,7.700001), (ITE00100554,5.3600006))


results.sorted
res41: Array[(String, Float)] = Array((EZE00100082,7.700001), (ITE00100554,5.3600006))
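
  • Note: .sorted on an Array[(String, Float)] uses the default tuple ordering, i.e. by station ID first and then by temperature.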


for (result <- results.sorted){
    val station = result._1
    val temp = result._2
    val formattedTemp = f"$temp%2f F"
    println(s"$station minimum temperature: $formattedTemp")
}
EZE00100082 minimum temperature: 7.700001 F
ITE00100554 minimum temperature: 5.360001 F
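
One caveat on the format string: %2f sets a minimum field width of 2, not two decimal places, which is why the full float precision is printed above. To round to two decimals, use %.2f:

val temp = 5.3600006f
println(f"$temp%.2f F")  // prints "5.36 F"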

DSBA