  • RDD을 위해, SparkContext를 생성했다면, SQL를 사용하기 위해 SqlContext를 정의
  • json/csv파일을 read/write 할 수 있음

코드 예시(notebook 코드 참조)

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import spark.implicits._ // RDD to table
  • case class: simple class in scala
case class Person(ID:Int, name:String, age:Int, numFriends:Int)
  • 데이터 프레임의 column이름을 정의할때 case class와 map을 이용하여 table schema를 만들 수 있음
  • notebook에서는 이러한 방식이 job할당이 잘 안됨
  • 아래와 같이 다른 방식으로 접근

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val ID = StructField("ID", IntegerType)
val name = StructField("name", StringType)
val age = StructField("age", IntegerType)
val num = StructField("numFriends", IntegerType)
val schema = StructType(Array(ID, name, age, num))
ID: org.apache.spark.sql.types.StructField = StructField(ID,IntegerType,true)
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)
age: org.apache.spark.sql.types.StructField = StructField(age,IntegerType,true)
num: org.apache.spark.sql.types.StructField = StructField(numFriends,IntegerType,true)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ID,IntegerType,true), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(numFriends,IntegerType,true))

  • SparkContext(sc)가 아니라 SparkSession(spark)를 사용해야 데이터프레임을 사용할 수 있다.
  • 데이터 불러오는 형식
val schemaPeople ="data/fakefriends.csv")
schemaPeople: org.apache.spark.sql.DataFrame = [ID: int, name: string ... 2 more fields]

 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- numFriends: integer (nullable = true)

  • show를 이용해 데이터프레임을 확인할 수 있다.
| ID|    name|age|numFriends|
|  0|    Will| 33|       385|
|  1|Jean-Luc| 26|         2|
|  2|    Hugh| 55|       221|
|  3|  Deanna| 40|       465|
|  4|   Quark| 68|        21|
|  5|  Weyoun| 59|       318|
|  6|  Gowron| 37|       220|
|  7|    Will| 54|       307|
|  8|  Jadzia| 38|       380|
|  9|    Hugh| 27|       181|
| 10|     Odo| 53|       191|
| 11|     Ben| 57|       372|
| 12|   Keiko| 54|       253|
| 13|Jean-Luc| 56|       444|
| 14|    Hugh| 43|        49|
| 15|     Rom| 36|        49|
| 16|  Weyoun| 22|       323|
| 17|     Odo| 35|        13|
| 18|Jean-Luc| 45|       455|
| 19|  Geordi| 60|       246|
only showing top 20 rows

  • table에 이름을 부여하는 방법으로 선택적으로 query를 사용할 수 있음
spark.sql("SELECT * FROM people limit 5").show
  • limit 5: 상위 5개
spark.sql("SELECT * FROM people order by age desc limit 5").show
val teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
teenagers: org.apache.spark.sql.DataFrame = [ID: int, name: string ... 2 more fields]

  • RDD와 유사하게 collect를 사용하여 객체를 출력할수 있다.
val results = teenagers.collect()
results: Array[org.apache.spark.sql.Row] = Array([21,Miles,19,268], [52,Beverly,19,269], [54,Brunt,19,5], [106,Beverly,18,499], [115,Dukat,18,397], [133,Quark,19,265], [136,Will,19,335], [225,Elim,19,106], [304,Will,19,404], [341,Data,18,326], [366,Keiko,19,119], [373,Quark,19,272], [377,Beverly,18,418], [404,Kasidy,18,24], [409,Nog,19,267], [439,Data,18,417], [444,Keiko,18,472], [492,Dukat,19,36], [494,Kasidy,18,194])


  • spark session 정지