Streaming

November 12, 2018    Spark

Streaming

  • Streaming을 사용하게 되면 real-time으로 연산결과를 확인/저장할 수 있다.


특징

  • 연산방식
    • byte단위가 아닌 1초단위로 실시간 연산이 수행
    • 일반적으로 small RDD chunk단위로 수행됨
  • 저장
    • checkpoint를 이용하여 저장
  • 연산
    • Dstreaming 객체의 형태지만 RDD함수로 연산도 할 수 있음
  • 연산범위
    • window operation으로 특정시점에 대한 연산이 가능
      • e.g. reducedByWindow()
    • 세션별로 연산이 가능
      • e.g. updateStateByKey()


코드 예시(notebook 코드 참조)

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._


  • Streaming을 하기 위해 sparkContext, sparkSession가 아닌 StreamingContext를 생성해 주자.
val ssc = new StreamingContext(sc, Seconds(2))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@66c6ae57



Setup netcat

  • Terminal에 입력되는 내용(텍스트)이 실시간으로 스파크에 전달되게 한다.
  • Terminal를 열어 아래와 같이 실행
    • OpenBSD netcat(nc)
    • -l: Listen mode, for inbound connects
    • -k: Keep inbound sockets open for multiple connects
    • “5000”: port
# on the terminal
nc -lk 5000


  • 다시 돌아와서, spark와 netcat의 연결을 선언하는 코드는 아래와 같다.
//ssc.socketTextStream("IP address", port)
val lines = ssc.socketTextStream("localhost", 5000)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@3c501c53


  • 띄어쓰기로 문장 parsing
val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@788fa4fc


  • reduceByKeyAndWindow 사용법을 살펴보자.
  • 아래 예제에서는,
    • window length: 30초만큼의 RDD연산(계산되는 범위)
    • sliding interval: 10초마다 계산(계산되는 주)
    • 계산방식은 단어(key)별로 단어갯수(value)들을 더하는 방식(x+y)
import org.apache.spark.streaming.StreamingContext._
val pairs = words.map(word => (word, 1))
// Reduce last 30 seconds of data, every 10 seconds
val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b) , Seconds(30), Seconds(10))
import org.apache.spark.streaming.StreamingContext._
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@6a98c5dd
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@2450eaf5


wordCounts.print


  • ssc.start()를 사용하여 necat과 최종적으로 연결이 된다.
  • 따라서, necat에 입력이 되는 텍스트들이 위에 쓰여진 코드의 결과들로 산출 될 수 있다.
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate


  • 예제로, 아래와 같은 텍스트를 입력해 보았다.


DSBA