Pyspark MLlib 를 활용한 모델링 기초 (w/ Random Forest)
/* DeepPlay 2022-09-20 */
개요
1) 배경: scalable machine learning 이란?
근래 빅데이터쪽에서 scalability 라는 개념이 buzzword 중 하나이다. scalability는 스케일 아웃에 관한 것이며, 스케일 아웃이란 "처리 능력을 향상시키기 위해 트래픽을 분산시킬 서버의 수를 늘리는 방법" 을 의미한다. 즉, scalable 머신러닝이란, 머신러닝을 빠르게 처리하기 위해 여러 컴퓨터에 분산시켜 일을 시키는 것으로 정의해볼 수 있다. 빅데이터를 어떻게 효율적으로 처리할 것인가? 와 관련한 논의 및 발전이 머신러닝 쪽에서도 이루어지고 있다고 볼 수 있겠다.
2) 데이터
필요한 데이터 다운받기: Chronic Kidney Disease Dataset
https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease
3) 요점
Pyspark MLlib 를 사용할 때 아래 사항을 주의해야함
1. Assembling : 모든 피쳐를 하나의 컬럼에 리스트 형태로 몰아 넣는 것
2. Type casting: Assembling 한 피쳐는 동일한 타입이어야함.
3. Missing value handling: Assebling 한 피쳐는 결측이 없어야 함 (NA 가 아닌 0 또는 imputation 된 값이어야 함)
0. Load data
# 데이터 다운로드 받기
# https://www.kaggle.com/datasets/mansoordaku/ckdisease?resource=download
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys
# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# 환경 세팅
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1") # local 환경임을 명시적으로 지정해준다.
sc = SparkContext(conf=conf_spark)
# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가
data_pd = pd.read_csv("data/kidney_disease.csv")
1. Preprocessing
#### Imputation : Categorical variable
각 변수별 결측치 개수 확인하기
# 범주형 변수 리스트
cat_cols = ['al', 'su', 'rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']
# 각 컬럼별로 결측값 개수를 확인함
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다.
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
df_new.at[0, 'variable'] = col
df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
result = pd.concat([result, df_new], axis=0)
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산
result

