We-Co

[We-Co] Pyspark Xgboost - Spark, MLlib Pipelines, 수요 예측 본문

Spark

[We-Co] Pyspark Xgboost - Spark, MLlib Pipelines, 수요 예측

위기의코딩맨 2022. 3. 15. 15:23
반응형

안녕하세요. 위기의 코딩맨입니다.

오늘은 pyspark의 xgboost의 예제를 한번 풀어보도록 하겠습니다.

예제는 Databricks의 공식 문서를 참고하여 작성했습니다.

 

databricks 문서

 

xgboost-pyspark - Databricks

 

docs.databricks.com

 

해당 문제는 XGBoost 및 MLlib 파이프라인을 사용한 회귀 분석의 예제입니다.

또한, 자전거 공유 데이터셋을 사용했으며,

해당 데이터를 통해 시간당 자전거 대여 횟수를 예측하는 문제입니다.

즉, 많은 분야에 응용할 수 있는 수요를 예측하는 예제입니다.

 

데이터 셋은 UCI Machine Learning Repository에서 가져왔으며,

2011~2012년에 캐피털 자전거 공유 시스템의 자전거 대여 정보가 포함되어 있습니다.

 

이제 간단한 소개는 끝났으며

Databricks를 이용하여 한번 작성해 보도록 하겠습니다.

해당 문제는 Databricks Runtime for Machine Learning 7.6 ML 이상의 Cluster를 만들고 적용해 주어야합니다.

적용하고

sparkdl xgboost의 XgboostRegressor를 불러와 import 해주도록 합니다.

from sparkdl.xgboost import XgboostRegressor

 

 

 

df에 spark를 사용하여 csv 파일을 읽어 해당 값을 넣어주도록 합니다.

df.cache()를 사요하면 해당 csv의 타입을 확인할 수 있습니다.

df = spark.read.csv("/databricks-datasets/bikeSharing/data-001/hour.csv", header="true", inferSchema="true")
df.cache()
Out[6]: DataFrame[
instant: int,
dteday: timestamp,
season: int,
yr: int,
mnth: int,
hr: int,
holiday: int,
weekday: int,
workingday: int,
weathersit: int,
temp: double,
atemp: double,
hum: double,
windspeed: double,
casual: int,
registered: int,
cnt: int]
 

df에 할당된 데이터를 확인해 보도록 하겠습니다.

display(df.limit(10))

결과

해당 데이터들이 잘 들어온 것을 확인할 수 있습니다. 

df에 저장된 개수는 17379개의 Row를 갖고있습니다.

해당 데이터들의 기본적인 설명입니다.

 

Feature columns 
dteday : 날짜
season : 계절(1:봄, 2:여름, 3:가을, 4:겨울)
yr : 연도(0:2011, 1:2012)
mnth : 월(1 ~ 12)
hr : 시간(0 ~ 23)
holiday : 휴일인 경우 1개, 휴일인 경우 0개
weekday : 요일(0 ~ 6)
workingday : 주말 또는 공휴일인 경우 0, 그렇지 않은 경우 1
weathersit : (1:맑음, 2:기온 또는 구름, 3:약간 비 또는 눈, 4:폭우 또는 눈)
temp : 정규화된 온도(섭씨)
atemp : 정상화된 체감온도(섭씨)
hum : 정상화된 습도
windspeed : 표준화된 풍속 


Label Columns  
casual : 캐주얼 사용자 수
registered : 등록된 사용자 수
cnt : 캐주얼 및 등록 자전거 모두를 포함한 총 렌털 자전거 수

데이터의 첫번째 줄은 2011년 1월 1일 자정 ~ 새벽 1시 사이에 16명이 자전거를 빌렸다는 것을 보여줍니다. 

공식문서에서 해당 데이터 셋은 거의 모든 데이터가 숫자로 저장되어 있어 Machine Learning에 적합하다고 말하고있습니다.

 

그리고 중복된 데이터들이 존재하여 몇몇 컬럼을 삭제했습니다.

yr, mnth, hr 등 시간 정보가 포함되어 있으므로, dteday 컬럼을 삭제하고

cnt에 모든 사용자가 저장되어 있어 casual, registred 컬럼을 삭제합니다.

또한 사용하지 않을 instant 컬럼도 삭제합니다.

 

df = df.drop("instant").drop("dteday").drop("casual").drop("registered")
display(df)

결과

해당 컬럼들이 삭제된 것을 확인할 수 있습니다.

 

그리고 학습 및 테스트 데이터 셋을 나누어야 합니다.

