안녕하세요. 위기의코딩맨 입니다.
오늘은 Spark의 groupBy와 groupByKey에 대해 알아보도록 하겠습니다.
[ groupBy ]
RDD의 값들을 설정한 기준에 따라서 여러개의 그룹으로 나누고 해당하는 그룹으로 구성된 새로운 RDD를 생성하는 것이 groupBy()의 역할입니다.
키와 요소들의 시퀀스로 구성되어 있으며, 밑의 예제로 한번 알아보도록 하겠습니다.
scala> val rdd = sc.parallelize(1 to 20)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val result = rdd.groupBy{ case i: Int if(i%2==0) => "even" case _ => "odd"}
result: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:25
scala> result.collect.foreach{ v=> print(s"${v._1}, [${v._2.mkString(",")}]")}
결과 even, [2,4,6,8,10,12,14,16,18,20]odd, [1,3,5,7,9,11,13,15,17,19]
rdd 를 만들어서 그 안에 1~20의 값을 넣어주도록 합니다.
그리고 해당 값의 인자가 짝수면 even, 홀수면 odd로 그룹을 나누도록 groupBy()를 사용하여 case를 설정합니다.
result에 그 값을 넣어주어 마지막에 인자를 "," 구분주어 출력되도록 설정합니다.
결과는 설정한 case에 맞도록 출력되는 것을 확인할 수 있습니다.
![](https://t1.daumcdn.net/keditor/emoticon/friends1/large/006.gif)
[ groupByKey ]
groupBy()는 요소들을 키를 생성하는 작업과 그룹으로 분류하는 작업을 동시에 수행한다고 합니다.
groupByKey()는 이미 RDD의 구성이 키와 값으로 이루어진 경우에 사용 가능한 메서드입니다.
scala> val rdd = sc.parallelize(List("a","b","c","b","c")).map((_, 1))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:24
scala> val result = rdd.groupByKey
result: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:25
scala> result.collect.foreach{ v=> print(s"${v._1}, [${v._2.mkString(",")}]")}
결과 a, [1] b,[1,1] c,[1,1]
rdd에 a,b,c,b,c의 리스트 값을 넣어주도록 합니다. map을 사용하여 인자를 1 값으로 채워주고,
해당 rdd를 groupByKey로 묶어 result로 설정해주도록 합니다.
출력하게되면 기존의 키 값을 통해 인자 값들이 여러 개 였지만, 결과는 하나로 묶이는 것을 확인 할 수 있습니다.
[ cogroup ]
RDD의 구성요소들이 키와 값으로 구성된 경우에만 사용이 가능한 메서드입니다.
여러 RDD에서 같은 키를 갖는 값을 찾아서 키와 그 키에 속하는 요소의 시퀀스로 구성된 튜플을 만들고,
그 튜플들로 새로운 RDD를 생성합니다.
scala> val rdd1 = sc.parallelize(List(("k1", "v1"), ("k2", "v2"), ("k1","v3")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("k1", "v4")))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val result = rdd1.cogroup(rdd2)
result: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[10] at cogroup at <console>:27
scala> result.collect.foreach{ case (k, (v_1, v_2)) => { print(s"($k, [${v_1.mkString(",")}], [${v_2.mkString(",")}])")}}
결과 (k1, [v1,v3], [v4]) (k2, [v2], [])
여기서 중요한 점은 rdd1과 rdd2에 k1의 같은 키값이 존재한다는 것입니다.
그래서 출력 결과를 확인해보면 k1 키는 rdd1에 [v1,v3]이 묶이고
rdd2에 k1의 v4가 묵여서 출력이 되는 것을 확인할 수 있습니다.
다음 k2는 rdd1에만 키 값이 존재하고 rdd2에는 키가 존재하지 않아 [v2], [] 값이 출력되는 것을 확인할 수 있습니다.
오늘은 Spark의 Group으로 묶어내는 방법 세가지를 간단하게 알아보았습니다.
![](https://t1.daumcdn.net/keditor/emoticon/friends1/large/005.gif)
'Spark' 카테고리의 다른 글
[We-Co] Spark RDD 집계 연산 (0) | 2021.08.26 |
---|---|
[We-Co] Spark RDD의 집합 연산 (0) | 2021.08.26 |
[We-Co] RDD 생성 (0) | 2021.08.11 |
[We-Co] SparkContext (0) | 2021.08.11 |
[We-Co] Word Count - Spark 예제 (2) | 2021.08.06 |