각 변수별 최빈값 Imputation
from pyspark.sql.functions import when, lit
def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
col_with_mode=[]
for col in cat_col_list:
#Filter null
df = df.filter(df[col].isNull()==False)
#Find unique_values_with_count
unique_classes = df.select(col).distinct().rdd.map(lambda x: x[0]).collect()
unique_values_with_count=[]
for uc in unique_classes:
unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
#sort unique values w.r.t their count values
sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
return col_with_mode
col_with_mode = mode_of_pyspark_columns(data, cat_cols)
for col, mode in col_with_mode:
data = data.withColumn(col, when(data[col].isNull()==True,
lit(mode)).otherwise(data[col]))
#### Imputation : Numeric variable
각 변수별 평균값 치환 코드
numeric_cols=['age', 'bp', 'sg','bgr', 'bu', 'sc', 'sod', 'pot', 'hemo', 'pcv', 'rc', 'wc']
from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit
# 컬럼별 평균값 계산하기
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
col_with_mean=[]
for col in numeric_cols:
mean_value = df.select(avg(df[col]))
avg_col = mean_value.columns[0]
res = mean_value.rdd.map(lambda row : row[avg_col]).collect()
if (verbose==True): print(mean_value.columns[0], "\t", res[0])
col_with_mean.append([col, res[0]])
return col_with_mean
col_with_mean = mean_of_pyspark_columns(data, numeric_cols)
# with column when 구문으로 null 일 때, 평균으로 치환한다.
for col, mean in col_with_mean:
data = data.withColumn(col, when(data[col].isNull() == True,
lit(mean)).otherwise(data[col]))
2. Categorical variable encoding
범주형 피쳐 인코딩
데이터에 관한 지식을 활용해 모든 피쳐들이 binary 또는 ordinal 변수라는 것을 확인한다. 이 경우, Pyspark 의 StringIndexer 를 활용하여 encoding 해볼 수 있다. 예를 들어, normal/abnormal 의 경우, binary 변수이며, good/poor 등도 각 값들이 독립적이지 않고, 등급이 존재하는 ordinal 변수라는 것을 알 수 있다.
data.select(cat_cols).show()
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
| al| su| rbc| pc| pcc| ba|htn| dm|cad|appet| pe|ane|
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
|1.0|0.0| normal| normal|notpresent|notpresent|yes|yes| no| good| no| no|
|4.0|0.0| normal| normal|notpresent|notpresent| no| no| no| good| no| no|
|2.0|3.0| normal| normal|notpresent|notpresent| no|yes| no| poor| no|yes|
|4.0|0.0| normal|abnormal| present|notpresent|yes| no| no| poor|yes|yes|
각 변수별로 suffix로 Index를 붙힌 새로운 변수를 만든다.
# Categorical 변수들을 StringIndexer 을 이용해 Numeric 변수로 변환함
from pyspark.ml.feature import StringIndexer
indexed = data
for col in cat_cols:
indexer = StringIndexer(inputCol=col, outputCol=col + "Index")
indexed = indexer.fit(indexed).transform(indexed)
예를 들어, 아래와 같이 새로운 컬럼이 생성된다.
# 각 범주형 변수별로 Index 변수를 아래와 같이 만든다.
indexed.select(['id', 'pc', 'pcIndex']).show()
| id| pc|pcIndex|
+---+--------+-------+
| 0| normal| 0.0|
| 1| normal| 0.0|
| 2| normal| 0.0|
| 3|abnormal| 1.0|
레이블 변수 인코딩
분류 문제의 경우 레이블은 범주형 변수로 마찬가지로 인코딩이 필요하다. 실제 유니크 레이블들을 확인해보니 잘못 들어간 값이 존재하여 이를 제대로된 값으로 치환해준다.
indexed.select('classification').distinct().show()
+--------------+
|classification|
+--------------+
| notckd|
| ckd|
| ckd\t|
+--------------+
값 치환
# 잘못된 값이 있기 때문에 이를 replace 해준다.
indexed = indexed.withColumn("classification", \
when(indexed["classification"] == 'ckd\t', 'ckd').otherwise(indexed["classification"]))
결과를 보면, notckd 를 1, ckd를 0으로 인코딩 해주었는데, 반대로 notckd를 0, ckd를 1로 바꾸어 인코딩해보자.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="classification", outputCol="label")
indexed = indexer.fit(indexed).transform(indexed)
indexed.select('label').distinct().show()
+--------------+-----+
|classification|label|
+--------------+-----+
| notckd| 1.0|
| ckd| 0.0|
+--------------+-----+
indexed = indexed.withColumn("label", when(indexed["label"] == 1, 0).otherwise(1))
2. Typecasting of Features
- Numeric 변수들을 Double 형으로 변환한다.
- 이 때 주의할점은, 만약 원래 numeric 변수인데, 오류로 인해 문자가 들어있는 경우, Double 형변환을 하면 값이 null 이 된다.
- 따라서 확인하고, imputation 을 한 번 더 수행해야할 수도 있다.
numeric 변수들을 double 형으로 변환하기
from pyspark.sql.types import DoubleType
for col in numeric_cols:
indexed = indexed.withColumn(col, indexed[col].cast(DoubleType()))
# with column when 구문으로 null 일 때, 평균으로 치환한다.
for col, mean in col_with_mean:
indexed = indexed.withColumn(col, when(indexed[col].isNull() == True,
lit(mean)).otherwise(indexed[col]))
3. Assembling of Input Features
- feature 를 assemble 하여 하나의 필드로 변환 후, 이후 피쳐 변환이나 모델링 절차 등이 수행한다.
피쳐들의 Type 재확인
-> 모두 double type 이라는 것을 알 수 있음
for col in indexed[feature_list].dtypes:
print(col[0]+" , "+col[1])
VectorAssemble 활용
피쳐들을 assemble 한 features 라고 하는 컬럼을 생성한다. inputCols 파라미터에 피쳐명을 리스트 형태로 지정하고, outputCols 에 assemble 한 값을 넣을 컬럼명을 넣는다.
# 주의 사항: Assemble 을 할 때, 데이터에 결측이 없어야함
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=feature_list,
outputCol="features")
features_vectorized = vectorAssembler.transform(indexed)
features_vectorized.select(['id', 'features']).show(5)
+---+--------------------+
| id| features|
+---+--------------------+
| 0|[1.0,0.0,0.0,0.0,...|
| 1|(24,[0,12,13,14,1...|
| 2|[2.0,2.0,0.0,0.0,...|
| 3|[4.0,0.0,0.0,1.0,...|
| 4|(24,[0,12,13,14,1...|
+---+--------------------+
4. Normalization of Input Features
- 피쳐들은 모두 같은 스케일이 아니다.
- 피쳐들을 같은 스케일로 변환하면, 모델링 단계에서 더 좋은 결과를 보여준다.
- Scaling 을 할 수도 있고, Normalization 을 할 수도 있다. 이 둘을 interchangable 하게 말하기도 하지만, 정확히는 분포의 모양을 바꾸지 않는 경우 scaling, 분포를 바꾸는 경우 normalization 이라고 한다. (https://www.kaggle.com/code/alexisbcook/scaling-and-normalization/tutorial)
scaling 이나 normalization 은 왜 필요할까?
변수의 scale 이 모델 성능에 미치는 영향을 줄여줌. 피쳐들의 스케일까지 학습해야 하는 부담을 줄일 수 있다.
ㄴ 만약 normalization 이 적절히 수행된다면, 단점은 없고 장점만 있기 때문에 일반적으로 많이 수행한다.
Normalizer 를 활용한 standard scaling
inputCols 로 assemble 한 컬럼을 넣어주고, outputCols 로 normalization 된 피쳐를 넣을 컬럼명을 지정해준다. p는 normalization 을 할 때, L1 norm 을 이용하는 것을 의미한다. (default 는 L2 norm 이다.) 본 포스팅에서 normalization 수식이나 변환 결과는 생략한다.
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
l1NormData = normalizer.transform(features_vectorized)
l1NormData.select(['features','features_norm']).show()
+--------------------+--------------------+
| features| features_norm|
+--------------------+--------------------+
|[1.0,0.0,0.0,0.0,...|[1.20525839810946...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
|[2.0,2.0,0.0,0.0,...|[2.40521254800404...|
|[4.0,0.0,0.0,1.0,...|[5.58159914210821...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
preprocessed_data = l1NormData
5. Spiliting into train and test set
#Split the dataset into training and testing dataset
splits = preprocessed_data.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]
6. Train & Predict
pyspark 에서 학습하고 예측하는 일에 대해 fit, transform 이라는 용어를 사용한다. fit 은 학습 데이터를 입력 받아 모델을 만드는 작업업을 의미하며, transform 은 예측하고 싶은 데이터를 입력 받아, 학습된 모델로 그 데이터의 label을 예측한다.
from pyspark.ml.classification import RandomForestClassifier
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features_norm", numTrees=10)
rf_model = rf.fit(df_train)
prediction = rf_model.transform(df_train)
transform 을 수행하면, rawPrediction, probability, prediction 이라는 컬럼을 데이터에 추가해준다.
- rawPrediction (list): random forest 의 각 tree 들이 각 클래스에 속한다고 예측한 값들의 합이다.
- probabilty (list): rawPrediction 을 0~1 사이로 scaling 한 값
- prediction (element): probabilty 를 토대로 가장 가능성이 높은 label 예측한 값
prediction.select(['id', 'rawPrediction', 'probability', 'prediction']).show()
+---+--------------------+--------------------+----------+
| id| rawPrediction| probability|prediction|
+---+--------------------+--------------------+----------+
| 0|[0.94594594594594...|[0.09459459459459...| 1.0|
| 1|[2.06800093632958...|[0.20680009363295...| 1.0|
| 10|[0.01515151515151...|[0.00151515151515...| 1.0|
|100|[0.20461020461020...|[0.02046102046102...| 1.0|
|102|[4.91567118722564...|[0.49156711872256...| 1.0|
7. Evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'training accuracy: {binEval.evaluate(prediction)}')
prediction_test = rf_model.transform(df_test)
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'test accuracy: {binEval.evaluate(prediction_test)}')
training accuracy: 0.9900662251655629
test accuracy: 0.9489795918367347
참고문서
- https://medium.com/@aieeshashafique/machine-learning-with-pysparkmlib-and-random-forest-solving-a-chronic-kidney-disease-problem-752287d5ffd0
- https://www.silect.is/blog/random-forest-models-in-spark-ml/
'Tools > Python' 카테고리의 다른 글
Pyspark export to delimited file (0) | 2022.11.15 |
---|---|
Jupyter notebook에서 외부 파일을 자동으로 갱신하는 방법 (0) | 2022.10.19 |
PySpark DataFrame 을 이용한 탐색적 데이터 분석 및 처리 (0) | 2022.08.26 |
PySpark DataFrame 을 사용하는 이유와 pandas 와의 차이점 (0) | 2022.08.26 |
아나콘다 python 3 가상환경 세팅하기 (0) | 2022.06.12 |