안녕하세요. 위기의코딩맨입니다.
오늘은 Spark RDD 집합 연산을 알아보도록 하겠습니다.
![](https://t1.daumcdn.net/keditor/emoticon/friends1/large/002.gif)
[ distinct() ]
RDD의 중복된 요소들을 제외하여 새로운 RDD를 생성하는 메서드입니다.
val rdd = sc.parallelize(List(1,2,3,1,2,3,1,2,3))
val result = rdd.distinct()
print(result.collect.mkString(", "))
결과는 1,2,3이 출력되는 것을 확인할 수 있습니다.
[ cartesian() ]
2개의 RDD 요소를 카테시안곱을 진행하여 결과로 새로운 RDD를 생성하는 메서드입니다.
val rdd = sc.parallelize(List(1,2,3))
val rdd2 = sc.parallelize(List("a","b","c"))
val result = rdd.cartesian(rdd2)
print(result.collect.mkString(", "))
결과는 (1,"a"), (1,"b"), (1,"c"), (2,"a"), (3, "b").... (3,"c") 결과가 출력됩니다.
[ subtract() ]
2개의 RDD가 존재할 때, 사용 rdd에 속하고, 인자로 사용되는 rdd에는 속하지 않는 요소로
구성된 새로운 RDD를 생성하는 메서드입니다.
val rdd = sc.parallelize(List("a","b","c","d"))
val rdd2 = sc.parallelize(List("b","c"))
val result = rdd.subtract(rdd2)
print(result.collect.mkString(", "))
결과는 "a", "d" 가 출력되는 것을 확인할 수 있습니다.
[ union() ]
2개의 RDD가 존재할 때, rdd1과 rdd2에 속하는 구성요소를 새로운 RDD로 생성하는 메서드입니다.
val rdd = sc.parallelize(List("a","b","c"))
val rdd2 = sc.parallelize(List("d","e"))
val result = rdd.union(rdd2)
print(result.collect.mkString(", "))
결과는 "a", "d", "c", "d", "e"가 출력되는 것을 확인할 수 있습니다.
[ intersection() ]
2개의 RDD가 존재할 때, 2개의 RDD에 동시에 구성된 요소를 새로운 RDD로 중복제거된 상태로 생성하는 메서드입니다.
val rdd = sc.parallelize(List("a","b","c"))
val rdd2 = sc.parallelize(List("a","a","b"))
val result = rdd.intersection(rdd2)
print(result.collect.mkString(", "))
해당 결과는 "a","b" 2개의 인자가 출력되는 것을 확인할 수 있습니다.
[ join() ]
해당 메서드는 RDD의 구성이 키와 값으로 구성된 경우에만 사용 가능합니다.
2개의 RDD에서 서로 같은 키를 갖고 있는 요소를 모아 그룹으로 묶어 새로운 RDD로 생성하는 메서드입니다.
val rdd = sc.parallelize(List("a","b","c")).map((_,1))
val rdd2 = sc.parallelize(List("b","c")).map((_,2))
val result = rdd.join(rdd2)
print(result.collect.mkString(", "))
해당 결과는 (b,(1,2)) (c,(1,2))로 출력되는 것을 확인할 수 있습니다.
[ leftOuterJoin(), rightOuterJoin() ]
해당 메서드는 RDD의 구성이 키와 값으로 구성된 경우에만 사용 가능합니다.
Join을 진행할 때, 이름 그대로 왼쪽 조인, 오른쪽 조인을 수행하여 새로운 RDD를 돌려줍니다.
val rdd = sc.parallelize(List("a","b","c")).map((_,1))
val rdd2 = sc.parallelize(List("b","c")).map((_,2))
val result = rdd.leftOuterJoin(rdd2)
val result2 = rdd.rightOuterJoin(rdd2)
print("Letf : " + result.collect.mkString(", "))
print("Right : " + result2.collect.mkString(", "))
결과는
Left : (a,(1,None)), (b,(1,Some(2)), (c,(1,Some(2))
Right : (b, (Some(1),2)), (c,(Some(1),2))
출력되는 것을 확인할 수 있습니다.
[ subtractByKey() ]
해당 메서드는 RDD의 구성이 키와 값으로 구성된 경우에만 사용 가능합니다.
2개의 RDD 중에 rdd1의 요소중에 rdd2에 같은 키가 존재하는 요소를 제외한 나머지로 구성된 새로운 RDD로 돌려줍니다.
val rdd1 = sc.parallelize(List("a","b","c")).map((_,1))
val rdd2 = sc.parallelize(List("b","c")).map((_,2))
val result = rdd1.subtractByKey(rdd2)
print(result.collect.mkString("\n"))
해당 결과는 (a,1)의 값이 출력되는 것을 확인하실 수 있습니다.
오늘은 Spark의 RDD에서 집합에서 주로 사용되는 연산들을 모아서 간단하게 알아보았습니다.
조금씩 알아가는 재미가 있는 Spark..!
![](https://t1.daumcdn.net/keditor/emoticon/friends1/large/001.gif)
'Spark' 카테고리의 다른 글
[We-Co] Spark RDD PIPE 및 파티션 연산 (0) | 2021.08.27 |
---|---|
[We-Co] Spark RDD 집계 연산 (0) | 2021.08.26 |
[We-Co] groupBy(), groupByKey(), cogroup() - Spark (0) | 2021.08.24 |
[We-Co] RDD 생성 (0) | 2021.08.11 |
[We-Co] SparkContext (0) | 2021.08.11 |