Key/Value Examples
Code examples (see the notebook code)
import org.apache.spark._
Initializing Scala interpreter ...
Spark Web UI available at http://163.152.---.---:----
SparkContext available as 'sc' (version = 2.3.1, master = local[*], app id = local-1552908203569)
SparkSession available as 'spark'
import org.apache.spark._
val lines = sc.textFile("./data/fakefriends.csv")
lines: org.apache.spark.rdd.RDD[String] = ./data/fakefriends.csv MapPartitionsRDD[1] at textFile at <console>:28
lines.take(10).foreach(println)
0,Will,33,385
1,Jean-Luc,26,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21
5,Weyoun,59,318
6,Gowron,37,220
7,Will,54,307
8,Jadzia,38,380
9,Hugh,27,181
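// split each CSV line on the comma into an Array[String] of fields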
val fields = lines.map(line => line.split(","))
fields: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[44] at map at <console>:30
val age = fields.map(field => field(2).toInt )
val numFriends = fields.map(field => field(3).toInt )
age: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[45] at map at <console>:32
numFriends: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[46] at map at <console>:33
age.take(10).foreach(println)
33
26
55
40
68
59
37
54
38
27
numFriends.take(10).foreach(println)
385
2
221
465
21
318
220
307
380
181
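// zip pairs the two RDDs element-wise, producing (age, numFriends) tuples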
val merged = age.zip(numFriends)
merged: org.apache.spark.rdd.RDD[(Int, Int)] = ZippedPartitionsRDD2[48] at zip at <console>:35
merged.take(10).foreach(println)
(33,385)
(26,2)
(55,221)
(40,465)
(68,21)
(59,318)
(37,220)
(54,307)
(38,380)
(27,181)
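Note that zip only works here because age and numFriends were both derived from the same lines RDD, so they have the same number of partitions and the same number of elements per partition. A more robust way to build the (age, numFriends) pairs is to parse each line once, as the parseLine function below does.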
Scala syntax: def name(args) = { ... }; the value of the last expression in the block is the function's output (return value).
def parseLine(line: String)={
val fields = line.split(",")
val age = fields(2).toInt
val numFriends = fields(3).toInt
(age, numFriends) // the last expression is the (age, numFriends) tuple returned for each line
}
parseLine: (line: String)(Int, Int)
val rdd = lines.map(parseLine)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[17] at map at <console>:38
rdd.take(10).foreach(println)
(33,385)
(26,2)
(55,221)
(40,465)
(68,21)
(59,318)
(37,220)
(54,307)
(38,380)
(27,181)
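// mapValues keeps the key (age) and turns each value into (numFriends, 1) so that sums and counts can be accumulated together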
val value_expand = rdd.mapValues(x => (x,1))
value_expand: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[41] at mapValues at <console>:40
value_expand.take(10).foreach(println)
(33,(385,1))
(26,(2,1))
(55,(221,1))
(40,(465,1))
(68,(21,1))
(59,(318,1))
(37,(220,1))
(54,(307,1))
(38,(380,1))
(27,(181,1))
// reduceByKey combines the two values for a key: (x, y) => (x._1 + y._1, x._2 + y._2), i.e. it accumulates the friend total and the count per age
val key_cum = value_expand.reduceByKey((x,y)=> (x._1 + y._1, x._2 + y._2))
key_cum: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ShuffledRDD[42] at reduceByKey at <console>:43
key_cum.take(10).foreach(println)
(34,(1473,6))
(52,(3747,11))
(56,(1840,6))
(66,(2488,9))
(22,(1445,7))
(28,(2091,10))
(54,(3615,13))
(46,(2908,13))
(48,(2814,10))
(30,(2594,11))
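// divide the accumulated friend total by the count to get the average number of friends per age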
val averageByAge = key_cum.mapValues(x => x._1/x._2)
averageByAge: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[43] at mapValues at <console>:44
averageByAge.take(10).foreach(println)
(34,245)
(52,340)
(56,306)
(66,276)
(22,206)
(28,209)
(54,278)
(46,223)
(48,281)
(30,235)
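Note that x._1 / x._2 above is Int division, so each average is truncated: for age 34 the exact value is 1473 / 6 = 245.5, but the output shows 245. A minimal sketch of the same step with a Double result (averageByAgeDouble is just an illustrative name, not part of the original notebook):
// convert the sum to Double before dividing to keep the fractional part
val averageByAgeDouble = key_cum.mapValues(x => x._1.toDouble / x._2)
averageByAgeDouble.take(10).foreach(println) // the first entry would be (34,245.5) instead of (34,245)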
val results = averageByAge.collect()
results: Array[(Int, Int)] = Array((34,245), (52,340), (56,306), (66,276), (22,206), (28,209), (54,278), (46,223), (48,281), (30,235), (50,254), (32,207), (36,246), (24,233), (62,220), (64,281), (42,303), (40,250), (18,343), (20,165), (38,193), (58,116), (44,282), (60,202), (26,242), (68,269), (19,213), (39,169), (41,268), (61,256), (21,350), (47,233), (55,295), (53,222), (25,197), (29,215), (59,220), (65,298), (35,211), (27,228), (57,258), (51,302), (33,325), (37,249), (23,246), (45,309), (63,384), (67,214), (69,235), (49,184), (31,267), (43,230))
results(0)
res71: (Int, Int) = (34,245)
results.slice(0,2)
res72: Array[(Int, Int)] = Array((34,245), (52,340))
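collect() returns the pairs in whatever order the shuffle produced, so as an optional extra step (an assumption, not shown in the notebook) the local array can be sorted by age before printing:
// sort the collected (age, averageFriends) pairs by age on the driver
results.sortBy(_._1).foreach(println)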