일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- 맛집
- 홍대 맛집
- 캐글
- 부스트클래스
- 자연어
- DataSet
- 연남 맛집
- 위기의코딩맨
- pycharm
- TensorFlow
- spark
- 서울 맛집
- tensorflow 예제
- 연남동 맛집
- AI 엔지니어 기초 다지기
- Ai
- NLP
- 데이터 시각화
- 부스트캠프 ai tech 준비과정
- AI Tech 준비과정
- Spark MLlib
- Transformer
- AI tech
- mllib
- r
- yolo
- Python
- kaggle
- RDD
- 부스트캠프
- Today
- Total
We-Co
[We-Co] Spark RDD PIPE 및 파티션 연산 본문
안녕하세요. 위기의 코딩맨입니다.
오늘은 Spark RDD PIP 및 파티션 연산에 대해 알아보도록 하겠습니다.
[ pipe() ]
데이터를 처리할 때 외부 프로세스를 사용할 수 있습니다.
val rdd = sc.parallelize(List("1,2,3","4,5,6","7,8,9"))
val result = rdd.pipe("cut -f 1,3 -d,")
print(result.collect.mkString(", "))
결과는 1,3 4,6 7,9 의 결과를 얻을 수 있습니다. 해당 건은 3개 숫자의 문자열을 리눅스의 cut 유틸리티를 이용해 분리하고 1, 3번째 숫자를 가져오는 예제입니다.
[ coalesce(), repartition() ]
RDD를 생성하고, filter() 연산 등, 많은 트랜스포메이션 연산을 수행을 진행하다 보면 설정된 파티션의 개수가 적합하지 않은 경우가 많이 발생하게 됩니다. 이 경우에 coaleesce(), repartition()을 사용해 현재의 RDD 파티션 개수를 조정할 수 있습니다.
coalesce()는 파티션 수를 줄이거나 늘리는 것 모두 가능하지만 repartition()은 줄이는 것만 가능합니다.
coalesce()는 모두 사용가능하지만 줄이는 성능은 repartition()이 더 좋기 때문에
파티션을 늘릴때는 coalesce() 줄일때는 repartition()을 사용하는 것이 좋습니다.
val rdd1 = sc.parallelize(1 to 100000, 10)
val rdd2 = rdd1.coalesce(5)
val rdd3 = rdd2.repartition(10)
print(s"partition size : ${rdd1.getNumPartitions}")
print(s"partition size : ${rdd2.getNumPartitions}")
print(s"partition size : ${rdd3.getNumPartitions}")
출력 결과를 확인해보면
partition size : 10
partition size : 5
partition size : 10
값으로 출력 되는 것을 확인할 수 있습니다.
[ repartitionAndsortWithinPartitions() ]
RDD의 구성이 키와 값으로 구성되어 있을 경우 사용가능합니다.
RDD를 구성하는 모든 데이터를 특정한 기준을 잡고, 여러 개의 파티션으로 나누고, 각각의 파티션 단위로 정렬하여
그 결과를 새로운 RDD를 생성하는 메서드입니다. 그리고 각 데이터가 어떤 파티션에 속할지 결정하기 위한 파티셔너를 설정해야 합니다. (org.apache.spark.HashPartitioner)
파티셔너는 각 데이터의 키 값을 이용하여 데이터가속할 파티션을 결정합니다.
해당 예제는 조금 더 이해가 필요해서 나중에 따로 알아보도록 하겠습니다.
[ partitionsBy() ]
RDD의 구성이 키와 값으로 구성되어 있을 경우 사용가능합니다.
(org.apache.spark.HashPartitioner)을 파티셔너를 설정해주고, 사용하시면 됩니다.
각 요소의 키를 특정 파티션에 할당하는 역할을 합니다.
val rdd1 = sc.parallelize(List("apple", "mouse","monitor"),5).map{a=>(a,a.length)}
val rdd2 = rdd1.partitionBy(new HashPartitioner(3))
print(s"rdd1:${rdd1.getNumPartitions}, rdd2:${rdd2.getNumPartitions}")
출력 결과는 rdd1:5, rdd2: 3으로 결과가 출력됩니다. 예제를 통해 파티션 크기를 5크기로 지정하여 rdd1을 생성하고 연산에 new HashPartitioner(3)를 추가해총 3개의 파티션이 생성된 것을 확인 할 수 있습니다.
오늘은 pipe 연산에 필요한 메서드들을 확인해보았습니다.
간단하게만 알아보고 나중에 응용해서 써보도록 합시다!!
'Spark' 카테고리의 다른 글
[We-Co] Spark RDD 출력 연산 (1) (0) | 2021.09.02 |
---|---|
[We-Co] Spark RDD filter 및 정렬 연산 (0) | 2021.09.01 |
[We-Co] Spark RDD 집계 연산 (0) | 2021.08.26 |
[We-Co] Spark RDD의 집합 연산 (0) | 2021.08.26 |
[We-Co] groupBy(), groupByKey(), cogroup() - Spark (0) | 2021.08.24 |