sparkSQL(spark.read.schema)

November 6, 2018    Spark

sparkSQL

  • Just as a SparkContext is created to work with RDDs, a SQLContext (in Spark 2.x, the SparkSession) is defined to use SQL
  • JSON/CSV files can be read and written
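
A minimal read/write sketch (the file paths here are hypothetical, not from the notebook, and this JSON read infers the schema; spark is the SparkSession shown below):

val df = spark.read.json("data/people.json")       // read JSON, schema inferred
df.write.mode("overwrite").csv("data/people_csv")  // write the same rows back out as CSV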


Code example (see the notebook code)

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import spark.implicits._ // enables RDD-to-DataFrame conversions
Initializing Scala interpreter ...

Spark Web UI available at http://163.152.---.---:---
SparkContext available as 'sc' (version = 2.3.1, master = local[*], app id = local-1554313599365)
SparkSession available as 'spark'

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import spark.implicits._


  • case class: a simple class in Scala
case class Person(ID:Int, name:String, age:Int, numFriends:Int)
  • You can build a table schema by defining the DataFrame column names with a case class and a map over the raw rows
  • In the notebook, however, this approach does not get jobs assigned properly
  • So we take a different approach, shown below (a sketch of the case-class route follows for reference)
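
For reference, the case-class route would look roughly like this (a sketch only; as noted above, it did not run reliably in this notebook):

val lines = sc.textFile("data/fakefriends.csv")    // raw CSV lines as an RDD
val people = lines.map { line =>
  val f = line.split(",")
  Person(f(0).toInt, f(1), f(2).toInt, f(3).toInt) // map each line into the case class
}
val peopleDF = people.toDF()                       // column names come from Person's fields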


import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}


val ID = StructField("ID", IntegerType)
val name = StructField("name", StringType)
val age = StructField("age", IntegerType)
val num = StructField("numFriends", IntegerType)
val schema = StructType(Array(ID, name, age, num))
ID: org.apache.spark.sql.types.StructField = StructField(ID,IntegerType,true)
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)
age: org.apache.spark.sql.types.StructField = StructField(age,IntegerType,true)
num: org.apache.spark.sql.types.StructField = StructField(numFriends,IntegerType,true)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ID,IntegerType,true), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(numFriends,IntegerType,true))
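
As an aside (Spark 2.3+), the same schema can also be written as a DDL string instead of hand-built StructFields; the val names here are illustrative:

val schemaDDL = "ID INT, name STRING, age INT, numFriends INT"
val dfFromDDL = spark.read.schema(schemaDDL).csv("data/fakefriends.csv")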


  • You must use the SparkSession (spark), not the SparkContext (sc), to work with DataFrames.
  • Format for loading data:
    • spark.read.schema(StructType).csv(path)
    • spark.read.schema(StructType).json(path)
val schemaPeople = spark.read.schema(schema).csv("data/fakefriends.csv")
schemaPeople: org.apache.spark.sql.DataFrame = [ID: int, name: string ... 2 more fields]


schemaPeople.printSchema
root
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- numFriends: integer (nullable = true)


  • You can inspect a DataFrame with show.
schemaPeople.show()
+---+--------+---+----------+
| ID|    name|age|numFriends|
+---+--------+---+----------+
|  0|    Will| 33|       385|
|  1|Jean-Luc| 26|         2|
|  2|    Hugh| 55|       221|
|  3|  Deanna| 40|       465|
|  4|   Quark| 68|        21|
|  5|  Weyoun| 59|       318|
|  6|  Gowron| 37|       220|
|  7|    Will| 54|       307|
|  8|  Jadzia| 38|       380|
|  9|    Hugh| 27|       181|
| 10|     Odo| 53|       191|
| 11|     Ben| 57|       372|
| 12|   Keiko| 54|       253|
| 13|Jean-Luc| 56|       444|
| 14|    Hugh| 43|        49|
| 15|     Rom| 36|        49|
| 16|  Weyoun| 22|       323|
| 17|     Odo| 35|        13|
| 18|Jean-Luc| 45|       455|
| 19|  Geordi| 60|       246|
+---+--------+---+----------+
only showing top 20 rows


  • By registering the table under a name, you can then query it with SQL as needed
schemaPeople.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people limit 5").show
+---+--------+---+----------+
| ID|    name|age|numFriends|
+---+--------+---+----------+
|  0|    Will| 33|       385|
|  1|Jean-Luc| 26|         2|
|  2|    Hugh| 55|       221|
|  3|  Deanna| 40|       465|
|  4|   Quark| 68|        21|
+---+--------+---+----------+


  • limit 5: returns only the top 5 rows
spark.sql("SELECT * FROM people order by age desc limit 5").show
+---+-----+---+----------+
| ID| name|age|numFriends|
+---+-----+---+----------+
|116|  Ben| 69|        75|
|205| Morn| 69|       236|
| 99|Keiko| 69|       491|
| 97|Nerys| 69|       361|
| 62|Keiko| 69|         9|
+---+-----+---+----------+
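
For comparison (an aside, not in the original notebook), the same top-5-by-age query through the DataFrame API:

import org.apache.spark.sql.functions.desc
schemaPeople.orderBy(desc("age")).limit(5).show() // equivalent of ORDER BY age DESC LIMIT 5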


val teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
teenagers: org.apache.spark.sql.DataFrame = [ID: int, name: string ... 2 more fields]
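
The same filter can also be expressed with the DataFrame API instead of SQL (an equivalent aside):

val teenagersDF = schemaPeople.filter($"age" >= 13 && $"age" <= 19) // $ column syntax comes from spark.implicits._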


  • As with RDDs, you can use collect to bring the result rows back as local objects.
val results = teenagers.collect()
results: Array[org.apache.spark.sql.Row] = Array([21,Miles,19,268], [52,Beverly,19,269], [54,Brunt,19,5], [106,Beverly,18,499], [115,Dukat,18,397], [133,Quark,19,265], [136,Will,19,335], [225,Elim,19,106], [304,Will,19,404], [341,Data,18,326], [366,Keiko,19,119], [373,Quark,19,272], [377,Beverly,18,418], [404,Kasidy,18,24], [409,Nog,19,267], [439,Data,18,417], [444,Keiko,18,472], [492,Dukat,19,36], [494,Kasidy,18,194])


results.foreach(println)
[21,Miles,19,268]
[52,Beverly,19,269]
[54,Brunt,19,5]
[106,Beverly,18,499]
[115,Dukat,18,397]
[133,Quark,19,265]
[136,Will,19,335]
[225,Elim,19,106]
[304,Will,19,404]
[341,Data,18,326]
[366,Keiko,19,119]
[373,Quark,19,272]
[377,Beverly,18,418]
[404,Kasidy,18,24]
[409,Nog,19,267]
[439,Data,18,417]
[444,Keiko,18,472]
[492,Dukat,19,36]
[494,Kasidy,18,194]
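
Individual fields can also be pulled out of each Row with the standard typed getters (a small aside):

results.foreach { row =>
  val name = row.getAs[String]("name") // look up a column value by name
  val age  = row.getAs[Int]("age")
  println(s"$name is $age")
}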


  • Stop the Spark session
spark.stop()
