We-Co

[We-Co] Spark SQL API 메서드(2) 본문

Spark

[We-Co] Spark SQL API 메서드(2)

위기의코딩맨 2021. 9. 13. 14:55
반응형

안녕하세요. 위기의코딩맨입니다.

오늘은 API 메서드(2)로 API의 연산자를 더 알아보도록 하겠습니다.

 

Spark SQL API(1)

 

[We-Co] Spark SQL API 메서드(1)

안녕하세요. 위기의코딩맨입니다. 비타입 트랜스포메이션 연산은 데이터의 실제 타입을 사용하지 않은 변환 연산을 수행한다는 의미를 갖고있습니다. Spark에서 비타입 트랜스포메이션 연산을

we-co.tistory.com

 

 

[ grouping(), grouping_id() ]

Dataframe이 제공하는 연산 중, 소계를 구해주는 역할을 진행합니다.

group으로 묶어서 해당 결과를 보여주는 역할을 하는데 예제로 설명해드리겠습니다.

 

scala> case class Test(store: String, product: String, amount: Int, price: Int)
defined class Test 
scala> val row1 = Test("S1", "note", 5, 1000)
row1: Test = Test(S1,note,5,1000) 
scala> val row2 = Test("S2", "bag", 1, 2000)
row2: Test = Test(S2,bag,1,2000) 
scala> val row3 = Test("S2", "note",15, 3000)
row3: Test = Test(S2,note,15,3000) 
scala> val row4 = Test("S1", "pen", 10, 500)
row4: Test = Test(S1,pen,10,500) 
scala> val data = List(row1,row2,row3,row4)

+-----+-------+------+-----+
|store|product|amount|price|
+-----+-------+------+-----+
|   S1|   note|     5| 1000|
|   S2|    bag|     1| 2000|
|   S2|   note|    15| 3000|
|   S1|    pen|    10|  500|
+-----+-------+------+-----+

scala> df_T.cube('store, 'product).agg(sum("amount"),grouping("store")).show

+-----+-------+-----------+---------------+
|store|product|sum(amount)|grouping(store)|
+-----+-------+-----------+---------------+
| null |    bag |          1 |              1|
|   S1 |   note |          5 |              0|
| null |    pen |         10 |              1|
| null |   note |         20 |              1|
|   S2 |   null  |         16 |              0|
| null |   null  |          31 |             1|
|   S2 |    bag |          1 |              0|
|   S1 |   null  |         15 |              0|
|   S2 |   note |         15 |              0|
|   S1 |    pen |         10 |              0|
+-----+-------+-----------+---------------+

 

먼저 grouping을 보면 store를 기준으로 잡고 amount의 값의 합계를 구한 예제입니다.

store에 null이 들어간거면 S1, S2 포함한 product 기준의 합계를 나타내는 것이며

store에 null이 아닌 S1, S2가 들어있고 product에 null이 들어가있으면 S1,S2의 총합계를 나타냅니다.

둘 다 null이 아닌 경우는 2개의 기준의 합계를 나타냅니다.

 

 

scala> df_T.cube('store, 'product).agg(sum("amount"), grouping_id("store","product")).show

+-----+-------+-----------+---------------------------+
|store|product|sum(amount)|grouping_id(store, product)|
+-----+-------+-----------+---------------------------+
| null |    bag |          1 |                          2|
|   S1 |   note |          5 |                          0|
| null |    pen |         10 |                          2|
| null |   note |         20 |                          2|
|   S2 |   null  |         16 |                          1|
| null |   null  |         31 |                          3|
|   S2 |    bag |          1 |                          0|
|   S1 |   null  |         15 |                          1|
|   S2 |   note |         15 |                          0|
|   S1 |    pen |         10 |                          0|
+-----+-------+-----------+---------------------------+

 

 

grouping_id는 grouping과 마찬가지로 기준을 잡고 합계를 구하는데

