We-Co

[We-Co] Spark SQL API 메서드(1) 본문

Spark

[We-Co] Spark SQL API 메서드(1)

위기의코딩맨 2021. 9. 12. 21:58
반응형

안녕하세요. 위기의코딩맨입니다.

비타입 트랜스포메이션 연산은 데이터의 실제 타입을 사용하지 않은 변환 연산을 수행한다는 의미를 갖고있습니다.

Spark에서 비타입 트랜스포메이션 연산을 진행할때는 

Row, Column, functions의 세가지 주제를 잘 이해해야합니다. 

 

org.apache.spark.sql.Row

org.apache.spark.sql.Column

org.apache.spark.sql.functions

 

Spark에서 제공하는 API 3가지를 의미합니다. 

중요한 점은 연산을 진행할때, 해당 데이터의 타입과 해당 API 타입과 다르면 오류를 반환하기 때문에

유의해야합니다.

 

유용하게 사용되는 API 몇가지 메서드들을 살펴보겠습니다.

 

[ ===, !== ]

2개의 컬럼 값들이 같은지 판단하는 메서드입니다.

기본적으로 "==", "!="를 사용하지만 "===", "!=="로 표시한다는 점!

 

[ alias(), as() ]

SQL문에서 컬럼에 별칭을 부여할때 "as"를 사용하는 것처럼 같은 용도로 사용됩니다.

 

scala> df.select('age+1).show()

 

+---------+
|(age + 1)|
+---------+
|        6|
|       11|
|       16|
+---------+

 

해당 SQL문은 age+1를 진행한 결과인데,

여기에 컬럼명을 변경해보겠습니다.

 

scala> df.select(('age+1).as("AGE")).show()

 

+---+
|AGE|
+---+
|  6|
| 11|
| 16|
+---+

 

해당 컬럼명이 AGE로 변경된 것을 확인할 수 있습니다.

다시 되돌릴려면 같은 방법으로 진행하시거나, alias()를 사용하시면 됩니다.

 

[ isin() ]

Column의 값으로 인자가 지정된 값에 포함되어 있는지 확인하는 메서드입니다.

 

scala> val numbers = spark.sparkContext.broadcast(List(1,3,5,7,9))

scala> spark.range(0,10).where($"id".isin(numbers.value:_*)).show()

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
+---+

 

해당 값은 1,3,5,7,9를 브로드캐스트 변수를 사용하여 List로 등록하고

$"id"isin(numbers.value:_*) 부분은 numbers List에 포함되어 있는지 확인하는 구문입니다.

 

 

[ when() ]

 

Column 값에 대한 if()~else() 구분을 사용하기 위한 분기처리 연산을 수행하는 메서드입니다.

 

scala> val ds = spark.range(0,5)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint] 
scala> val col = when(ds("id") % 2 === 0, "even").otherwise("odd").as("type")
col: org.apache.spark.sql.Column = CASE WHEN ((id % 2) = 0) THEN even ELSE odd END AS `type`

 

+---+----+
| id|type|
+---+----+
|  0|even|
|  1| odd|
|  2|even|
|  3| odd|
|  4|even|
+---+----+

 

해당 예제는 0~5으로 구성된 Dataframe을 만들고 짝수와 홀수를 나누는 예제입니다.

when()은 org.apach.spark.sql.functions 객체의 메서드를 사용합니다.

 

 

[ max(), mean() ]

 

해당 컬럼의 가장 큰 값과 평균 값을 구하는 메서드이며, org.apach.spark.sql.functions을 사용합니다.


scala> df.select(max('age), mean('age)).show

 

+--------+--------+
|max(age)|avg(age)|
+--------+--------+
|      15|    10.0|
+--------+--------+

 

[ collect_list(), collect_set() ]

 

트정 Column 값을 모아서 하나의 리스트, Set으로 된 Column을 생성합니다.

collect_set()은 중복이 포함되지 않고 생성이 됩니다.

 

scala> df.show

 

+--------+---+----------+
|    name|age|       job|
+--------+---+----------+
|Person_1|  5|   student|
|Person_2| 10|   student|
|Person_3| 15|programmer|
+--------+---+----------+

 

scala> val df_2 = df.union(df)

scala> df_2.show()

 

+--------+---+----------+
|    name|age|       job|
+--------+---+----------+
|Person_1|  5|   student|
|Person_2| 10|   student|
|Person_3| 15|programmer|
|Person_1|  5|   student|
|Person_2| 10|   student|
|Person_3| 15|programmer|
+--------+---+----------+

 

scala> df_2.select(collect_list("name")).show(false)

+------------------------------------------------------------+
|collect_list(name)                                          |
+------------------------------------------------------------+
|[Person_1, Person_2, Person_3, Person_1, Person_2, Person_3]|
+------------------------------------------------------------+

 

scala> df_2.select(collect_set("name")).show(false)

+------------------------------+
|collect_set(name)             |
+------------------------------+
|[Person_3, Person_2, Person_1]|
+------------------------------+

 

해당 df를 union()을 활영하여 한번 더 병합을 해줘 중복이 포함된 df_2를 생성했습니다.

그리고 collect_list()와 collect_set()을 이용해서 "name"의 값들을 가져온 값을 확인해보면

collect_list()는 중복이 포함된 결과를 가져왔고, collect_set()은 중복이 제거된 결과를 나타냈습니다.

 

[ count(), countDistinct() ]

특정 Column의 데이터 개수를 계산하는 메서드입니다.

count()는 중복을 포함한 값을 사용하고 countDistinct()는 중복을 제거한 값을 가져옵니다.

 

scala> df_2.select(count("name"), countDistinct("name")).show(false)

 

+-----------+--------------------+
|count(name)|count(DISTINCT name)|
+-----------+--------------------+
|6             |3                       |
+-----------+--------------------+

 

[ sum() ]

가장 흔하게 사용되는 집계 함수로 데이터의 합계를 반환합니다.

 

scala> df.select(sum("age")).show(false)

 

+--------+
|sum(age)|
+--------+
|30        |
+--------+

 

 

 

 

오늘은 간단하게 Spark SQL에서 사용되는 기본 연산을 살펴보았습니다.

아직 살펴볼 연산들이 많아서 (1)~ 단계로 나눠서 진행해보도록 하겠습니다.

반응형