DataFrame.Operation(groupBy, cache)

November 8, 2018    Spark

DataFrame Operation

  • Let's look at how operations are performed on a DataFrame object.


Code example (see the notebook code)

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._   // provides desc(), used for orderBy below
import org.apache.log4j._
import scala.io.Codec
import scala.io.Source
import java.nio.charset.CodingErrorAction


// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)


  • u.data columns (a parsing sketch follows the sample rows below)
    • column 1: user ID
    • column 2: movie ID
    • column 3: rating
    • column 4: timestamp
val lines = sc.textFile("data/u.data")
lines: org.apache.spark.rdd.RDD[String] = data/u.data MapPartitionsRDD[1] at textFile at <console>:38


lines.take(5).foreach(println)
196	242	3	881250949
186	302	3	891717742
22	377	1	878887116
244	51	2	880606923
166	346	1	886397596
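
  • As a side note, all four tab-separated fields can be parsed at once. The sketch below is only illustrative: the Rating case class is hypothetical, and it assumes the SparkSession implicits (import spark.implicits._) are in scope, as they are in the shell/notebook.
// Sketch (assumption): parse every column, not just movieID.
case class Rating(userID: Int, movieID: Int, rating: Int, timestamp: Long)

val ratings = lines.map { line =>
  val f = line.split("\t")
  Rating(f(0).toInt, f(1).toInt, f(2).toInt, f(3).toLong)
}.toDF()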


  • In the example below, the _ inside map refers to each row (one line of the file); an equivalent version with an explicit parameter is shown after the code.
//RDD
val mID = lines.map(_.split("\t")(1).toInt)
mID: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:40
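
  • The underscore form is just shorthand; written out with an explicit parameter (the name mID2 is arbitrary) it is equivalent to the following.
// Equivalent to the underscore version: each row (line) is bound to an explicit name.
val mID2 = lines.map(line => line.split("\t")(1).toInt)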


mID.take(5).foreach(println)
242
302
377
51
346


  • Let's convert the RDD into a DataFrame.
// Convert records of the RDD (mID) to Rows and DF
val moviesDS = mID.map(x=> x.toInt).toDF("movieID")
moviesDS: org.apache.spark.sql.DataFrame = [movieID: int]
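
  • toDF is not defined on the RDD itself; it comes from the SparkSession implicits, which are already in scope in the shell/notebook. A minimal sketch, assuming the active SparkSession is named spark:
// Assumption: spark is the active SparkSession (predefined in spark-shell and most notebooks).
import spark.implicits._

// mID is already RDD[Int], so the extra map(x => x.toInt) above is not strictly needed.
val moviesDF = mID.toDF("movieID")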


moviesDS.show()
+-------+
|movieID|
+-------+
|    242|
|    302|
|    377|
|     51|
|    346|
|    474|
|    265|
|    465|
|    451|
|     86|
|    257|
|   1014|
|    222|
|     40|
|     29|
|    785|
|    387|
|    274|
|   1042|
|   1184|
+-------+
only showing top 20 rows


  • moviesDS is a DataFrame.
  • Let's perform operations on it while it is in DataFrame form.
    • groupBy is an expensive (wide, shuffle-inducing) operation, so cache() needs to be placed at an appropriate point; see the explain() sketch after the output below.
    • count(): counts the rows in each group.
moviesDS
    .groupBy("movieID")
    .count().show()
+-------+-----+
|movieID|count|
+-------+-----+
|    496|  231|
|    471|  221|
|    463|   71|
|    148|  128|
|   1342|    2|
|    833|   49|
|   1088|   13|
|   1591|    6|
|   1238|    8|
|   1580|    1|
|   1645|    1|
|    392|   68|
|    623|   39|
|    540|   43|
|    858|    3|
|    737|   59|
|    243|  132|
|   1025|   44|
|   1084|   21|
|   1127|   11|
+-------+-----+
only showing top 20 rows
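
  • groupBy forces a shuffle (an Exchange step in the physical plan), which is why it counts as an expensive operation; explain() can be used to confirm this on the query above.
// Printing the physical plan shows the Exchange (shuffle) introduced by groupBy.
moviesDS
    .groupBy("movieID")
    .count()
    .explain()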


  • Let's sort the result above in descending order and use cache.
    • cache(): stores the result instead of regenerating it on every action; a short sketch of the cache behaviour follows the output below.
val topMovieIDs = moviesDS
    .groupBy("movieID")
    .count()
    .orderBy(desc("count"))
    .cache()
topMovieIDs: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [movieID: int, count: bigint]


topMovieIDs.show()
+-------+-----+
|movieID|count|
+-------+-----+
|     50|  583|
|    258|  509|
|    100|  508|
|    181|  507|
|    294|  485|
|    286|  481|
|    288|  478|
|      1|  452|
|    300|  431|
|    121|  429|
|    174|  420|
|    127|  413|
|     56|  394|
|      7|  392|
|     98|  390|
|    237|  384|
|    117|  378|
|    172|  367|
|    222|  365|
|    204|  350|
+-------+-----+
only showing top 20 rows
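
  • Because topMovieIDs is cached, later actions such as take below reuse the materialized result instead of recomputing the groupBy. A small sketch of how the cache behaves (storage level and unpersist shown for illustration):
// The first action after cache() materializes the result; later actions reuse it.
topMovieIDs.count()                  // fills the cache
println(topMovieIDs.storageLevel)    // storage level used by cache() (typically MEMORY_AND_DISK for Datasets)
// topMovieIDs.unpersist()           // release the cached data when it is no longer needed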


  • Using take on a DataFrame returns the selected rows as Row objects, as the output below shows; a typed-getter sketch follows it.
val top10 = topMovieIDs.take(10)
top10: Array[org.apache.spark.sql.Row] = Array([50,583], [258,509], [100,508], [181,507], [294,485], [286,481], [288,478], [1,452], [300,431], [121,429])


top10.foreach(println)
[50,583]
[258,509]
[100,508]
[181,507]
[294,485]
[286,481]
[288,478]
[1,452]
[300,431]
[121,429]
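
  • Since take returns Array[Row], the fields can also be read with Row's typed getters instead of indexing and casting; note the count column from groupBy is a bigint, i.e. a Long. A small sketch:
// Typed access on Row: movieID is an Int, the groupBy count is a Long (bigint).
top10.foreach { row =>
  val id  = row.getInt(0)
  val cnt = row.getLong(1)
  println(s"$id -> $cnt")
}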


  • This overlaps with a previous post, where loadMovieNames is explained in detail.
  • It builds a Map from movieID to movie name.
def loadMovieNames() : Map[Int, String] = {

  // Handle character encoding issues:
  implicit val codec = Codec("UTF-8")
  codec.onMalformedInput(CodingErrorAction.REPLACE)
  codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

  // Create a Map of Ints to Strings, and populate it from u.item.
  var movieNames: Map[Int, String] = Map()

  val lines = Source.fromFile("data/u.item").getLines()
  for (line <- lines) {
    val fields = line.split('|')
    if (fields.length > 1) {
      // (Movie ID -> Movie Name)
      movieNames += (fields(0).toInt -> fields(1))
    }
  }

  movieNames
}
val names = loadMovieNames()
loadMovieNames: ()Map[Int,String]
names: Map[Int,String] = Map(645 -> Paris Is Burning (1990), 892 -> Flubber (1997), 69 -> Forrest Gump (1994), 1322 -> Metisse (Caf� au Lait) (1993), 1665 -> Brother's Kiss, A (1997), 1036 -> Drop Dead Fred (1991), 1586 -> Lashou shentan (1992), 1501 -> Prisoner of the Mountains (Kavkazsky Plennik) (1996), 809 -> Rising Sun (1993), 1337 -> Larger Than Life (1996), 1411 -> Barbarella (1968), 629 -> Victor/Victoria (1982), 1024 -> Mrs. Dalloway (1997), 1469 -> Tom and Huck (1995), 365 -> Powder (1995), 1369 -> Forbidden Christ, The (Cristo proibito, Il) (1950), 138 -> D3: The Mighty Ducks (1996), 1190 -> That Old Feeling (1997), 1168 -> Little Buddha (1993), 760 -> Screamers (1995), 101 -> Heavy Metal (1981), 1454 -> Angel and the Badman (1947), 1633 -> �...
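
  • The replacement characters (�) in some titles come from CodingErrorAction.REPLACE: u.item in the MovieLens 100k distribution is not UTF-8 (it is commonly Latin-1), so reading it with that codec keeps accented titles intact. A sketch under that assumption:
// Assumption: u.item is Latin-1 (ISO-8859-1) encoded, as in the MovieLens 100k distribution.
val latin1Lines = Source.fromFile("data/u.item")(Codec.ISO8859).getLines()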


  • Take the movieIDs from top10 and convert each one to its name; a DataFrame-join alternative is sketched after the output.
for (result <- top10) {
  // result is just a Row at this point; we need to cast it back.
  // Each row has movieID, count as above.
  println (names(result(0).asInstanceOf[Int]) + ": " + result(1))
}
Star Wars (1977): 583
Contact (1997): 509
Fargo (1996): 508
Return of the Jedi (1983): 507
Liar Liar (1997): 485
English Patient, The (1996): 481
Scream (1996): 478
Toy Story (1995): 452
Air Force One (1997): 431
Independence Day (ID4) (1996): 429
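
  • The same lookup could also stay in the DataFrame world by turning the names Map into a DataFrame and joining on movieID; a sketch, assuming the SparkSession implicits are in scope:
// Sketch (assumption): do the name lookup as a DataFrame join instead of a driver-side Map.
import spark.implicits._
val namesDF = names.toSeq.toDF("movieID", "title")
topMovieIDs.join(namesDF, "movieID").show(10, false)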


// Stop the session
spark.stop()
