Spark SQL
Code example (see the notebook code)
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import spark.implicits._ // implicit conversions, e.g. for turning RDDs/Seqs into DataFrames
Initializing Scala interpreter ...
Spark Web UI available at http://163.152.---.---:---
SparkContext available as 'sc' (version = 2.3.1, master = local[*], app id = local-1554313599365)
SparkSession available as 'spark'
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import spark.implicits._
case class Person(ID:Int, name:String, age:Int, numFriends:Int)
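As a side note, a case class like this can also be used to build a DataFrame directly from in-memory objects via spark.implicits._. A minimal sketch (the sample values below are taken from the data shown later):

// Hypothetical: build a small DataFrame from Scala objects with toDF()
val samplePeople = Seq(Person(0, "Will", 33, 385), Person(1, "Jean-Luc", 26, 2))
samplePeople.toDF().show()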
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val ID = StructField("ID", IntegerType)
val name = StructField("name", StringType)
val age = StructField("age", IntegerType)
val num = StructField("numFriends", IntegerType)
val schema = StructType(Array(ID, name, age, num))
ID: org.apache.spark.sql.types.StructField = StructField(ID,IntegerType,true)
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)
age: org.apache.spark.sql.types.StructField = StructField(age,IntegerType,true)
num: org.apache.spark.sql.types.StructField = StructField(numFriends,IntegerType,true)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ID,IntegerType,true), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(numFriends,IntegerType,true))
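For reference, the same schema can be built more compactly with StructType's fluent add method; in both forms nullable defaults to true:

// Equivalent schema built with add(...); nullable defaults to true
val schema2 = new StructType()
  .add("ID", IntegerType)
  .add("name", StringType)
  .add("age", IntegerType)
  .add("numFriends", IntegerType)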
To work with DataFrames, use the SparkSession (spark) rather than the SparkContext (sc). The schema built from StructFields above can be passed to spark.read and applied when loading .csv (or .json) files.
val schemaPeople = spark.read.schema(schema).csv("data/fakefriends.csv")
schemaPeople: org.apache.spark.sql.DataFrame = [ID: int, name: string ... 2 more fields]
schemaPeople.printSchema
root
|-- ID: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- numFriends: integer (nullable = true)
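If no explicit schema is supplied, Spark can infer one with the inferSchema option, at the cost of an extra pass over the data. A sketch (without a header row the columns would be named _c0, _c1, ...):

// Alternative: infer the schema instead of declaring it (extra pass over the data)
val inferred = spark.read.option("inferSchema", "true").csv("data/fakefriends.csv")
inferred.printSchema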
The contents of a DataFrame can be inspected with show().
schemaPeople.show()
+---+--------+---+----------+
| ID| name|age|numFriends|
+---+--------+---+----------+
| 0| Will| 33| 385|
| 1|Jean-Luc| 26| 2|
| 2| Hugh| 55| 221|
| 3| Deanna| 40| 465|
| 4| Quark| 68| 21|
| 5| Weyoun| 59| 318|
| 6| Gowron| 37| 220|
| 7| Will| 54| 307|
| 8| Jadzia| 38| 380|
| 9| Hugh| 27| 181|
| 10| Odo| 53| 191|
| 11| Ben| 57| 372|
| 12| Keiko| 54| 253|
| 13|Jean-Luc| 56| 444|
| 14| Hugh| 43| 49|
| 15| Rom| 36| 49|
| 16| Weyoun| 22| 323|
| 17| Odo| 35| 13|
| 18|Jean-Luc| 45| 455|
| 19| Geordi| 60| 246|
+---+--------+---+----------+
only showing top 20 rows
schemaPeople.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people limit 5").show
+---+--------+---+----------+
| ID| name|age|numFriends|
+---+--------+---+----------+
| 0| Will| 33| 385|
| 1|Jean-Luc| 26| 2|
| 2| Hugh| 55| 221|
| 3| Deanna| 40| 465|
| 4| Quark| 68| 21|
+---+--------+---+----------+
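The same query can be expressed with the DataFrame API, without registering a temp view:

// DataFrame API equivalent of SELECT * FROM people LIMIT 5
schemaPeople.limit(5).show()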
spark.sql("SELECT * FROM people order by age desc limit 5").show
+---+-----+---+----------+
| ID| name|age|numFriends|
+---+-----+---+----------+
|116| Ben| 69| 75|
|205| Morn| 69| 236|
| 99|Keiko| 69| 491|
| 97|Nerys| 69| 361|
| 62|Keiko| 69| 9|
+---+-----+---+----------+
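Likewise for the ordered query:

// DataFrame API equivalent of ORDER BY age DESC LIMIT 5
schemaPeople.orderBy($"age".desc).limit(5).show()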
val teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
teenagers: org.apache.spark.sql.DataFrame = [ID: int, name: string ... 2 more fields]
collect() returns the query result to the driver as an array of Row objects, which can then be printed.
val results = teenagers.collect()
results: Array[org.apache.spark.sql.Row] = Array([21,Miles,19,268], [52,Beverly,19,269], [54,Brunt,19,5], [106,Beverly,18,499], [115,Dukat,18,397], [133,Quark,19,265], [136,Will,19,335], [225,Elim,19,106], [304,Will,19,404], [341,Data,18,326], [366,Keiko,19,119], [373,Quark,19,272], [377,Beverly,18,418], [404,Kasidy,18,24], [409,Nog,19,267], [439,Data,18,417], [444,Keiko,18,472], [492,Dukat,19,36], [494,Kasidy,18,194])
results.foreach(println)
[21,Miles,19,268]
[52,Beverly,19,269]
[54,Brunt,19,5]
[106,Beverly,18,499]
[115,Dukat,18,397]
[133,Quark,19,265]
[136,Will,19,335]
[225,Elim,19,106]
[304,Will,19,404]
[341,Data,18,326]
[366,Keiko,19,119]
[373,Quark,19,272]
[377,Beverly,18,418]
[404,Kasidy,18,24]
[409,Nog,19,267]
[439,Data,18,417]
[444,Keiko,18,472]
[492,Dukat,19,36]
[494,Kasidy,18,194]
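Since the Person case class defined above matches this schema, the result can also be converted to a typed Dataset, so rows come back as Person objects rather than generic Rows. A minimal sketch:

// Typed view of the same result via the Person case class (needs spark.implicits._)
val teenDS = teenagers.as[Person]
teenDS.collect().foreach(p => println(s"${p.name} is ${p.age} with ${p.numFriends} friends"))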
spark.stop()