안녕하세요. 위기의 코딩맨입니다.
오늘은 Spark SQL API 메서드(3) 번째 작성을 진행해보겠습니다.
[ intersect(), except() ]
2개의 Dataframe에서 모두 속하는 인자들만 구성된 Dataframe을 생성하는 메서드입니다.
scala> val a = spark.range(1,5)
a: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val b = spark.range(2,6)
b: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val c = a.intersect(b)
c: org.apache.spark.sql.Dataset[Long] = [id: bigint]
a 는 1,2,3,4
b 는 2,3,4,5
c 는 2,3,4 의 결과 값이 출력되는 것을 확인할 수 있습니다.
그렇다면 하나의 Dataframe에서 나머지 다른 하나에 속하지 않는 원소만 추출할 수 있는 방법은 무엇일까요?
except()를 사용하시면 됩니다.
scala> val d = a.except(b)
d: org.apache.spark.sql.Dataset[Long] = [id: bigint]
+---+
| id |
+---+
| 1 |
+---+
a기준으로 사용한 내용이므로 1이 출력되는 것을 확인할 수 있습니다.
[ orderBy() ]
SQL에서 Order by와 같은 동작을 수행합니다.
정렬에 사용될 칼럼 정보를 순서대로 나타내도록 합니다..
scala> val df = List((3,"z"),(10,"a"),(5,"c")).toDF("idx","name")
21/09/14 10:45:53 INFO CodeGenerator: Code generated in 16.8947 ms
df: org.apache.spark.sql.DataFrame = [idx: int, name: string]
scala> df.orderBy("name","idx").show
+---+----+
|idx|name|
+---+----+
| 10| a |
| 5| c |
| 3| z |
+---+----+
scala> df.orderBy("idx","name").show
+---+----+
|idx|name|
+---+----+
| 3| z |
| 5| c |
| 10| a |
+---+----+
name 기준 정렬, idx 기준 정렬로 나눈 결과값입니다.
[ rollup() ]
scala> df_T.rollup("store","product").agg("price" -> "sum").sort(asc_nulls_last("store"), asc_nulls_last("product")).show
+-----+-------+----------+
|store|product|sum(price)|
+-----+-------+----------+
| S1| note| 1000|
| S1| pen| 500|
| S1| null | 1500|
| S2| bag| 2000|
| S2| note| 3000|
| S2| null | 5000|
| null| null | 6500|
+-----+-------+----------+
결과를 보시면, rollup은 store, product 조합별 합계와 더불어서
store에 대한 소계와 전체 총 계를 추가로 계산을 진행합니다.
cube()와 많이 비슷하지만 cube()는 rollup()의 결과에 두번째 열인 store에 대한 소계까지 포함해서 계산해주는 차이가 있습니다.
[ withColumn(), withColumnRenamed() ]
Dataframe에서 새로운 Column을 추가하거나 기존의 Column의 이름을 변경하는 메서드입니다.
scala> df_T.show
+-----+-------+------+-----+
|store|product|amount|price|
+-----+-------+------+-----+
| S1| note| 5| 1000|
| S2| bag| 1| 2000|
| S2| note| 15| 3000|
| S1| pen| 10| 500|
+-----+-------+------+-----+
scala> val df_T_2 = df_T.withColumnRenamed("price","newprice")
df_T_2: org.apache.spark.sql.DataFrame = [store: string, product: string ... 2 more fields]
scala> df_T_2.show
+-----+-------+------+--------+
|store|product|amount|newprice|
+-----+-------+------+--------+
| S1| note| 5| 1000|
| S2| bag| 1| 2000|
| S2| note| 15| 3000|
| S1| pen| 10| 500|
+-----+-------+------+--------+
df_T의 "price"를 "newprice"로 Column명을 변경한 결과입니다.
Spark SQL API 메서드(3)의 시간도 끝이났습니다.
응용할 수 있도록 계속 공부해야겠습니다.
'Spark' 카테고리의 다른 글
[We-Co] Spark Streaming (0) | 2021.09.16 |
---|---|
[We-Co] Spark to_json(), from_json() (0) | 2021.09.15 |
[We-Co] Spark SQL API 메서드(2) (0) | 2021.09.13 |
[We-Co] Spark SQL API 메서드(1) (0) | 2021.09.12 |
[We-Co] Spark Dataset의 기본연산 (0) | 2021.09.09 |