DataFrame Operations
Code example (see the notebook code)
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._ // needed for desc() below
import org.apache.log4j._
import scala.io.Codec
import scala.io.Source
import java.nio.charset.CodingErrorAction
// spark.implicits._ (for toDF) is assumed to be in scope, as in spark-shell/notebooks
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
The u.data dataset: tab-separated rows of user ID, movie ID, rating, and timestamp.
val lines = sc.textFile("data/u.data")
lines: org.apache.spark.rdd.RDD[String] = data/u.data MapPartitionsRDD[1] at textFile at <console>:38
lines.take(5).foreach(println)
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596
In map, the _ refers to each row of the RDD; here column 1 of each tab-split row is the movie ID (an explicit-lambda equivalent appears as a sketch after the output below).
val mID = lines.map(_.split("\t")(1).toInt)
mID: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:40
mID.take(5).foreach(println)
242
302
377
51
346
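For reference, the underscore is shorthand for an explicit lambda; a minimal equivalent sketch (mID2 is a hypothetical name):
val mID2 = lines.map(line => line.split("\t")(1).toInt) // same result as mID above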
// Convert records of the RDD (mID) to Rows and a DataFrame
val moviesDS = mID.toDF("movieID") // mID is already RDD[Int], so no extra toInt is needed
moviesDS: org.apache.spark.sql.DataFrame = [movieID: int]
moviesDS.show()
+-------+
|movieID|
+-------+
| 242|
| 302|
| 377|
| 51|
| 346|
| 474|
| 265|
| 465|
| 451|
| 86|
| 257|
| 1014|
| 222|
| 40|
| 29|
| 785|
| 387|
| 274|
| 1042|
| 1184|
+-------+
only showing top 20 rows
moviesDS is a DataFrame. groupBy is an expensive (wide) operation, so cache() should be applied at an appropriate point; count() counts the rows in each group.
moviesDS
  .groupBy("movieID")
  .count()
  .show()
+-------+-----+
|movieID|count|
+-------+-----+
| 496| 231|
| 471| 221|
| 463| 71|
| 148| 128|
| 1342| 2|
| 833| 49|
| 1088| 13|
| 1591| 6|
| 1238| 8|
| 1580| 1|
| 1645| 1|
| 392| 68|
| 623| 39|
| 540| 43|
| 858| 3|
| 737| 59|
| 243| 132|
| 1025| 44|
| 1084| 21|
| 1127| 11|
+-------+-----+
only showing top 20 rows
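To see the shuffle that makes groupBy expensive, the physical plan can be inspected; a minimal sketch using Dataset.explain():
moviesDS.groupBy("movieID").count().explain() // look for the Exchange (shuffle) step in the printed plan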
Use cache(): it stores the result instead of recomputing it each time it is used.
val topMovieIDs = moviesDS
  .groupBy("movieID")
  .count()
  .orderBy(desc("count"))
  .cache()
topMovieIDs: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [movieID: int, count: bigint]
topMovieIDs.show()
+-------+-----+
|movieID|count|
+-------+-----+
| 50| 583|
| 258| 509|
| 100| 508|
| 181| 507|
| 294| 485|
| 286| 481|
| 288| 478|
| 1| 452|
| 300| 431|
| 121| 429|
| 174| 420|
| 127| 413|
| 56| 394|
| 7| 392|
| 98| 390|
| 237| 384|
| 117| 378|
| 172| 367|
| 222| 365|
| 204| 350|
+-------+-----+
only showing top 20 rows
Using take on a DataFrame selects individual rows, returned as an Array[Row].
val top10 = topMovieIDs.take(10)
top10: Array[org.apache.spark.sql.Row] = Array([50,583], [258,509], [100,508], [181,507], [294,485], [286,481], [288,478], [1,452], [300,431], [121,429])
top10.foreach(println)
[50,583]
[258,509]
[100,508]
[181,507]
[294,485]
[286,481]
[288,478]
[1,452]
[300,431]
[121,429]
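Each element of top10 is a Row; as a sketch, Row's typed accessors can read the columns without a cast (movieID is an int, count a bigint, hence getInt/getLong):
top10.foreach(row => println(row.getInt(0) + ": " + row.getLong(1)))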
loadMovieNames is explained in detail here.
def loadMovieNames() : Map[Int, String] = {
  // Handle character encoding issues:
  implicit val codec = Codec("UTF-8")
  codec.onMalformedInput(CodingErrorAction.REPLACE)
  codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
  // Create a Map of Ints to Strings, and populate it from u.item.
  var movieNames: Map[Int, String] = Map()
  val lines = Source.fromFile("data/u.item").getLines()
  for (line <- lines) {
    val fields = line.split('|')
    if (fields.length > 1) {
      // (movie ID -> movie title)
      movieNames += (fields(0).toInt -> fields(1))
    }
  }
  movieNames
}
val names = loadMovieNames()
loadMovieNames: ()Map[Int,String]
names: Map[Int,String] = Map(645 -> Paris Is Burning (1990), 892 -> Flubber (1997), 69 -> Forrest Gump (1994), 1322 -> Metisse (Caf� au Lait) (1993), 1665 -> Brother's Kiss, A (1997), 1036 -> Drop Dead Fred (1991), 1586 -> Lashou shentan (1992), 1501 -> Prisoner of the Mountains (Kavkazsky Plennik) (1996), 809 -> Rising Sun (1993), 1337 -> Larger Than Life (1996), 1411 -> Barbarella (1968), 629 -> Victor/Victoria (1982), 1024 -> Mrs. Dalloway (1997), 1469 -> Tom and Huck (1995), 365 -> Powder (1995), 1369 -> Forbidden Christ, The (Cristo proibito, Il) (1950), 138 -> D3: The Mighty Ducks (1996), 1190 -> That Old Feeling (1997), 1168 -> Little Buddha (1993), 760 -> Screamers (1995), 101 -> Heavy Metal (1981), 1454 -> Angel and the Badman (1947), 1633 -> �...
for (result <- top10) {
  // result is just a Row at this point; we need to cast it back.
  // Each row has movieID, count as above.
  println(names(result(0).asInstanceOf[Int]) + ": " + result(1))
}
Star Wars (1977): 583
Contact (1997): 509
Fargo (1996): 508
Return of the Jedi (1983): 507
Liar Liar (1997): 485
English Patient, The (1996): 481
Scream (1996): 478
Toy Story (1995): 452
Air Force One (1997): 431
Independence Day (ID4) (1996): 429
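As an alternative to looking titles up in the driver-side Map, the names could themselves be loaded into a DataFrame and joined on movieID, keeping the lookup distributed; a minimal sketch, assuming spark.implicits._ is in scope (namesDF is a hypothetical name):
val namesDF = names.toSeq.toDF("movieID", "title")
topMovieIDs
  .join(namesDF, "movieID")
  .orderBy(desc("count"))
  .show(10, false)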
// Stop the session
spark.stop()