null,null 총합계는 3null, product는    2store, null은      1store,product는  0각각의 Id를 부여해서 기준을 보여주도록 하는 기능입니다.아래는 해당 웹UI에서 진행된 방금 예제를 나타내고 있습니다.

웹 UI

[ array_contains(), size(), sort_array() ]

array_contains()는 배열에 특정 값이 존재하는지 여부를 판단할 수 있습니다.

size()는 배열의 크기를 확인하며, 

sort_array()는 배열을 정렬하는 용도로 사용됩니다.

 

scala> val df = Seq(Array(9,1,5,3,9)).toDF("array")
21/09/13 14:12:15 INFO CodeGenerator: Code generated in 13.7932 ms
df: org.apache.spark.sql.DataFrame = [array: array<int>]

scala> df.select('array, array_contains('array,2), size('array)).show(false)
21/09/13 14:12:52 INFO CodeGenerator: Code generated in 8.7278 ms
21/09/13 14:12:52 INFO CodeGenerator: Code generated in 7.3783 ms
+---------------+------------------------+-----------+
|array          |   array_contains(array, 2)|  size(array)|
+---------------+------------------------+-----------+
|[9, 1, 5, 3, 9]   |     false                   |   5          |
+---------------+------------------------+-----------+.

 

array_contains()를 사용하여 2가 존재하는지 확인해보고, Size()를 사용하여 크기를 확인하는 예제입니다.

 

scala> df.select('array, sort_array('array)).show(false)
+---------------+-----------------------+
|array              |sort_array(array, true)|
+---------------+-----------------------+
|[9, 1, 5, 3, 9]   |    [1, 3, 5, 9, 9]        |
+---------------+-----------------------+

 

sort_array()를 사용하여 해당 array를 정렬해 보았습니다.

 

[ explode(), posexplode() ]

expload()는 배열 Column에 포함된 여러 개 요소를 행으로 변환해줍니다.

posexplode()동일한 동작을하지만 새로운 행을 만들 때 위치정보를 함께 포함해서 제공됩니다.

 

scala> df.show
+---------------+
|          array|
+---------------+
|[9, 1, 5, 3, 9]|
+---------------+

