안녕하세요. 위기의 코딩맨입니다.
오늘은 Accumulator에 대해 간단하게 알아보도록 하겠습니다.
브로드캐스트 변수는 읽기 동작을 행하는 것이라면 Accumulator은 쓰기 동작을 위한것으로 생각하시면됩니다.
데이터 분석을 위해 미리 기록할 형식을 만들어 놓고 그 형식에 맞춰서 기록하고 분석하는 것입니다.
Accumulator을 사용하시려면 org.apache.spark.util.AccumulatorV2를 생성하여 사용해야합니다.
scala> import org.apache.spark.util.AccumulatorV2
import org.apache.spark.util.AccumulatorV2
scala> val acc1 =sc.longAccumulator("invalidFormat")
acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 450, name: Some(invalidFormat), value: 0)
scala> val acc2 = sc.collectionAccumulator[String]("invalidFormat2")
acc2: org.apache.spark.util.CollectionAccumulator[String] = CollectionAccumulator(id: 451, name: Some(invalidFormat2), value: [])
scala> val data = List("A1:V1","A2:V2","A3","A4:V4","A5;V5","A6::A6")
data: List[String] = List(A1:V1, A2:V2, A3, A4:V4, A5;V5, A6::A6)
scala> sc.parallelize(data,3).foreach{ v =>
| if(v.split(":").length != 2){
| acc1.add(1L)
| acc2.add(v)
| }
| }
scala> print("잘못된 데이터 수:" +acc1.value)
scala> print("잘못된 데이터 :" +acc2.value)
잘못된 데이터 수:3
잘못된 데이터 :[A5;V5, A6::A6, A3]
예제를 확인해보면 org.apache.spark.AccumulatorV2를 가져와서
invalidFormat인 Accumulator를 생성하고 등록합니다.
그리고 데이터가 저장된 리스트를 하나 생성해주고
":" 를 기준으로 형식을 설정해주고, 잘못된 데이터가 존재하면 acc1과 acc2에 데이터를 저장해서 출력합니다.
첫번째 출력된 데이터는 ";"로 되어있고, 두번째 출력된 데이터는 "::", 세번째는 ":"가 없으므로
설정한 기준과 달라 데이터가 출력됩니다.
오늘은 Accumulator을 이용하여 데이터 기준을 설정하고,
해당 기준에 위배되는 데이터를 찾아 출력해보는 예제에 대해 알아보았습니다.
'Spark' 카테고리의 다른 글
[We-Co] Spark Dataset,Dataframe을 이용한 단어 수 세기 예제 (0) | 2021.09.06 |
---|---|
[We-Co] Spark Dataset (0) | 2021.09.06 |
[We-Co] Spark RDD 출력 연산 (2) (0) | 2021.09.02 |
[We-Co] Spark RDD 출력 연산 (1) (0) | 2021.09.02 |
[We-Co] Spark RDD filter 및 정렬 연산 (0) | 2021.09.01 |