We-Co

[We-Co] groupBy(), groupByKey(), cogroup() - Spark 본문

Spark

[We-Co] groupBy(), groupByKey(), cogroup() - Spark

위기의코딩맨 2021. 8. 24. 11:45
반응형

안녕하세요. 위기의코딩맨 입니다.

오늘은 Spark의 groupBy와 groupByKey에 대해 알아보도록 하겠습니다.

 

[ groupBy ]

RDD의 값들을 설정한 기준에 따라서 여러개의 그룹으로 나누고 해당하는 그룹으로 구성된 새로운 RDD를 생성하는 것이 groupBy()의 역할입니다.

키와 요소들의 시퀀스로 구성되어 있으며, 밑의 예제로 한번 알아보도록 하겠습니다.

 

scala> val rdd = sc.parallelize(1 to 20) 

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val result = rdd.groupBy{ case i: Int if(i%2==0) => "even" case _ => "odd"} 

result: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:25

 

scala> result.collect.foreach{ v=> print(s"${v._1}, [${v._2.mkString(",")}]")}

 

결과 even, [2,4,6,8,10,12,14,16,18,20]odd, [1,3,5,7,9,11,13,15,17,19]

 

rdd 를 만들어서 그 안에 1~20의 값을 넣어주도록 합니다.

그리고 해당 값의 인자가 짝수면 even, 홀수면 odd로 그룹을 나누도록 groupBy()를 사용하여 case를 설정합니다.

result에 그 값을 넣어주어 마지막에 인자를 "," 구분주어 출력되도록 설정합니다.

결과는 설정한 case에 맞도록 출력되는 것을 확인할 수 있습니다.

 

[ groupByKey ]

groupBy()는 요소들을 키를 생성하는 작업과 그룹으로 분류하는 작업을 동시에 수행한다고 합니다.

groupByKey()는 이미 RDD의 구성이 키와 값으로 이루어진 경우에 사용 가능한 메서드입니다.

 

scala> val rdd = sc.parallelize(List("a","b","c","b","c")).map((_, 1))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:24

 

scala> val result = rdd.groupByKey
result: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:25

 

scala> result.collect.foreach{ v=> print(s"${v._1}, [${v._2.mkString(",")}]")}

 

결과 a, [1] b,[1,1] c,[1,1]

 

rdd에 a,b,c,b,c의 리스트 값을 넣어주도록 합니다. map을 사용하여 인자를 1 값으로 채워주고,

해당 rdd를 groupByKey로 묶어 result로 설정해주도록 합니다.

출력하게되면 기존의 키 값을 통해 인자 값들이 여러 개 였지만,  결과는 하나로 묶이는 것을 확인 할 수 있습니다.

 

 

[ cogroup ]

RDD의 구성요소들이 키와 값으로 구성된 경우에만 사용이 가능한 메서드입니다.

여러 RDD에서 같은 키를 갖는 값을 찾아서 키와 그 키에 속하는 요소의 시퀀스로 구성된 튜플을 만들고,

그 튜플들로 새로운 RDD를 생성합니다.

 

 

scala> val rdd1 = sc.parallelize(List(("k1", "v1"), ("k2", "v2"), ("k1","v3")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[6] at parallelize at <console>:24

 

scala> val rdd2 = sc.parallelize(List(("k1", "v4")))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

 

scala> val result = rdd1.cogroup(rdd2)
result: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[10] at cogroup at <console>:27

 

scala> result.collect.foreach{ case (k, (v_1, v_2)) => { print(s"($k, [${v_1.mkString(",")}], [${v_2.mkString(",")}])")}}

 

결과 (k1, [v1,v3], [v4]) (k2, [v2], [])

 

여기서 중요한 점은 rdd1과 rdd2에 k1의 같은 키값이 존재한다는 것입니다.

그래서 출력 결과를 확인해보면 k1 키는 rdd1에 [v1,v3]이 묶이고

rdd2에 k1의 v4가 묵여서 출력이 되는 것을 확인할 수 있습니다.

다음 k2는 rdd1에만 키 값이 존재하고 rdd2에는 키가 존재하지 않아 [v2], [] 값이 출력되는 것을 확인할 수 있습니다.

 

 

 

 

오늘은 Spark의 Group으로 묶어내는 방법 세가지를 간단하게 알아보았습니다.

반응형

'Spark' 카테고리의 다른 글

[We-Co] Spark RDD 집계 연산  (0) 2021.08.26
[We-Co] Spark RDD의 집합 연산  (0) 2021.08.26
[We-Co] RDD 생성  (0) 2021.08.11
[We-Co] SparkContext  (0) 2021.08.11
[We-Co] Word Count - Spark 예제  (2) 2021.08.06