안녕하세요. 위기의코딩맨입니다.
오늘은 RDD 출력연산 2번째 시간입니다!!
바로 시작해보겠습니다.
[ aggregate() ]
reduce()와 fold() 메서드는 입출력이 모두 같은 타입으로 진행해야한다는 제약조건이 있지만,
aggregate()는 그러한 제약조건이 걸려있지않습니다.
간단하게 알아보면 총 3개의 인자를 사용합니다.
첫번째로는 fold()와 유사하게 초깃값을 지정해주고, 두번째는 각 파티션 단위 부분합을 구하기위한 병합함수,
세번째는 부분합을 최종적으로 하나로 합치기 위한 병합함수로 구성되어있습니다.
스칼라 API 문서에 aggregate() 정의는
def aggregate[U] (zeroValue: U) (seqOp: ( U, T) => U, combOp:( U, U) => U)(implicit arg0: ClassTag[U]): U
첫번재 인자는 zeroValue, 즉 초깃 값으로 사용할 값이며, 두번째 seqOp는 U, T 타입의 값을 입력값으로 전달 받아 U타입으로 값을 돌려주고 있습니다. U이때 T는 RDD의 요소들이 갖는 타입, U는 zeroValue로 전달했던 초깃값의 타입을 의미합니다.
seqQp, combOp 순서로 병합을 진행하게 됩니다.
[ sum ]
RDD의 요소를 모두 더하여 값을 반환하는 연산입니다.
모든 요수가 숫자일 경우 사용가능하며, reduceByKey(), combineByKey()는 키 - 값 형태에서만 진행이되지만
sum은 키 - 값 형태가 아니여도 진행이 가능합니다.
scala> val rdd = sc.parallelize(1 to 10)
scala> val result = rdd.sum
result: Double = 55.0
scala> print(result)
결과를 확인해보면 55.0이 출력되며, 타입은 Double 타입인 것을 확인하실 수 있습니다.
[ foreach(), foreachPartition() ]
먼저, foreach()는 RDD의 모든 요소에 특정한 함수를 적용하는 메서드입니다.
인자로 한개의 입력 값을 가지는 함수를 전달 받으며, 이렇게 전달받은 함수에 각 RDD 요소를 하나씩 입력 값으로 사용할 수 있습니다.
scala> val rdd = sc.parallelize(1 to 10, 3)
scala> rdd.foreach{ v=> println(s"foreach Value = ${v}") }
foreach Value = 1
foreach Value = 2
foreach Value = 4
foreach Value = 3
foreach Value = 5
foreach Value = 6
foreach Value = 7
foreach Value = 8
foreach Value = 9
foreach Value = 10
foreachPartition()은 foreach()와 같이 실행 함수를 인자를 전달받지만,
차이점은 개별 요소가 아닌, 파티션 단위로 적용합니다.
scala> rdd.foreachPartition(values => { println("Partition Value")
| for (v <- values) println(s"Value : ${v}")})
Partition Value
Partition Value
Value : 1
Value : 7
Value : 4
Value : 8
Value : 2
Value : 9
Value : 5
Value : 10
결과를 확인해보면 foreach()는 각 요소마다 출력되고, foreachPartitions()는 각 파티션마다 한번식 실행된 것을 확인할 수 있습니다.
[ cache(), persist(), unpersist() ]
RDD는 액션 연산이 수행될 때마다 관련 트랜스포메이션 연산을 반복하게 됩니다.
기존에 사용한 데이터가 메모리에 남아있다면, 그대로 사용하면 되지만, 그렇지 않다면 새로운 RDD를 생성해야합니다.
RDD는 반복적으로 사용될 경우 데이터를 메모리에 저장해 두는것이 새로운 RDD를 생성하는 것보다 유리합니다.
cache()와 persist()는 첫 액션이 실행된 후, RDD 정보를 메모리, 디스크에 저장해서 다음 수행할 때
재생성하지 않고 즉시 실행할 수있게 해주는 메서드입니다.
presist()는 storagelevel의 옵션을 사용해서 저장 위치, 저장방식 등 상세히 지정할 수 있는 기능을 제공합니다.
MEMORY_ONLY는 메모리에만 저장하라는 의미이고
MEMORY_AND_DISK_SER은 메모리에 저장하다가 공간이 부족할 경우 DISK를사영하여 직렬화된 포맷을 이용하라는 의미입니다.
마지막으로 unpersist()는 이지 저장중인 데이터가 더 이상 필요 없을 때 캐시 설정을 취소하는데 사용됩니다.
scala> val rdd = sc.parallelize(1 to 100, 10)
scala> rdd.cache
scala> rdd.persist(StorageLevel.MEMORY_ONLY)
[ partitions() ]
RDD의 파티션 정보가 담긴 배열을 돌려주는 메서드입니다.
파티션의 인덱스 정보를 알려주는 index() 메서드도 포함하고 있으며, 파티션 크기를 알아보기 위해 많이 사용됩니다.
단순하게 크기 정보를 얻으려면 getNumPartitions() 메서드를 사용합니다.
scala> val rdd = sc.parallelize(1 to 1000, 10)
scala> print(rdd.partitions.size)
10
scala> print(rdd.getNumPartitions)
10
결과를 확인해보면 해당 파티션의 크기를 나타내는 것을 확인할 수 있습니다.
Spark RDD 출력 연산을 간단하게 알아보았습니다..
내용이 좀 많아 2개로 나눠서 진행했습니다.
응용할 수 있도록 화이팅
'Spark' 카테고리의 다른 글
[We-Co] Spark Dataset (0) | 2021.09.06 |
---|---|
[We-Co] Spark Accumulator (0) | 2021.09.03 |
[We-Co] Spark RDD 출력 연산 (1) (0) | 2021.09.02 |
[We-Co] Spark RDD filter 및 정렬 연산 (0) | 2021.09.01 |
[We-Co] Spark RDD PIPE 및 파티션 연산 (0) | 2021.08.27 |