DataFrame.Operation(groupBy, cache)

November 8, 2018    Spark

DataFrame Operation

  • DataFrame 객체 상태에서 어떻게 operation이 수행되는지 살펴보자.

코드 예시(notebook 코드 참조)

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import java.nio.charset.CodingErrorAction

// Set the log level to only print errors

  • 데이터
    • 1 col: user id
    • 2 col: movie id
    • 3 col: rating
    • 4 col: timestamp
val lines = sc.textFile("data/")
lines: org.apache.spark.rdd.RDD[String] = data/ MapPartitionsRDD[1] at textFile at <console>:38

196	242	3	881250949
186	302	3	891717742
22	377	1	878887116
244	51	2	880606923
166	346	1	886397596

  • 아래 예제에서 map_는 각 row를 의미한다.
val mID ="\t")(1).toInt)
mID: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:40


  • RDD를 DataFrame으로 바꿔보자.
// Convert records of the RDD (mID) to Rows and DF
val moviesDS => x.toInt).toDF("movieID")
moviesDS: org.apache.spark.sql.DataFrame = [movieID: int]
|    242|
|    302|
|    377|
|     51|
|    346|
|    474|
|    265|
|    465|
|    451|
|     86|
|    257|
|   1014|
|    222|
|     40|
|     29|
|    785|
|    387|
|    274|
|   1042|
|   1184|
only showing top 20 rows

  • moviesDS은 데이터프레임이다.
  • 데이터프레임 상태에서 operation을 수행해보자.
    • groupBy은 large operation이기 때문에 적절한 위치에 cache()가 필요하다.
    • count(): 그룹별 counting
|    496|  231|
|    471|  221|
|    463|   71|
|    148|  128|
|   1342|    2|
|    833|   49|
|   1088|   13|
|   1591|    6|
|   1238|    8|
|   1580|    1|
|   1645|    1|
|    392|   68|
|    623|   39|
|    540|   43|
|    858|    3|
|    737|   59|
|    243|  132|
|   1025|   44|
|   1084|   21|
|   1127|   11|
only showing top 20 rows

  • 위의 결과를 내림차순으로 정렬하여 cache를 사용하자.
    • cache(): 결과를 계속 재 생성하는 것이 아니라 저장
val topMovieIDs = moviesDS
topMovieIDs: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [movieID: int, count: bigint]
|     50|  583|
|    258|  509|
|    100|  508|
|    181|  507|
|    294|  485|
|    286|  481|
|    288|  478|
|      1|  452|
|    300|  431|
|    121|  429|
|    174|  420|
|    127|  413|
|     56|  394|
|      7|  392|
|     98|  390|
|    237|  384|
|    117|  378|
|    172|  367|
|    222|  365|
|    204|  350|
only showing top 20 rows

  • DataFrametake를 사용하면 각 row를 선택하는 것을 확인 할 수 있다.
val top10 = topMovieIDs.take(10)
top10: Array[org.apache.spark.sql.Row] = Array([50,583], [258,509], [100,508], [181,507], [294,485], [286,481], [288,478], [1,452], [300,431], [121,429])


  • 이전 글과 중복되는 내용으로 loadMovieNames여기에 자세히 설명되어 있다.
  • movieID와 name으로 맵핑하는 Map을 만들어 준다.
def loadMovieNames() : Map[Int, String] = {

// Handle character encoding issues:
implicit val codec = Codec("UTF-8")

// Create a Map of Intsgroup count to Strings, and populate it from u.item.
var movieNames:Map[Int, String] = Map()

 val lines = Source.fromFile("data/u.item").getLines()
 for (line <- lines) {
   var fields = line.split('|')
   if (fields.length > 1) {
    // (User ID -> Moive ID)
    movieNames += (fields(0).toInt -> fields(1))

 return movieNames
val names = loadMovieNames()
loadMovieNames: ()Map[Int,String]
val names = loadMovieNames()
loadMovieNames: ()Map[Int,String]
names: Map[Int,String] = Map(645 -> Paris Is Burning (1990), 892 -> Flubber (1997), 69 -> Forrest Gump (1994), ...)

  • top10의 movieID를 받아 하나씩 이름으로 변환해준다.
for (result <- top10) {
  // result is just a Row at this point; we need to cast it back.
  // Each row has movieID, count as above.
  println (names(result(0).asInstanceOf[Int]) + ": " + result(1))
Star Wars (1977): 583
Contact (1997): 509
Fargo (1996): 508
Return of the Jedi (1983): 507
Liar Liar (1997): 485
English Patient, The (1996): 481
Scream (1996): 478
Toy Story (1995): 452
Air Force One (1997): 431
Independence Day (ID4) (1996): 429

// Stop the session