Sorting (sortByKey)

October 30, 2018    Spark

Word count example

  • An example that loads a text file, parses it, and counts each word (key)
  • The word counts are then sorted in descending order


Code example (see the notebook code)

import org.apache.spark._
Intitializing Scala interpreter ...


Spark Web UI available at http://163.152.---.---:----
SparkContext available as 'sc' (version = 2.3.1, master = local[*], app id = local-1553057037076)
SparkSession available as 'spark'

import org.apache.spark._


val input = sc.textFile("./data/book.txt")
input: org.apache.spark.rdd.RDD[String] = ./data/book.txt MapPartitionsRDD[5] at textFile at <console>:28


input.take(8).foreach(println)
Self-Employment: Building an Internet Business of One
Achieving Financial and Personal Freedom through a Lifestyle Technology Business
By Frank Kane



Copyright © 2015 Frank Kane. 
All rights reserved worldwide.


// split on runs of non-word characters, keeping tokens of letters (A-Z, a-z), digits (0-9), and underscores
val words = input.flatMap(x => x.split("\\W+"))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:30
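
  • For reference, this is how the split pattern behaves on a single line of the file (plain Scala, independent of Spark; the example string is taken from the sample output above):
"Self-Employment: Building an Internet Business of One".split("\\W+")
// Array(Self, Employment, Building, an, Internet, Business, of, One)
// "\\W+" matches runs of non-word characters, so punctuation and spaces are
// dropped and each token keeps only letters, digits, and underscores.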


val lowercaseWords = words.map( x => x.toLowerCase())
lowercaseWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at map at <console>:32


  • Run the count in the action stage
val wordCount = lowercaseWords.countByValue()
wordCount: scala.collection.Map[String,Long] = Map(serious -> 1, foolproof -> 1, precious -> 2, inflammatory -> 1, hourly -> 3, embedded -> 1, salesperson -> 7, plentiful -> 1, rate -> 9, 2014 -> 8, plugin -> 3, headache -> 1, purchasing -> 9, ons -> 1, bing -> 1, looks -> 2, ranking -> 2, irs -> 3, california -> 2, scare -> 1, finalized -> 1, associations -> 1, accident -> 3, physically -> 2, conversations -> 1, contracts -> 4, scenario -> 1, nudge -> 2, gamble -> 4, ideas -> 27, sketches -> 1, static -> 1, oculus -> 5, unity -> 1, tweeted -> 1, joining -> 1, particularly -> 1, used -> 18, eye -> 3, striking -> 2, minority -> 1, automatic -> 2, widely -> 4, impressions -> 5, checklist -> 3, e -> 1, conversion -> 4, worded -> 1, unidirectional -> 1, significantly -> 4, beautiful -> 1, p...


wordCount.take(10).foreach(println)
(serious,1)
(foolproof,1)
(precious,2)
(inflammatory,1)
(hourly,3)
(embedded,1)
(salesperson,7)
(plentiful,1)
(rate,9)
(2014,8)
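
  • Note that countByValue() returns an ordinary Scala Map on the driver, so a small result can also be sorted locally without Spark. A minimal sketch (output illustrative):
wordCount.toSeq.sortBy(-_._2).take(10).foreach(println)
// (you,1878)
// (to,1828)
// ...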


Sorting

  • A Spark program consists of two stages: 1) transformations and 2) actions. In general, RDDs that describe the operations are declared up front in the transformation stage, and the results become visible in the action stage.
  • In the example above, countByValue is an action, so it is difficult to apply further operations after it.
  • Therefore, when you want to keep the counting as an RDD within the transformation stage, you can write it as follows.
lowercaseWords.take(10).foreach(println)
self
employment
building
an
internet
business
of
one
achieving
financial


val tmp = lowercaseWords.map(x =>(x,1))
tmp: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:34


tmp.take(10).foreach(println)
(self,1)
(employment,1)
(building,1)
(an,1)
(internet,1)
(business,1)
(of,1)
(one,1)
(achieving,1)
(financial,1)


  • countByValue(), used above to compute the counts, is an action in Spark processing.
  • The version below, by contrast, creates an RDD that carries the count operation within the transformation stage.
// (x, y) => x + y sums the per-word 1s: x is the running total, y is the next value
val wordCounts = lowercaseWords.map(x =>(x,1)).reduceByKey((x,y) => x + y)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:35


wordCounts.take(10).foreach(println)
(transitions,1)
(someone,62)
(intimately,1)
(ll,114)
(surge,4)
(312,1)
(envelope,2)
(order,32)
(handled,4)
(behind,3)
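
  • Because reduceByKey is a transformation, the line above only recorded the lineage; no job runs until an action such as take is called. toDebugString prints the planned lineage (output abbreviated; exact RDD ids depend on the session):
println(wordCounts.toDebugString)
// (2) ShuffledRDD[14] at reduceByKey at <console>:35 []
//  +-(2) MapPartitionsRDD[13] at map at <console>:35 []
//     |  MapPartitionsRDD[7] at map at <console>:32 []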


// false: sort keys in descending order
val wordCountsSorted = wordCounts.map(x => (x._2, x._1)).sortByKey(false)
wordCountsSorted: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[30] at sortByKey at <console>:37


wordCountsSorted.take(10).foreach(println)
(1878,you)
(1828,to)
(1420,your)
(1292,the)
(1191,a)
(970,of)
(934,and)
(747,that)
(649,it)
(616,in)
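
  • As an aside, the same descending order can be obtained without swapping the pair by using RDD.sortBy with a key-extractor function (a sketch; pairs stay in (word, count) order):
val topWords = wordCounts.sortBy(_._2, ascending = false)
topWords.take(10).foreach(println)
// (you,1878)
// (to,1828)
// ...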
