We-Co

[We-Co] Spark RDD의 집합 연산 본문

Spark

[We-Co] Spark RDD의 집합 연산

위기의코딩맨 2021. 8. 26. 21:19
반응형

안녕하세요. 위기의코딩맨입니다.

오늘은 Spark RDD 집합 연산을 알아보도록 하겠습니다.

 

[ distinct() ]

RDD의 중복된 요소들을 제외하여 새로운 RDD를 생성하는 메서드입니다.

 

val rdd = sc.parallelize(List(1,2,3,1,2,3,1,2,3))

val result = rdd.distinct()

print(result.collect.mkString(", "))

 

결과는 1,2,3이 출력되는 것을 확인할 수 있습니다.

 

 

[ cartesian() ]

2개의 RDD 요소를 카테시안곱을 진행하여 결과로 새로운 RDD를 생성하는 메서드입니다.

 

val rdd = sc.parallelize(List(1,2,3))

val rdd2 = sc.parallelize(List("a","b","c"))

val result = rdd.cartesian(rdd2)

print(result.collect.mkString(", "))

 

결과는 (1,"a"), (1,"b"), (1,"c"), (2,"a"), (3, "b").... (3,"c") 결과가 출력됩니다.

 

[ subtract() ]

2개의 RDD가 존재할 때,  사용 rdd에 속하고, 인자로 사용되는 rdd에는 속하지 않는 요소로

구성된 새로운 RDD를 생성하는 메서드입니다.

 

 

val rdd = sc.parallelize(List("a","b","c","d"))

val rdd2 = sc.parallelize(List("b","c"))

val result = rdd.subtract(rdd2)

print(result.collect.mkString(", "))

 

결과는 "a", "d" 가 출력되는 것을 확인할 수 있습니다.

 

[ union() ] 

2개의 RDD가 존재할 때, rdd1과 rdd2에 속하는 구성요소를 새로운 RDD로 생성하는 메서드입니다.

 

val rdd = sc.parallelize(List("a","b","c"))

val rdd2 = sc.parallelize(List("d","e"))

val result = rdd.union(rdd2)

print(result.collect.mkString(", "))

 

결과는 "a", "d", "c", "d", "e"가 출력되는 것을 확인할 수 있습니다.

 

 

[ intersection() ]

2개의 RDD가 존재할 때, 2개의 RDD에 동시에 구성된 요소를 새로운 RDD로 중복제거된 상태로 생성하는 메서드입니다.

 

val rdd = sc.parallelize(List("a","b","c"))

val rdd2 = sc.parallelize(List("a","a","b"))

val result = rdd.intersection(rdd2)

print(result.collect.mkString(", "))

 

해당 결과는 "a","b" 2개의 인자가 출력되는 것을 확인할 수 있습니다.

 

[ join() ]

해당 메서드는 RDD의 구성이 키와 값으로 구성된 경우에만 사용 가능합니다.

2개의 RDD에서 서로 같은 키를 갖고 있는 요소를 모아 그룹으로 묶어 새로운 RDD로 생성하는 메서드입니다.

 

val rdd = sc.parallelize(List("a","b","c")).map((_,1))

val rdd2 = sc.parallelize(List("b","c")).map((_,2))

val result = rdd.join(rdd2)

print(result.collect.mkString(", "))

 

해당 결과는 (b,(1,2)) (c,(1,2))로 출력되는 것을 확인할 수 있습니다.

 

[ leftOuterJoin(), rightOuterJoin() ] 

해당 메서드는 RDD의 구성이 키와 값으로 구성된 경우에만 사용 가능합니다.

Join을 진행할 때, 이름 그대로 왼쪽 조인, 오른쪽 조인을 수행하여 새로운 RDD를 돌려줍니다.

 

val rdd = sc.parallelize(List("a","b","c")).map((_,1))

val rdd2 = sc.parallelize(List("b","c")).map((_,2))

val result = rdd.leftOuterJoin(rdd2)

val result2 = rdd.rightOuterJoin(rdd2)

 

print("Letf : " + result.collect.mkString(", "))

print("Right : " + result2.collect.mkString(", "))

 

결과는

Left : (a,(1,None)), (b,(1,Some(2)), (c,(1,Some(2))

Right : (b, (Some(1),2)), (c,(Some(1),2))

출력되는 것을 확인할 수 있습니다.

 

[ subtractByKey() ]

해당 메서드는 RDD의 구성이 키와 값으로 구성된 경우에만 사용 가능합니다.

2개의 RDD 중에 rdd1의 요소중에 rdd2에 같은 키가 존재하는 요소를 제외한 나머지로 구성된 새로운 RDD로 돌려줍니다.

 

val rdd1 = sc.parallelize(List("a","b","c")).map((_,1))

val rdd2 = sc.parallelize(List("b","c")).map((_,2))

val result = rdd1.subtractByKey(rdd2)

print(result.collect.mkString("\n"))

 

해당 결과는 (a,1)의 값이 출력되는 것을 확인하실 수 있습니다.

 

 

 

오늘은 Spark의 RDD에서 집합에서 주로 사용되는 연산들을 모아서 간단하게 알아보았습니다.

조금씩 알아가는 재미가 있는 Spark..!

반응형

'Spark' 카테고리의 다른 글

[We-Co] Spark RDD PIPE 및 파티션 연산  (0) 2021.08.27
[We-Co] Spark RDD 집계 연산  (0) 2021.08.26
[We-Co] groupBy(), groupByKey(), cogroup() - Spark  (0) 2021.08.24
[We-Co] RDD 생성  (0) 2021.08.11
[We-Co] SparkContext  (0) 2021.08.11