본문 바로가기
Spark

[We-Co] Spark Dataset의 기본연산

by 위기의코딩맨 2021. 9. 9.
반응형

안녕하세요. 위기의코딩맨입니다.

오늘은 Spark Dataset에서 기본 제공되는 연산에대해 알아보도록 하겠습니다.

 

Dataset이 제공하는 연산은 크게 4가지로 제공되는데,

첫번째는 기본 연산, 두번째는 타입 트랜스포메이션 연산, 세번째는 비타입 트랜스포메이션 연산, 마지막 액션 연산으로 나눌수 있습니다.

 

scala> case class Person(name:String, age: Int, job:String)
defined class Person 
scala> val row = Person("Person_1",5,"student")
row: Person = Person(Person_1,5,student) 
scala> val row2 = Person("Person_2",10,"student")
row2: Person = Person(Person_2,10,student) 
scala> val row3 = Person("Person_3",15,"programmer")
row3: Person = Person(Person_3,15,programmer) 
scala> val data = List(row,row2,row3)
data: List[Person] = List(Person(Person_1,5,student), Person(Person_2,10,student), Person(Person_3,15,programmer)) 
scala> val df = spark.createDataFrame(data)

 

저번과 마찬가지로 해당 예제 소스를 이용하여 연산을 진행해보겠습니다.

[ cache(), persist() ]

Dataset은 RDD와 마찬가지로 작업 중인 데이터를 메모리에 저장합니다. 

그리고 Dataset의 경우 스키마 정보를 활용해 칼럼 단위로 데이터를 다룰 수 있어 최적화된 방식으로

메모리에 저장할 수 있습니다.

 

cache()와 persist()는 이러한 메모리에 저장하는 역할을 진행합니다.

persist()는 각종 제이터 저장방법과 관련된 옵션을 선택할 수 있습니다.

cache()는 persist()와 동일한 기능을 진행하지만, 스파크 SQL의 기본 값은 MEMORY_AND_DISK 타입을 사용합니다.

 

scala> df.persist(StorageLevel.MEMORY_AND_DISK_2)

res13: df.type = [name: string, age: int ... 1 more field]

 

MEMORY_AND_DISK_2의 의미는 메모리에 저장을 먼저 진행하고 저장공간이 부족할 시, DISK에 저장한다는 의미이며 끝에 2는 복제 횟수가 2를 의미합니다.

해당 결과만으로 캐시가 성공적으로 수행되었는지는 알 수 없습니다.

캐시가 잘 수행됐는지 여부를 확인하기 위해서는 웹 UI를 통해 확인해야합니다.

 

http://<spark-shell이용 IP>:4040/ 

 

위 해당 사이트로 접속해보시면됩니다.

Spark UI

해당하는 웹UI로 접속이 되는데, 상단에 'Storage' 탭을 클릭하여 캐시 상태를 조회하는 화면으로 이동합니다.

하지만 위에 실행한 cache는 수행되지 않은걸로 보입니다.

해당 실행은 트랜스포메이션 메서드이기 때문에 액션이 실행 되어야 실행되기 때문입니다.

 

scala> df.count

 

cache

액션 연산자인 count를 사용해보고 다시 storage 탭을 확인해보니, 캐시정보를 확인할 수 있습니다.

웹UI를 사용하지 않더라도 실제 캐시 여부보다 조금 더 상세한 캐시 상태를 확인할 수 있는 방법이있습니다.

 

scala> spark.sharedState.cacheManager.lookupCachedData(df)
res16: Option[org.apache.spark.sql.execution.CachedData] =
Some(CachedData(LocalRelation [name#0, age#1, job#2]
,InMemoryRelation [name#0, age#1, job#2], StorageLevel(disk, memory, deserialized, 2 replicas)
   +- LocalTableScan [name#0, age#1, job#2]))

 

CaachedData를 사용하여 df의 캐시정보를 확인할 수 있습니다. 만약 액션 연산이 실행 전이면 None이 리턴됩니다.

 

 

[ printSchema(), columns, dtypes, schema ]

Dataset은 다양한 방법으로 스키마 정보를 조회할 수 있습니다.

 

scala> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- job: string (nullable = true)

 


scala> df.columns
res19: Array[String] = Array(name, age, job)

scala> df.dtypes
res20: Array[(String, String)] = Array((name,StringType), (age,IntegerType), (job,StringType))

 

scala> df.schema
21/09/09 11:28:30 INFO BlockManagerInfo: Removed broadcast_4_piece0 on 192.168.25.49:1782 in memory (size: 6.9 KiB, free: 366.3 MiB)
21/09/09 11:28:30 INFO BlockManagerInfo: Removed broadcast_5_piece0 on 192.168.25.49:1782 in memory (size: 5.0 KiB, free: 366.3 MiB)
res22: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,false), StructField(job,StringType,true))

 

해당 방식들을 이용해 스키마 정보를 확인할 수 있습니다.

 

 

[ createOrReplaceTempView() ]

Dataframe을 테이블로 변환하는 메서드입니다.

테이블로 변환된 후, SQl을 사용해 데이터 처리를 진행할 수 있습니다.

 

scala> df.createOrReplaceTempView("users")

scala> spark.sql("select name, age from users where age >10").show

+--------+---+
|    name|age|
+--------+---+
|Person_3| 15|
+--------+---+

 

해당 테이블을 생성하고 10살 초과된 사람을 찾는 쿼리를 실행하면 해당 결과를 반환받을 수 있습니다.

 

 

[ explain() ]

Dataframe 처리와 관련된 실행 정보를 출력하는 메서드입니다.

 

 

scala> spark.sql("select name, age from users where age >10").explain(true)
== Parsed Logical Plan ==
'Project ['name, 'age]
+- 'Filter ('age > 10)
   +- 'UnresolvedRelation [users], [], false

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0, age#1]
+- Filter (age#1 > 10)
   +- SubqueryAlias users
      +- LocalRelation [name#0, age#1, job#2]

== Optimized Logical Plan ==
Project [name#0, age#1]
+- Filter (age#1 > 10)
   +- InMemoryRelation [name#0, age#1, job#2], StorageLevel(disk, memory, deserialized, 2 replicas)
         +- LocalTableScan [name#0, age#1, job#2]

== Physical Plan ==
*(1) Filter (age#1 > 10)
+- InMemoryTableScan [name#0, age#1], [(age#1 > 10)]
      +- InMemoryRelation [name#0, age#1, job#2], StorageLevel(disk, memory, deserialized, 2 replicas)
            +- LocalTableScan [name#0, age#1, job#2]

 

 

 

오늘은 Dataset의 기본연산에 대해 간단하게 알아보았고,

웹UI에서 캐시를 확인하는 방법도 알아보았습니다.

반응형