We-Co

[We-Co] Spark Dataset,Dataframe을 이용한 단어 수 세기 예제 본문

Spark

[We-Co] Spark Dataset,Dataframe을 이용한 단어 수 세기 예제

위기의코딩맨 2021. 9. 6. 14:09
반응형

안녕하세요. 위기의코딩맨 입니다.
오늘은 Dataset과 dataframe을 이용하여 단어 수를 카운트하는 예제를 작성해보겠습니다.

우선 코드를 작성하기 전에 pom.xml 파일을 설정해주셔야 합니다.

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.0</version>
</dependency>

이제 코드를 작성하는데 단계별로 나눠서 진행하겠습니다.

1. SaprkSession 생성
2. SparkSession으로부터 Dataset 또는 Dataframe 생성
3. 생성된 Dataset 또는 Dataframe을 통한 데이터 처리
4. SaprkSession 종료

[ SparkSession 생성 ]

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder().appName("Sample").master("local[*]").getOrCreate()

Sparksesion을 이용하기 위해 import를 진행해주고 builder() 메서드를 이용하여 스파크 세션을 생성합니다.

[ SparkSession으로부터 Dataset 또는 Dataframe 생성 ]

scala> val source = "file://{파일경로}/input.txt"
source: String = file://{파일경로}/input.txt
scala> val df = spark.read.text(source)
21/09/06 13:25:27 INFO InMemoryFileIndex: It took 25 ms to list leaf files for 1 paths.
df: org.apache.spark.sql.DataFrame = [value: string]

Count할 텍스트 파일경로를 설정해주고, df 객체안에 생성한 SparkSession을 이용하여 해당 텍스트파일을 읽어 들이는
Dataframe을 생성하고 read()를 이용하여 해당 텍스트 파일을 읽습니다.

[생성된 Dataset 또는 Dataframe을 통한 데이터 처리]

Dataframe을 생성했고, 이제 해당 텍스트파일안에 실제 안어 수를 세는 작업을 진행해야합니다.

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import spark.implicits._
import spark.implicits._

scala> val wordDF = df.select(explode(split(col("value")," ")).as("word"))
wordDF: org.apache.spark.sql.DataFrame = [word: string]
scala> val result = wordDF.groupBy("word").count
result: org.apache.spark.sql.DataFrame = [word: string, count: bigint]

예제를 살펴보면 텍스트파일을 읽어들인 Dataframe df는 로우와 칼럼 구조를 갖고있는데,
기본적으로 "value"의 이름의 칼럼이 생성됩니다.

org.apache.spark.sql.functions_는 col("value")를 사용하기 위한 것이며,
org.apache.spakr.sql.Column 타입의 인스턴스입니다.
해당 문자를 " "기준으로 자르고 as("word")는 컬럼의 이름을 word로 지정하기 위한 것입니다.

result.show로 해당 값을 출력해보면 input.txt파일안에 들어있는 단어들을 count하여 가져오는 것을 확인할수 있습니다.
+----+-----+
|word|count|
+----+-----+
|name| 1|
| is | 1|
| my | 1|
| hi | 1|
|weco| 1|
+----+-----+


위의 방식은 비타입연산의 종류이며, 타입 연산의 방법은

scala> import spark.implicits._
import spark.implicits._

scala> val ds = df.as[(String)]
ds: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val wordDF = ds.flatMap(_.split(" "))
wordDF: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val result_2 = wordDF.groupByKey(v => v).count
result_2: org.apache.spark.sql.Dataset[(String, Long)] = [key: string, count(1): bigint]

예제를 살펴보면 flatMap(), groupByKey()등 RDD에서 사용하는 메서드를 많이 사용합니다.
val ds = df.as[(String)] 부분은 Dataframe을 Dataset으로 변환하는 부분입입니다.
변환된 Dataset을 flatMap을 이용해서 문장을 각 단어로 나누고, groupByKey()를 이용하여 같은 단어끼리 그룹을 생성하고 count()를 이용하여 개수를 구해 저장합니다.

+----+--------+
| key|count(1)|
+----+--------+
|name| 1|
| is | 1|
| my| 1|
| hi | 1|
|weco| 1|
+----+--------+

result_2.show를 진행하게되면 출력되는 출력 값입니다.


오늘은 Dataset과 Dataframe을 이용한 단어 수 세기 예제를 풀어보았습니다.
조금씩 배워나가는 기분...

반응형

'Spark' 카테고리의 다른 글

[We-Co] Spark Dataset의 기본연산  (0) 2021.09.09
[We-Co] Spark Dataset 액션연산  (0) 2021.09.08
[We-Co] Spark Dataset  (0) 2021.09.06
[We-Co] Spark Accumulator  (0) 2021.09.03
[We-Co] Spark RDD 출력 연산 (2)  (0) 2021.09.02