We-Co

[We-Co] Spark SQL API 메서드(3) 본문

Spark

[We-Co] Spark SQL API 메서드(3)

위기의코딩맨 2021. 9. 14. 11:33
반응형

안녕하세요. 위기의 코딩맨입니다.

오늘은 Spark SQL API 메서드(3) 번째 작성을 진행해보겠습니다.

 

Spark SQL API 메서드(2)

 

[We-Co] Spark SQL API 메서드(2)

안녕하세요. 위기의코딩맨입니다. 오늘은 API 메서드(2)로 API의 연산자를 더 알아보도록 하겠습니다. Spark SQL API(1) [We-Co] Spark SQL API 메서드(1) 안녕하세요. 위기의코딩맨입니다. 비타입 트랜스포메

we-co.tistory.com

 

 

[ intersect(), except() ]

2개의 Dataframe에서 모두 속하는 인자들만 구성된 Dataframe을 생성하는 메서드입니다.

 

scala> val a = spark.range(1,5)
a: org.apache.spark.sql.Dataset[Long] = [id: bigint] 
scala> val b = spark.range(2,6)
b: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> val c = a.intersect(b)
c: org.apache.spark.sql.Dataset[Long] = [id: bigint]

 

a 는 1,2,3,4

b 는 2,3,4,5

c 는  2,3,4 의 결과 값이 출력되는 것을 확인할 수 있습니다.

 

그렇다면 하나의 Dataframe에서 나머지 다른 하나에 속하지 않는 원소만 추출할 수 있는 방법은 무엇일까요?

except()를 사용하시면 됩니다.

 

scala> val d = a.except(b)
d: org.apache.spark.sql.Dataset[Long] = [id: bigint]

+---+
| id  |
+---+
|  1  |
+---+

 

a기준으로 사용한 내용이므로 1이 출력되는 것을 확인할 수 있습니다.

 

[ orderBy() ]

SQL에서 Order by와 같은 동작을 수행합니다.

정렬에 사용될 칼럼 정보를 순서대로 나타내도록 합니다..

 

scala> val df = List((3,"z"),(10,"a"),(5,"c")).toDF("idx","name")
21/09/14 10:45:53 INFO CodeGenerator: Code generated in 16.8947 ms
df: org.apache.spark.sql.DataFrame = [idx: int, name: string]

scala> df.orderBy("name","idx").show

+---+----+
|idx|name|
+---+----+
| 10|   a |
|  5|   c  |
|  3|   z  |
+---+----+

scala> df.orderBy("idx","name").show

+---+----+
|idx|name|
+---+----+
|  3|   z  |
|  5|   c  |
| 10|   a  |
+---+----+

 

name 기준 정렬, idx 기준 정렬로 나눈 결과값입니다.

 

 

[ rollup() ]

 

scala> df_T.rollup("store","product").agg("price" -> "sum").sort(asc_nulls_last("store"), asc_nulls_last("product")).show

 

+-----+-------+----------+
|store|product|sum(price)|
+-----+-------+----------+
|   S1|   note|      1000|
|   S1|    pen|       500|
|   S1|   null |      1500|
|   S2|    bag|      2000|
|   S2|   note|      3000|
|   S2|   null |      5000|
| null|   null |      6500|
+-----+-------+----------+

 

결과를 보시면, rollup은 store, product 조합별 합계와 더불어서

store에 대한 소계와 전체 총 계를 추가로 계산을 진행합니다.

cube()와 많이 비슷하지만 cube()는 rollup()의 결과에 두번째 열인 store에 대한 소계까지 포함해서 계산해주는 차이가 있습니다.

 

 

[ withColumn(), withColumnRenamed() ]

Dataframe에서 새로운 Column을 추가하거나 기존의 Column의 이름을 변경하는 메서드입니다.


scala> df_T.show
+-----+-------+------+-----+
|store|product|amount|price|
+-----+-------+------+-----+
|   S1|   note|     5| 1000|
|   S2|    bag|     1| 2000|
|   S2|   note|    15| 3000|
|   S1|    pen|    10|  500|
+-----+-------+------+-----+


scala> val df_T_2 = df_T.withColumnRenamed("price","newprice")
df_T_2: org.apache.spark.sql.DataFrame = [store: string, product: string ... 2 more fields]

scala> df_T_2.show
+-----+-------+------+--------+
|store|product|amount|newprice|
+-----+-------+------+--------+
|   S1|   note|     5|    1000|
|   S2|    bag|     1|    2000|
|   S2|   note|    15|    3000|
|   S1|    pen|    10|     500|
+-----+-------+------+--------+

 

df_T의 "price"를 "newprice"로 Column명을 변경한 결과입니다.

 

 

 

 

Spark SQL API 메서드(3)의 시간도 끝이났습니다.

응용할 수 있도록 계속 공부해야겠습니다.

반응형

'Spark' 카테고리의 다른 글

[We-Co] Spark Streaming  (0) 2021.09.16
[We-Co] Spark to_json(), from_json()  (0) 2021.09.15
[We-Co] Spark SQL API 메서드(2)  (0) 2021.09.13
[We-Co] Spark SQL API 메서드(1)  (0) 2021.09.12
[We-Co] Spark Dataset의 기본연산  (0) 2021.09.09