안녕하세요. 위기의 코딩맨입니다.
오늘은 RDD의 filter 및 정렬 연산에 대해 알아보도록 하겠습니다.
[ filter() ]
용어 그대로 내가 원하는 요소를 뽑아내는 함수입니다
scala> val rdd = sc.parallelize(1 to 5)
scala> val result = rdd.filter(_>2)
scala> print(result.collect.mkString(", "))
결과를 확인해보면 3, 4, 5 가 출력되는 것을 확인할 수 있습니다.
1~ 5까지 숫자를 RDD에 넣어주고
2보다 큰 수를 result에 넣어주도록 filter()의 조건을 설정해주면 결과 값이 출력 됩니다.
[ sortByKey() ]
sortByKey() 함수는 키 값을 기준으로 RDD의 요소들을 정렬하는 연산입니다.
키를 기준으로 잡기 때문에 RDD는 키와 값 형태로 구성되어야 합니다.
scala> val rdd = sc.parallelize(List("q","z","a"))
scala> val result = rdd.map((_,1)).sortByKey()
scala> print(result.collect.mkString(", "))
결과를 확인해보면 (a,1), (q,1), (z,1)의 값으로 출력되는 것을 확인할 수 있습니다.
rdd에 q, z, a를 요소를 넣어주고, map을 이용하여 값을 1로 설정하고, sortByKey()를 이용하여 a, q, z로 정렬하면
원하는 결과 값을 얻을 수 있습니다.
[ Keys(), values() ]
RDD의 구성이 키와 값의 형태로 되어있어야 사용 가능합니다.
이름 그대로 Keys()는 RDD에서 키 값만 가져오는 함수, values()는 값만 가져오는 함수입니다.
위에 sortByKey()에 사용되었던 result를 사용해서 출력해보겠습니다
scala> print(result.keys.collect.mkString(", "))
출력 결과는 a, q, z 정렬된 값 그대로 Key값만 출력되는 것을 확인할 수 있습니다.
scala> print(result.values.collect.mkString(", "))
출력 결과는 1, 1, 1 map으로 1의 값들을 넣어준 값들이 출력 됩니다.
키와 값으로 구성된 RDD가 아니면 오류가 발생합니다.
[ sample() ]
sample()를 이용하여 샘플을 추출해서 새로운 RDD를 생성할 수 있는 함수입니다.
sample() 함수를 스칼라용 api 문서에서 제공하는 정의를 확인해보겠습니다.
sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.netLong):RDD[T]
첫번째 인자 withReplacement는 복원 추출을 진행할 여부를 확인하는 부분입니다. true면 복원 추출, false면 비복원 추출을 수행합니다.
두번째 인자 fraction은 복원 추출, 비복원 추출 때 의미가 달라집니다.
복원 추출일 경우 샘플내에서 요소들이 나타내는 횟수에 대한 기대값, 각 요소의 평균 발생 횟수를 의미합니다.
반드시 0 이상의 값을 지정해야합니다.
비복원 추출일 경우는 각 요소가 포함될 확률을 의미합니다. 0과 1사이의 값으로 지정해주면 됩니다.
중요한점은 sample()은 샘플의 크기를 지정해 놓고 추출하는 메서드가 아닙니다.
예를 들면 fraction에 0.5를 입력한다고 해서 전체 크기의 절반에 해당하는 샘플을 추출하지 않는다는 것입니다.
크기를 지정하고 샘플을 추출하기위해선 takeSample()을 사용해야 합니다.
마지막 세번째 인자 seed는 일반적인 무작위 값 추출시 사용하는 것과 유사한 개념이다.
반복 시행 시 결과가 바뀌지 않으며, 일정한 값이 나오도록 제어하는 목적으로 사용가능합니다.
scala> val rdd = sc.parallelize(1 to 100)
scala> val result1 = rdd.sample(false, 0.5)
scala> val result2 = rdd.sample(true, 1.5)
scala> print(result1.collect.mkString(","))
출력 값으로는 2,3,5,6,8,9,10,12,19,20,21,27,30,31,32,33,39,40,41,47,52,55,56,57,60,62,66,67,70,71,73,74,75,76,79,83,84,85,87,88,90,91,92,93,94,96,98,99이 출력이 되는데
scala> print(result1.take(5).mkString(","))
위의 코드로 출력하게 되면 2,3,5,6,8이 출력됩니다. 앞에 출력된 5개의 값이 출력되는 느낌입니다.
scala> print(result2.collect.mkString(","))
1,2,2,4,5,5,5,6,6,7,8,9,9,9,9,10,10,11,11,12,12,13,14,14,17,18,19,19,20,21,21,22,22,24,24,25,26,26,27,28,28,28,28,33,34,34,35,35,36,36,38,38,38,39,39,39,39,41,41,41,43,43,43,44,45,45,45,46,46,47,49,49,50,50,50,52,53,54,54,55,55,55,56,56,57,58,58,59,59,60,61,61,61,62,62,62,62,63,63,64,66,67,67,67,68,69,69,69,70,70,71,73,74,74,76,76,77,77,78,79,80,80,80,81,83,84,84,84,85,85,86,87,88,90,92,92,92,93,94,94,94,95,95,96,96,96,96,96,97,98,99,99,99,100
1,2,2,4,5의 값이 출력되는데
scala> print(result2.take(5).mkString(","))
1,2,2,4,5의 값이 출력됩니다. 앞에서 출력된 결과 값에서 앞의 5개의 값이 출력됩니다.
오늘은 RDD의 filter 및 정렬 연산에 대해 알아보았습니다.
모두 응용할 수 있도록 공부 열심히!
'Spark' 카테고리의 다른 글
[We-Co] Spark RDD 출력 연산 (2) (0) | 2021.09.02 |
---|---|
[We-Co] Spark RDD 출력 연산 (1) (0) | 2021.09.02 |
[We-Co] Spark RDD PIPE 및 파티션 연산 (0) | 2021.08.27 |
[We-Co] Spark RDD 집계 연산 (0) | 2021.08.26 |
[We-Co] Spark RDD의 집합 연산 (0) | 2021.08.26 |