We-Co

[We-Co] Spark Dataset 본문

Spark

[We-Co] Spark Dataset

위기의코딩맨 2021. 9. 6. 11:30
반응형

안녕하세요. 위기의코딩맨입니다.

오늘은 Spark에서 중요한 개념인 Dataset에 대해 간단하게 알아보도록 하겠습니다.

 

 

[ DataSet ] 

Dataset 나오기 이전에 DataFrame이라는 클래스를 구현해서 언어와 상관 없이 사용하고 있었습니다.

Dataset은 버전 Saprk 1.6에서 처음 소개되었으며, Java언어와 Scala언어에서만 사용이 가능했었습니다.

이때 버전에서는 Dataset이 DataFrame을 대체한다는 느낌이 없었기 때문에 두 가지 모두 사용했습니다.

그러나 Spark 2.0 버전부터 DataFrame 클래스가 Dataset 클래스로 통합되어 타입 별칭 기능을 가진

Scala 언어에서만 기존 방식 처럼 모두 사용가능 했지만 해당 기능이 없던 Java에서는 통합된 Dataset 클래스만

사용이 가능하게 됐습니다.

 

또한, RDD와 마찬가지로 트랜스포메이션, 액션연산을 포함하고 있습니다.

 

Scala는 DataFram, Dataset 클래스 Java는 Dataset 클래스, Python은 R을 사용하는 경우에 Datafram을 사용하게 됐습니다.

 

기존 예제를 보며 Dataset에대해 알아보도록 하겠습니다.

 

 

scala> val rdd = sc.parallelize(List(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 
scala> print(rdd.collect.mkString(", "))

 

출력은 1, 2, 3이 되는것을 확인하실 수 있습니다.

기존에 사용했던 방식은 데이터를 생성해서 RDD에 넣어주었습니다.

같은 내용으로 Dataset을 생성해보겠습니다.

 

 

scala> val ds = List(1,2,3).toDS
21/09/06 11:22:12 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/D:/SPARK/spark-3.1.1-bin-hadoop3.2/bin/spark-warehouse').
21/09/06 11:22:12 INFO SharedState: Warehouse path is 'file:/D:/SPARK/spark-3.1.1-bin-hadoop3.2/bin/spark-warehouse'.
21/09/06 11:22:13 INFO CodeGenerator: Code generated in 232.2984 ms
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
21/09/06 11:22:21 INFO CodeGenerator: Code generated in 10.4867 ms
21/09/06 11:22:21 INFO CodeGenerator: Code generated in 20.981 ms
+-----+
|value|
+-----+
|    1|
|    2|
|    3| 
+-----+

 

Dataset을 생성한 예제입니다. RDD를 사용했을 때처럼 출력을 하기위한 print()를 하지 않아도 내용을 표시할 수 있습니다. 또한, 해당 Data의 스키마정보를 확인할 수 있습니다.

 

scala> ds.printSchema
root
 |-- value: integer (nullable = false)

 

Dataset은 값과 스키마 정보까지 함께 포함하고 있기 때문에 스키마 기반으로 데이터 처리할때 성능 최적화할 수 있습니다.

 

그렇다면 RDD와의 차이는 무엇이 있을까요?

데이터를 처리하는 연산은 트랜스포메이션으로 연산을 진행하고, 결과를 생성하는 연산은 액션연산으로 분류 되는 것은 RDD와 유사합니다.

그러나 Dataset은 트랜스포메이션 연산에서 데이터를 처리하는 방법에 따라

타입연산(typed operations)와 비타입연산(untyped operations)으로 나눌 수 있습니다.

 

 

scala> val data= 1 to 100 toList
warning: there was one feature warning; for details, enable `:setting -feature' or `:replay -feature'
data: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

scala> val ds = data.toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> val result = ds.map(_+1)
result: org.apache.spark.sql.Dataset[Int] = [value: int]

 

scala> print(result.collect.mkString(", "))

 

2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101

 

scala> val result2 = ds.select(col("value")+1)
result2: org.apache.spark.sql.DataFrame = [(value + 1): int]

 

scala> result2.show
+-----------+
|(value + 1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
|         10|
|         11|
|         12|
|         13|
|         14|
|         15|
|         16|
|         17|
|         18|
|         19|
|         20|
|         21|
+-----------+
only showing top 20 rows

 

 

예제를 살펴보면, result는 RDD의 map()메서드와 동일한 방법을 사용해서 데이터를 출력한 것을 확인할 수 있으며

result2는 Dataset을 데이터베이스의 테이블과 유사하게 처리를 한 방법입니다.

그래서 "value"+1이라는 부분이 테이블의 col(컬럼)의 값에 +1의 연산을 진행해라 라는 의미를 갖습니다.

여기서 중요한점은 해당 타입이 Int형이 아닌 org.apache.spark.sql.Column 타입이라는 것입니다.

위에서 말씀드렸던 비타입 연산이란  org.apache.spark.sql.Column과  org.apache.spark.sql.Row의 객체를

처리하는 연산이라고 할 수 있습니다.

 

 

 

Dataset의 연산 방식과 RDD의 연산 방식의 차이점, 연산방법을 알아보았습니다.

 

반응형