Streaming
Features
Code example (see the notebook code)
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
Instead of the sparkContext or sparkSession, create a StreamingContext.

val ssc = new StreamingContext(sc, Seconds(2))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@66c6ae57
Setup netcat
-l : Listen mode, for inbound connects
-k : Keep inbound sockets open for multiple connects

# on the terminal
nc -lk 5000
//ssc.socketTextStream("IP address", port)
val lines = ssc.socketTextStream("localhost", 5000)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@3c501c53
val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@788fa4fc
Let's look at how to use reduceByKeyAndWindow.
window length: the range of data each computation covers (the last 30 seconds of RDDs)
sliding interval: how often the computation runs (every 10 seconds)
For each key (word), the counts (values) are summed with (x + y).

import org.apache.spark.streaming.StreamingContext._
val pairs = words.map(word => (word, 1))
// Reduce last 30 seconds of data, every 10 seconds
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
import org.apache.spark.streaming.StreamingContext._
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@6a98c5dd
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@2450eaf5
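For reference, reduceByKeyAndWindow also has an overload that takes an inverse reduce function, letting Spark update each window incrementally (add the counts entering the window, subtract the counts leaving it) instead of re-summing the full 30 seconds of data. A minimal sketch assuming the same pairs DStream; the checkpoint directory is a placeholder, and checkpointing is required for this overload.

// Incremental windowed count: add values entering the window, subtract values leaving it
ssc.checkpoint("checkpoint/")  // placeholder directory; required for the inverse-function overload
val incrementalCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // new values entering the window
  (a: Int, b: Int) => a - b,   // old values leaving the window
  Seconds(30), Seconds(10))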
wordCounts.print
Calling ssc.start() establishes the final connection to netcat; the text typed into netcat is then processed by the code above and the word counts are produced.

ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
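For reference, the same pipeline can be assembled into a standalone application instead of notebook cells. This is a minimal sketch assuming a local run; the application name and master URL are assumptions, while the host, port, batch interval, and window settings match the example above.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the socket receiver, one for the processing
    val conf = new SparkConf().setAppName("WindowedWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))     // 2-second batch interval

    val lines = ssc.socketTextStream("localhost", 5000)  // text typed into `nc -lk 5000`
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))

    // Sum counts over the last 30 seconds, recomputed every 10 seconds
    val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}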