본문 바로가기
Spark

[We-Co] Spark Accumulator

by 위기의코딩맨 2021. 9. 3.
반응형

안녕하세요. 위기의 코딩맨입니다.

오늘은 Accumulator에 대해 간단하게 알아보도록 하겠습니다.

 

브로드캐스트 변수는 읽기 동작을 행하는 것이라면 Accumulator은 쓰기 동작을 위한것으로 생각하시면됩니다.

데이터 분석을 위해 미리 기록할 형식을 만들어 놓고 그 형식에 맞춰서 기록하고 분석하는 것입니다.

 

Accumulator을 사용하시려면 org.apache.spark.util.AccumulatorV2를 생성하여 사용해야합니다.

 

 

scala> import org.apache.spark.util.AccumulatorV2
import org.apache.spark.util.AccumulatorV2

scala> val acc1 =sc.longAccumulator("invalidFormat")
acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 450, name: Some(invalidFormat), value: 0)

scala> val acc2 = sc.collectionAccumulator[String]("invalidFormat2")
acc2: org.apache.spark.util.CollectionAccumulator[String] = CollectionAccumulator(id: 451, name: Some(invalidFormat2), value: [])

scala> val data = List("A1:V1","A2:V2","A3","A4:V4","A5;V5","A6::A6")
data: List[String] = List(A1:V1, A2:V2, A3, A4:V4, A5;V5, A6::A6)

 

scala> sc.parallelize(data,3).foreach{ v =>
     | if(v.split(":").length != 2){
     | acc1.add(1L)
     | acc2.add(v)
     | }
     | }

 

 

scala> print("잘못된 데이터 수:" +acc1.value)

scala> print("잘못된 데이터 :" +acc2.value)
잘못된 데이터 수:3 

잘못된 데이터 :[A5;V5, A6::A6, A3]

 

예제를 확인해보면 org.apache.spark.AccumulatorV2를 가져와서

invalidFormat인 Accumulator를 생성하고 등록합니다.

그리고 데이터가 저장된 리스트를 하나 생성해주고

":" 를 기준으로 형식을 설정해주고, 잘못된 데이터가 존재하면 acc1과 acc2에 데이터를 저장해서 출력합니다.

첫번째 출력된 데이터는 ";"로 되어있고, 두번째 출력된 데이터는 "::", 세번째는 ":"가 없으므로

설정한 기준과 달라 데이터가 출력됩니다.

 

 

오늘은 Accumulator을 이용하여 데이터 기준을 설정하고,

해당 기준에 위배되는 데이터를 찾아 출력해보는 예제에 대해 알아보았습니다.

반응형