We-Co

[We-Co] Spark MLlib - Pipeline, Logistic Regression 본문

Spark

[We-Co] Spark MLlib - Pipeline, Logistic Regression

위기의코딩맨 2021. 9. 28. 12:06
반응형

안녕하세요. 위기의코딩맨입니다.

오늘은 계속해서 Spark MLlib의 Logistic Regression

로지스틱 회귀 알고리즘을 사용해서 간단한 학습을 진행해보겠습니다.

 

키, 몸무게, 나이 정보를 이용하여 성별을 예측해보는 Pipeline API를 사용하여 예제를 살펴보면서

알아보도록 하겠습니다.

 

먼저, 언어는 Scala를 사용했으며, IDE Eclipse를 사용하여 진행했습니다.

 

 

[ 예제 ]

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

import org.apache.log4j.Logger
import org.apache.log4j.Level

object SparkSample01 {

  def main(args: Array[String]) { 
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)


    val spark = SparkSession.builder().appName("SparkSameple01").master("local[*]").getOrCreate()
    
    //훈련용 데이터 키, 몸무게, 나이, 성별
    val training_Data = spark.createDataFrame(Seq(
        (161.0, 69.17, 29, 1.0),
        (175.5, 88.82, 15, 1.0),
        (155.2, 45.87, 22, 0.0))).toDF("height","weight","age","gender")
      

  //테스트 데이터 키, 몸무게, 나이
    val Test_Data = spark.createDataFrame(Seq(
        (169.3, 75.17, 42),
        (185.1, 85.82, 38),
        (161.2, 62.37, 28))).toDF("height","weight","age")


    training_Data.show(false)
    
    val assembler = new VectorAssembler().setInputCols(Array("height", "weight", "age")).setOutputCol("features")
    //taining Data에 features Col 추가
    val assembled_Training = assembler.transform(training_Data)
    assembled_Training.show(false)
 
    
    // Model Create AG, logistic Regression
    val Learning_Rate = new LogisticRegression().setMaxIter(10).setRegParam(0.01).setLabelCol("gender")
    
    //pipeline
    val pipeline = new Pipeline().setStages(Array(assembler,Learning_Rate))
    val pipelineModel = pipeline.fit(training_Data) 
    //pipelineModel을 이요하여 예측값 생성
    pipelineModel.transform(training_Data).show()
    
    //Model 저장
    val Save_Path = "C:/Users/JGH/Desktop/pipelinemodel"
    pipelineModel.write.overwrite().save(Save_Path)
    
    //저장 Model 불러오기
    val loadedPipelineModel = PipelineModel.load(Save_Path) 
    
    spark.stop
  } 
}

 

 

먼저, 사용할 라이브러리들을 import하고

spark 실행 로그를 줄이기위해 

 

import org.apache.log4j.Logger
import org.apache.log4j.Level

 

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

 

해당 코드를 작성해주었습니다.

다음으로 가장 먼저 진행해야하는 Dataframe을 생성하기 위한 SparkSession을 생성해주도록 합니다.

 

 val spark = SparkSession.builder().appName("SparkSameple01").master("local[*]").getOrCreate()

 training_Data.show(false)

 

훈련을 위한 Dataset과 테스트를 위한 Dataset을 Dataframe으로 생성하여 출력해보았습니다.

 

+------+------+---+------+
|height|weight|age|gender|
+------+------+---+------+
|161.0 |69.17 |29 |1.0   |
|175.5 |88.82 |15 |1.0   |
|155.2 |45.87 |22 |0.0   |
+------+------+---+------+

 

훈련 데이터는 모두 4개의 컬럼이 존재하며 키, 몸무게, 나이 컬럼은 예측을 위한 특성, 성별은 레이블로 사용됩니다.

3개의 특성이 속하는 칼럼 값을 갖기위한 벡터를 생성해 줘야합니다.

VectorAssembler 트랜스포머는 Spark 에서 제공하는 특성을 변환하는 알고리즘이며,

setInputCols()로 지정한 칼럼의 값을 포함하는 벡터를 생성하고

해당 벡터를 가진 새로운 컬럼의 이름을 setOutputCol()로 생성해줍니다. 

 

val assembler = new VectorAssembler().setInputCols(Array("height", "weight", "age")).setOutputCol("features")

 

 

해당 assembler을 생성해주고 트랜스포머 호출할 수 있는 transform()을 사용하여 해당기능을 적용합니다.

 

val assembled_Training = assembler.transform(training_Data)
assembled_Training.show(false)

 

+------+------+---+------+------------------+
|height|weight|age|gender|features          |
+------+------+---+------+------------------+
|161.0 |69.17 |29 |1.0   |[161.0,69.17,29.0]|
|175.5 |88.82 |15 |1.0   |[175.5,88.82,15.0]|
|155.2 |45.87 |22 |0.0   |[155.2,45.87,22.0]|
+------+------+---+------+------------------+

 

결과를 확인해보면 기존의 컬럼에 features가 추가되었는데

해당 데이터는 키,몸무게,나이의 값이 포함된 것을 확인할 수 있습니다.

특성 벡터가 준비 되었다면 성별 분류 모델을 만들기위한 알고리즘을 작성해야합니다.

이진 분류 알고리즘인 Logistic Regression(로지스틱 회귀 알고리즘)을 사용하도록 하겠습니다.

 

val Learning_Rate = new LogisticRegression().setMaxIter(10).setRegParam(0.01).setLabelCol("gender")

 

LogisticRegression()의 setMaxIter()로 반복 실행 횟수, setRegParm()로 얼마나 차이를 줄여나갈지를 설정할 수 있습니다.

이제 알고리즘도 준비되었으니 적합시켜 Pipeline api를 사용하여 모델을 생성해야합니다. 

 

val pipeline = new Pipeline().setStages(Array(assembler,Learning_Rate))
val pipelineModel = pipeline.fit(training_Data)  
pipelineModel.transform(training_Data).show()

 

+------+------+---+------+------------------+--------------------+--------------------+----------+
|height|weight|age|gender|          features|       rawPrediction|         probability|prediction|
+------+------+---+------+------------------+--------------------+--------------------+----------+
| 161.0| 69.17| 29|   1.0|[161.0,69.17,29.0]|[-2.4822151601906...|[0.07711440636316...|       1.0|
| 175.5| 88.82| 15|   1.0|[175.5,88.82,15.0]|[-3.6571095085973...|[0.02515775378340...|       1.0|
| 155.2| 45.87| 22|   0.0|[155.2,45.87,22.0]|[2.03575149854115...|[0.88449995044261...|       0.0|
+------+------+---+------+------------------+--------------------+--------------------+----------+

 

결과를 확인해보면 prediction에 성별을 예측한 결과를 나타내는 것을 확인할 수 있습니다.

pipeline()의 setSatages()를 이용하여 조합하고, Pipeline클래스 fit()을 호출하여 pipelinemodel을 생성하게 됩니다.

또한 해당 생성된 모델을 외부저장소에 저장하고 불러와 재사용할 수 있습니다.


val Save_Path = "C:/Users/JGH/Desktop/pipelinemodel"
pipelineModel.write.overwrite().save(Save_Path)
val loadedPipelineModel = PipelineModel.load(Save_Path) 

 

 

오늘은 간단하게 Pipeline을 사용하여 모델을 생성하고

키, 몸무게, 나이를 통한 성별을 예측하는 Logistic Regression알고리즘을 이용하여 작성해보았습니다.

반응형