scala> df.select(explode('array)).show(false)

+---+
|col|
+---+
|9  |
|1  |
|5  |
|3  |
|9  |
+---+

scala> df.select(posexplode('array)).show(false)

 

+---+---+
|pos|col|
+---+---+
|0  |9  |
|1  |1  |
|2  |5  |
|3  |3  |
|4  |9  |
+---+---+

 

[ round(), sqrt() ]

round()는 반올림, sqrt()는 제곱근 값을 구하는 연산 메서드입니다.

 

scala> import org.apache.spark.sql.functions._

scala> import spark.implicits._

scala> Seq(1.513, 3.234, 4.42).toDF("value").select(round('value,1)).show
21/09/13 14:27:51 INFO CodeGenerator: Code generated in 4.9761 ms
+---------------+
|round(value, 1)|
+---------------+
|            1.5   |
|            3.2   |
|            4.4   |
+---------------+

 

소수점 1자리까지 나타내도록 설정하여 출력한 결과 입니다.

 

scala> Seq(25,9,10).toDF("value").select(sqrt('value)).show
+------------------+
|       SQRT(value)|
+------------------+
|               5.0    |
|               3.0    |
|3.1622776601683795|
+------------------+

 

해당 25, 9, 10의 제곱근을 찾아낸 출력 값입니다.

 

scala> df.show
+---------------+
|          array   |  
+---------------+
|[9, 1, 5, 3, 9]   |
+---------------+

 

[ desc(), asc() ]

특정 칼럼에 대한 정렬 방법을 지정하는 메서드입니다.

 

scala> df_T.sort(desc("price")).show

+-----+-------+------+-----+
|store|product|amount|price|
+-----+-------+------+-----+
|   S2|   note|    15    | 3000|
|   S2|    bag|     1|     2000|
|   S1|   note|     5|     1000|
|   S1|    pen|    10|      500|
+-----+-------+------+-----+

 

scala> df_T.sort(asc("price")).show

+-----+-------+------+-----+
|store|product|amount|price|
+-----+-------+------+-----+
|   S1|    pen|    10|      500|
|   S1|   note|     5|     1000|
|   S2|    bag|     1|     2000|
|   S2|   note|    15|    3000|
+-----+-------+------+-----+

 

내림차순, 오름차순 순으로 정렬한 결과입니다.

특정 Column에 null이 존재한다면 desc_null_xxx(), asc_null_xxx() 의 함수를 사용하시면 됩니다.

xxx는 first, last 중 선택하시면 됩니다.

 

[ split(), length() ]

문자열 처리에 사용되는 대표적은 메서드입니다.

split()은 기준으로 문자열을 자르는 역할을 하고

length()는 해당 문자의 길이를 나타냅니다.

 

scala> Seq(("Hi My Name is WeCo")).toDF("value").select('value, split('value," "), length('value)).show(false)
21/09/13 14:43:01 INFO CodeGenerator: Code generated in 12.5706 ms
21/09/13 14:43:01 INFO CodeGenerator: Code generated in 8.0027 ms
21/09/13 14:43:01 INFO CodeGenerator: Code generated in 5.8483 ms
+-----------------------+----------------------------+-------------+
|value                      |split(value,  , -1)             |  length(value)|
+-----------------------+----------------------------+-------------+
|Hi My Name is WeCo|   [Hi, My, Name, is, WeCo]|   18           |
+-----------------------+------------------------------+-------------+

 

"Hi My Name is WeCo" 문자열을 " "기준으로 자르고, 문자열의 크기를 출력해보았습니다.

 

 

[ udf() ]

funcions에서 데이터 처리에 필요한 함수를 제공하지만, 직접 함수를 정의해서 사용할 때 사용하는 함수입니다.

 

 

scala> import spark.implicits._
import spark.implicits._
21/09/13 14:48:19 INFO BlockManagerInfo: Removed broadcast_58_piece0 on 192.168.25.49:1782 in memory (size: 3.3 KiB, free: 366.3 MiB)
21/09/13 14:48:19 INFO BlockManagerInfo: Removed broadcast_57_piece0 on 192.168.25.49:1782 in memory (size: 3.3 KiB, free: 366.3 MiB)

scala> val fn1 = udf((S: String) => S match{
     | case "S1" => true
     | case _ => false
     | })
fn1: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$5104/648507226@71afd79b,BooleanType,List(Some(class[value[0]: string])),Some(class[value[0]: boolean]),None,false,true)
 
scala> df_T.select('store, 'product, 'amount, 'price, fn1('store)).show 
+-----+-------+------+-----+----------+
|store|product|amount|price|UDF(store)|
+-----+-------+------+-----+----------+
|   S1|   note|     5| 1000|      true|
|   S2|    bag|     1| 2000|     false|
|   S2|   note|    15| 3000|     false|
|   S1|    pen|    10|  500|      true|
+-----+-------+------+-----+----------+

 

예제를 살펴보면 fn1의 함수를 정의하고 S의 인자를 받았을 때 그 인자가 "S1"이면 true를 반환하고 그 외는 false를 반환하도록 정의했습니다.

결과는 S1는 true를 반환 받은것을 확인할 수 있습니다.

 

 

오늘은 API 메서드에 대해 간단하게 알아봤는데

아직 조금 더 남아서 API 메서드(3)으로 찾아뵙도록 하겠습니다.

반응형

'Spark' 카테고리의 다른 글

[We-Co] Spark to_json(), from_json()  (0) 2021.09.15
[We-Co] Spark SQL API 메서드(3)  (0) 2021.09.14
[We-Co] Spark SQL API 메서드(1)  (0) 2021.09.12
[We-Co] Spark Dataset의 기본연산  (0) 2021.09.09
[We-Co] Spark Dataset 액션연산  (0) 2021.09.08