Temperature Examples
코드 예시(notebook 코드 참조)
import org.apache.spark._
Intitializing Scala interpreter ...
Spark Web UI available at http://163.152.---.---:----
SparkContext available as 'sc' (version = 2.3.1, master = local[*], app id = local-1553043368515)
SparkSession available as 'spark'
import org.apache.spark._
val lines = sc.textFile("./data/1800.csv")
lines: org.apache.spark.rdd.RDD[String] = ./data/1800.csv MapPartitionsRDD[1] at textFile at <console>:28
lines.take(10).foreach(println)
ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,
ITE00100554,18000102,TMAX,-60,,I,E,
ITE00100554,18000102,TMIN,-125,,,E,
GM000010962,18000102,PRCP,0,,,E,
EZE00100082,18000102,TMAX,-44,,,E,
EZE00100082,18000102,TMIN,-130,,,E,
val stationID = lines.map(line => line.split(",")(0))
stationID: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at map at <console>:30
stationID.take(10).foreach(println)
ITE00100554
ITE00100554
GM000010962
EZE00100082
EZE00100082
ITE00100554
ITE00100554
GM000010962
EZE00100082
EZE00100082
val entryType = lines.map(line => line.split(",")(2))
entryType: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:30
entryType.take(10).foreach(println)
TMAX
TMIN
PRCP
TMAX
TMIN
TMAX
TMIN
PRCP
TMAX
TMIN
val temperature = lines.map(line => line.split(",")(3).toFloat)
temperature: org.apache.spark.rdd.RDD[Float] = MapPartitionsRDD[8] at map at <console>:30
temperature.take(10).foreach(println)
-75.0
-148.0
0.0
-86.0
-135.0
-60.0
-125.0
0.0
-44.0
-130.0
val temperature = lines.map(line => line.split(",")(3).toFloat*0.1f*(9.0f/5.0f)+32.0f)
temperature: org.apache.spark.rdd.RDD[Float] = MapPartitionsRDD[9] at map at <console>:30
temperature.take(10).foreach(println)
18.5
5.3600006
32.0
16.52
7.700001
21.2
9.5
32.0
24.08
8.6
val tmp = stationID.zip(entryType)
tmp: org.apache.spark.rdd.RDD[(String, String)] = ZippedPartitionsRDD2[86] at zip at <console>:34
tmp.collect()
res94: Array[(String, String)] = Array((ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP), (EZE00100082,TMAX), (EZE00100082,TMIN), (ITE00100554,TMAX), (ITE00100554,TMIN), (GM000010962,PRCP)...
val tmp = stationID.zip(entryType).zip(temperature).map({
case ((a,b), c) => (a,b,c)
})
tmp: org.apache.spark.rdd.RDD[(String, String, Float)] = MapPartitionsRDD[89] at map at <console>:36
tmp.collect()
res95: Array[(String, String, Float)] = Array((ITE00100554,TMAX,18.5), (ITE00100554,TMIN,5.3600006), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,16.52), (EZE00100082,TMIN,7.700001), (ITE00100554,TMAX,21.2), (ITE00100554,TMIN,9.5), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.08), (EZE00100082,TMIN,8.6), (ITE00100554,TMAX,27.86), (ITE00100554,TMIN,23.720001), (GM000010962,PRCP,32.72), (EZE00100082,TMAX,30.2), (EZE00100082,TMIN,18.86), (ITE00100554,TMAX,32.0), (ITE00100554,TMIN,29.66), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,22.1), (EZE00100082,TMIN,18.68), (ITE00100554,TMAX,33.8), (ITE00100554,TMIN,30.92), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.8), (EZE00100082,TMIN,21.56), (ITE00100554,TMAX,34.34), (ITE00100554,TMIN,34.34), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.98), (E...
()
없이 표현 할 수도 있다.val tmp = stationID zip entryType zip temperature map {
case ((a,b), c) => (a,b,c)
}
tmp: org.apache.spark.rdd.RDD[(String, String, Float)] = MapPartitionsRDD[104] at map at <console>:36
tmp.collect()
res99: Array[(String, String, Float)] = Array((ITE00100554,TMAX,18.5), (ITE00100554,TMIN,5.3600006), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,16.52), (EZE00100082,TMIN,7.700001), (ITE00100554,TMAX,21.2), (ITE00100554,TMIN,9.5), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.08), (EZE00100082,TMIN,8.6), (ITE00100554,TMAX,27.86), (ITE00100554,TMIN,23.720001), (GM000010962,PRCP,32.72), (EZE00100082,TMAX,30.2), (EZE00100082,TMIN,18.86), (ITE00100554,TMAX,32.0), (ITE00100554,TMIN,29.66), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,22.1), (EZE00100082,TMIN,18.68), (ITE00100554,TMAX,33.8), (ITE00100554,TMIN,30.92), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.8), (EZE00100082,TMIN,21.56), (ITE00100554,TMAX,34.34), (ITE00100554,TMIN,34.34), (GM000010962,PRCP,32.0), (EZE00100082,TMAX,24.98), (E...
def parseLine(line:String) ={
val fields = line.split(",")
val stationID = fields(0)
val entryType = fields(2)
val temperature = fields(3).toFloat*0.1f*(9.0f/5.0f) +32.0f // from Celslus to Fahrenheit
(stationID, entryType, temperature)
}
parseLine: (line: String)(String, String, Float)
val parsedLines = lines.map(parseLine)
parsedLines: org.apache.spark.rdd.RDD[(String, String, Float)] = MapPartitionsRDD[19] at map at <console>:40
parsedLines.take(10).foreach(println)
(ITE00100554,TMAX,18.5)
(ITE00100554,TMIN,5.3600006)
(GM000010962,PRCP,32.0)
(EZE00100082,TMAX,16.52)
(EZE00100082,TMIN,7.700001)
(ITE00100554,TMAX,21.2)
(ITE00100554,TMIN,9.5)
(GM000010962,PRCP,32.0)
(EZE00100082,TMAX,24.08)
(EZE00100082,TMIN,8.6)
val minTemps = parsedLines.filter(x => x._2 == "TMIN")
minTemps: org.apache.spark.rdd.RDD[(String, String, Float)] = MapPartitionsRDD[20] at filter at <console>:42
minTemps.take(10).foreach(println)
(ITE00100554,TMIN,5.3600006)
(EZE00100082,TMIN,7.700001)
(ITE00100554,TMIN,9.5)
(EZE00100082,TMIN,8.6)
(ITE00100554,TMIN,23.720001)
(EZE00100082,TMIN,18.86)
(ITE00100554,TMIN,29.66)
(EZE00100082,TMIN,18.68)
(ITE00100554,TMIN,30.92)
(EZE00100082,TMIN,21.56)
val stationTemps = minTemps.map(x => (x._1, x._3.toFloat))
stationTemps: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[23] at map at <console>:44
stationTemps.take(10).foreach(println)
(ITE00100554,5.3600006)
(EZE00100082,7.700001)
(ITE00100554,9.5)
(EZE00100082,8.6)
(ITE00100554,23.720001)
(EZE00100082,18.86)
(ITE00100554,29.66)
(EZE00100082,18.68)
(ITE00100554,30.92)
(EZE00100082,21.56)
// first step (x,y) => min(5.36, 7.7)
val minTempsByStation = stationTemps.reduceByKey((x,y) => math.min(x,y))
minTempsByStation: org.apache.spark.rdd.RDD[(String, Float)] = ShuffledRDD[26] at reduceByKey at <console>:47
val results = minTempsByStation.collect()
results: Array[(String, Float)] = Array((EZE00100082,7.700001), (ITE00100554,5.3600006))
results.sorted
res41: Array[(String, Float)] = Array((EZE00100082,7.700001), (ITE00100554,5.3600006))
for (result <- results.sorted){
val station = result._1
val temp = result._2
val formattedTemp = f"$temp%2f F"
println(s"$station minimum temperature: $formattedTemp")
}
EZE00100082 minimum temperature: 7.700001 F
ITE00100554 minimum temperature: 5.360001 F