We-Co

[We-Co] Spark Accumulator 본문

Spark

[We-Co] Spark Accumulator

위기의코딩맨 2021. 9. 3. 12:26
반응형

안녕하세요. 위기의 코딩맨입니다.

오늘은 Accumulator에 대해 간단하게 알아보도록 하겠습니다.

 

브로드캐스트 변수는 읽기 동작을 행하는 것이라면 Accumulator은 쓰기 동작을 위한것으로 생각하시면됩니다.

데이터 분석을 위해 미리 기록할 형식을 만들어 놓고 그 형식에 맞춰서 기록하고 분석하는 것입니다.

 

Accumulator을 사용하시려면 org.apache.spark.util.AccumulatorV2를 생성하여 사용해야합니다.

 

 

scala> import org.apache.spark.util.AccumulatorV2
import org.apache.spark.util.AccumulatorV2

scala> val acc1 =sc.longAccumulator("invalidFormat")
acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 450, name: Some(invalidFormat), value: 0)

scala> val acc2 = sc.collectionAccumulator[String]("invalidFormat2")
acc2: org.apache.spark.util.CollectionAccumulator[String] = CollectionAccumulator(id: 451, name: Some(invalidFormat2), value: [])

scala> val data = List("A1:V1","A2:V2","A3","A4:V4","A5;V5","A6::A6")
data: List[String] = List(A1:V1, A2:V2, A3, A4:V4, A5;V5, A6::A6)

 

scala> sc.parallelize(data,3).foreach{ v =>
     | if(v.split(":").length != 2){
     | acc1.add(1L)
     | acc2.add(v)
     | }
     | }

 

 

scala> print("잘못된 데이터 수:" +acc1.value)

scala> print("잘못된 데이터 :" +acc2.value)
잘못된 데이터 수:3 

잘못된 데이터 :[A5;V5, A6::A6, A3]

 

예제를 확인해보면 org.apache.spark.AccumulatorV2를 가져와서

invalidFormat인 Accumulator를 생성하고 등록합니다.

그리고 데이터가 저장된 리스트를 하나 생성해주고

":" 를 기준으로 형식을 설정해주고, 잘못된 데이터가 존재하면 acc1과 acc2에 데이터를 저장해서 출력합니다.

첫번째 출력된 데이터는 ";"로 되어있고, 두번째 출력된 데이터는 "::", 세번째는 ":"가 없으므로

설정한 기준과 달라 데이터가 출력됩니다.

 

 

오늘은 Accumulator을 이용하여 데이터 기준을 설정하고,

해당 기준에 위배되는 데이터를 찾아 출력해보는 예제에 대해 알아보았습니다.

반응형