안녕하세요. 위기의코딩맨입니다.
비타입 트랜스포메이션 연산은 데이터의 실제 타입을 사용하지 않은 변환 연산을 수행한다는 의미를 갖고있습니다.
Spark에서 비타입 트랜스포메이션 연산을 진행할때는
Row, Column, functions의 세가지 주제를 잘 이해해야합니다.
org.apache.spark.sql.Row
org.apache.spark.sql.Column
org.apache.spark.sql.functions
Spark에서 제공하는 API 3가지를 의미합니다.
중요한 점은 연산을 진행할때, 해당 데이터의 타입과 해당 API 타입과 다르면 오류를 반환하기 때문에
유의해야합니다.
유용하게 사용되는 API 몇가지 메서드들을 살펴보겠습니다.
[ ===, !== ]
2개의 컬럼 값들이 같은지 판단하는 메서드입니다.
기본적으로 "==", "!="를 사용하지만 "===", "!=="로 표시한다는 점!
[ alias(), as() ]
SQL문에서 컬럼에 별칭을 부여할때 "as"를 사용하는 것처럼 같은 용도로 사용됩니다.
scala> df.select('age+1).show()
+---------+
|(age + 1)|
+---------+
| 6|
| 11|
| 16|
+---------+
해당 SQL문은 age+1를 진행한 결과인데,
여기에 컬럼명을 변경해보겠습니다.
scala> df.select(('age+1).as("AGE")).show()
+---+
|AGE|
+---+
| 6|
| 11|
| 16|
+---+
해당 컬럼명이 AGE로 변경된 것을 확인할 수 있습니다.
다시 되돌릴려면 같은 방법으로 진행하시거나, alias()를 사용하시면 됩니다.
[ isin() ]
Column의 값으로 인자가 지정된 값에 포함되어 있는지 확인하는 메서드입니다.
scala> val numbers = spark.sparkContext.broadcast(List(1,3,5,7,9))
scala> spark.range(0,10).where($"id".isin(numbers.value:_*)).show()
+---+
| id|
+---+
| 1|
| 3|
| 5|
| 7|
| 9|
+---+
해당 값은 1,3,5,7,9를 브로드캐스트 변수를 사용하여 List로 등록하고
$"id"isin(numbers.value:_*) 부분은 numbers List에 포함되어 있는지 확인하는 구문입니다.
[ when() ]
Column 값에 대한 if()~else() 구분을 사용하기 위한 분기처리 연산을 수행하는 메서드입니다.
scala> val ds = spark.range(0,5)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val col = when(ds("id") % 2 === 0, "even").otherwise("odd").as("type")
col: org.apache.spark.sql.Column = CASE WHEN ((id % 2) = 0) THEN even ELSE odd END AS `type`
+---+----+
| id|type|
+---+----+
| 0|even|
| 1| odd|
| 2|even|
| 3| odd|
| 4|even|
+---+----+
해당 예제는 0~5으로 구성된 Dataframe을 만들고 짝수와 홀수를 나누는 예제입니다.
when()은 org.apach.spark.sql.functions 객체의 메서드를 사용합니다.
[ max(), mean() ]
해당 컬럼의 가장 큰 값과 평균 값을 구하는 메서드이며, org.apach.spark.sql.functions을 사용합니다.
scala> df.select(max('age), mean('age)).show
+--------+--------+
|max(age)|avg(age)|
+--------+--------+
| 15| 10.0|
+--------+--------+
[ collect_list(), collect_set() ]
트정 Column 값을 모아서 하나의 리스트, Set으로 된 Column을 생성합니다.
collect_set()은 중복이 포함되지 않고 생성이 됩니다.
scala> df.show
+--------+---+----------+
| name|age| job|
+--------+---+----------+
|Person_1| 5| student|
|Person_2| 10| student|
|Person_3| 15|programmer|
+--------+---+----------+
scala> val df_2 = df.union(df)
scala> df_2.show()
+--------+---+----------+
| name|age| job|
+--------+---+----------+
|Person_1| 5| student|
|Person_2| 10| student|
|Person_3| 15|programmer|
|Person_1| 5| student|
|Person_2| 10| student|
|Person_3| 15|programmer|
+--------+---+----------+
scala> df_2.select(collect_list("name")).show(false)
+------------------------------------------------------------+
|collect_list(name) |
+------------------------------------------------------------+
|[Person_1, Person_2, Person_3, Person_1, Person_2, Person_3]|
+------------------------------------------------------------+
scala> df_2.select(collect_set("name")).show(false)
+------------------------------+
|collect_set(name) |
+------------------------------+
|[Person_3, Person_2, Person_1]|
+------------------------------+
해당 df를 union()을 활영하여 한번 더 병합을 해줘 중복이 포함된 df_2를 생성했습니다.
그리고 collect_list()와 collect_set()을 이용해서 "name"의 값들을 가져온 값을 확인해보면
collect_list()는 중복이 포함된 결과를 가져왔고, collect_set()은 중복이 제거된 결과를 나타냈습니다.
[ count(), countDistinct() ]
특정 Column의 데이터 개수를 계산하는 메서드입니다.
count()는 중복을 포함한 값을 사용하고 countDistinct()는 중복을 제거한 값을 가져옵니다.
scala> df_2.select(count("name"), countDistinct("name")).show(false)
+-----------+--------------------+
|count(name)|count(DISTINCT name)|
+-----------+--------------------+
|6 |3 |
+-----------+--------------------+
[ sum() ]
가장 흔하게 사용되는 집계 함수로 데이터의 합계를 반환합니다.
scala> df.select(sum("age")).show(false)
+--------+
|sum(age)|
+--------+
|30 |
+--------+
오늘은 간단하게 Spark SQL에서 사용되는 기본 연산을 살펴보았습니다.
아직 살펴볼 연산들이 많아서 (1)~ 단계로 나눠서 진행해보도록 하겠습니다.
'Spark' 카테고리의 다른 글
[We-Co] Spark SQL API 메서드(3) (0) | 2021.09.14 |
---|---|
[We-Co] Spark SQL API 메서드(2) (0) | 2021.09.13 |
[We-Co] Spark Dataset의 기본연산 (0) | 2021.09.09 |
[We-Co] Spark Dataset 액션연산 (0) | 2021.09.08 |
[We-Co] Spark Dataset,Dataframe을 이용한 단어 수 세기 예제 (0) | 2021.09.06 |