해당 문서에서는 랜덤하게 0.7, 0.3로 나누어 주도록 했습니다. (70프로 학습, 30프로 테스트)

train, test = df.randomSplit([0.7, 0.3], seed = 0)
print("There are %d training examples and %d test examples." % (train.count(), test.count()))
There are 12081 training examples and 5298 test examples.
12081개의 학습 데이터, 5298개의 테스트 데이터로 나누었습니다.

그리고 시간단 사용자를 시각화하는 방식을 진행했습니다. 

display(train.select("hr", "cnt"))

dispaly를 사용하여 데이터를 확인하고

기본 데이터

해당 데이터 시각화는 Line을 선택하고, Plot option은 문서에서 지시하는 옵션을 선택했습니다.

데이터를 확인해보면 16~18시에 가장 사용자가 많은 것을 확인할 수 있습니다.

이제 학습을 진행하기 위한 작업을 해보도록 하겠습니다.

먼저 해당 데이터를 저장하고 VectorAssemble, VectoIndexer 작업을 진행합니다.

 

VectorAssembler :  피쳐 열을 피쳐 벡터로 조립합니다.
VectorIndexer :  범주형으로 처리해야 하는 열을 식별합니다.

 

이 작업은 작은 수의 고유 값을 갖는 열을 범주형으로 식별하고 휴리스틱 방식으로 수행되며,

이 예제에서 범주형 열은 yr(2개 값), 계절(4개 값), 휴일(2개 값), 근무일(2개 값) 및 날씨 상황(4개 값)으로 간주됩니다.

위에 데이터 설명할 때 상황에 따른 값을 나타냅니다.

날씨를 예를 들면 (1:맑음, 2:기온 또는 구름, 3:약간 비 또는 눈, 4:폭우 또는 눈), 즉 4개의 값으로 간주합니다.

 

from pyspark.ml.feature import VectorAssembler, VectorIndexer
  
featuresCols = df.columns
featuresCols.remove('cnt')
  
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures") 
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

 

다음으로는 모델을 설정합니다.

기본적으로 features를 사용하며, cnt를 예측하는 방법으로 학습하는 XgboostRegressor 모델을 선택합니다.

from sparkdl.xgboost import XgboostRegressor 
xgb_regressor = XgboostRegressor(num_workers=3, labelCol="cnt", missing=0.0)

 

 

다음으로  방금 정의한 모형을 교차 검증 단계에서 래핑하는 것입니다. 

CrossValidator는 서로 다른 하이퍼 매개 변수 설정을 사용하여 XgboostRegressor 추정기를 호출합니다.

여러 모델을 교육하고 지정된 메트릭을 최소화하여 최상의 모델을 선택합니다.

 

maxdepth 트리 최대 깊이, maxIter 반복, 총 트리 수를 설정하고

evaluator를 설정합니다. rmse(평균 제곱근 편차)로 설정을 진행하고 cv로 만들어 주도록합니다.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
  
paramGrid = ParamGridBuilder()\
  .addGrid(xgb_regressor.max_depth, [2, 5])\
  .addGrid(xgb_regressor.n_estimators, [10, 100])\
  .build()
 
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol=xgb_regressor.getLabelCol(),
                                predictionCol=xgb_regressor.getPredictionCol())
 
cv = CrossValidator(estimator=xgb_regressor, evaluator=evaluator, estimatorParamMaps=paramGrid)

 rmse는 모델이나 추정자가 예측한 값 ( 표본 또는 모집단 값 )과 관측된 값 간의 차이를 측정하는 데 자주 사용됩니다.

 

이제 이 단계들을 PipeLine을 통해 단계를 설정해 주도록 합니다.

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

그리고 pipeline의 fit()을 통해 학습을 진행합니다.

pipelineModel = pipeline.fit(train)

반환된 모델을 토대로 테스트 데이터를 입력하여 예측이 실행된 결과를 확인해보도록 하겠습니다.

predictions = pipelineModel.transform(test)
display(predictions.select("cnt", "prediction", *featuresCols))

결과

display(predictions.select("hr", "prediction"))

해당 결과를 시각화를 진행해서

위에서 시각화 했던 내용과 예측 시각화 결과를 비교해 보도록 하겠습니다.

비교

기존의 내용과 예측한 결과와 유사한 것을 확인할 수 있었습니다.

이 모델을 토대로 시간, 날씨 등의 데이터를 넣어주면 해당 수요를 예측할 수 있을 것입니다.

 

오늘은 Spark의 xgboost 기법을 사용해서 Databricks에서 제공하는 공식문서를 토대로 예제 문제를 풀어보았습니다.

 

반응형