Pyspark MLlib 를 활용한 모델링 기초 (w/ Random Forest)

/* DeepPlay 2022-09-20 */

 

개요

1) 배경: scalable machine learning 이란?

근래 빅데이터쪽에서 scalability 라는 개념이 buzzword 중 하나이다. scalability는 스케일 아웃에 관한 것이며, 스케일 아웃이란 "처리 능력을 향상시키기 위해 트래픽을 분산시킬 서버의 수를 늘리는 방법" 을 의미한다. 즉, scalable 머신러닝이란, 머신러닝을 빠르게 처리하기 위해 여러 컴퓨터에 분산시켜 일을 시키는 것으로 정의해볼 수 있다. 빅데이터를 어떻게 효율적으로 처리할 것인가? 와 관련한 논의 및 발전이 머신러닝 쪽에서도 이루어지고 있다고 볼 수 있겠다. 

 

2) 데이터

필요한 데이터 다운받기: Chronic Kidney Disease Dataset

https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

 

3) 요점 

Pyspark MLlib 를 사용할 때 아래 사항을 주의해야함

1. Assembling : 모든 피쳐를 하나의 컬럼에 리스트 형태로 몰아 넣는 것 

2. Type casting: Assembling 한 피쳐는 동일한 타입이어야함. 

3. Missing value handling: Assebling 한 피쳐는 결측이 없어야 함 (NA 가 아닌 0 또는 imputation 된 값이어야 함)

 

0. Load data 

# 데이터 다운로드 받기 
# https://www.kaggle.com/datasets/mansoordaku/ckdisease?resource=download
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")

1. Preprocessing

#### Imputation : Categorical variable

각 변수별 결측치 개수 확인하기

# 범주형 변수 리스트 
cat_cols = ['al', 'su', 'rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 
result

각 변수별 최빈값 Imputation

from pyspark.sql.functions import when, lit
def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    col_with_mode=[]
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = df.select(col).distinct().rdd.map(lambda x: x[0]).collect()
        unique_values_with_count=[]
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

col_with_mode = mode_of_pyspark_columns(data, cat_cols)

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 
    lit(mode)).otherwise(data[col]))

#### Imputation : Numeric variable

각 변수별 평균값 치환 코드

numeric_cols=['age', 'bp', 'sg','bgr', 'bu', 'sc', 'sod', 'pot', 'hemo', 'pcv', 'rc', 'wc']

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    col_with_mean=[]
    for col in numeric_cols:
        mean_value = df.select(avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row : row[avg_col]).collect()
        
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

col_with_mean = mean_of_pyspark_columns(data, numeric_cols)

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 
    lit(mean)).otherwise(data[col]))

 

2. Categorical variable encoding

범주형 피쳐 인코딩

데이터에 관한 지식을 활용해 모든 피쳐들이 binary 또는 ordinal 변수라는 것을 확인한다. 이 경우, Pyspark 의 StringIndexer 를 활용하여 encoding 해볼 수 있다. 예를 들어, normal/abnormal 의 경우, binary 변수이며, good/poor 등도 각 값들이 독립적이지 않고, 등급이 존재하는 ordinal 변수라는 것을 알 수 있다.

data.select(cat_cols).show()
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
| al| su|     rbc|      pc|       pcc|        ba|htn| dm|cad|appet| pe|ane|
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
|1.0|0.0|  normal|  normal|notpresent|notpresent|yes|yes| no| good| no| no|
|4.0|0.0|  normal|  normal|notpresent|notpresent| no| no| no| good| no| no|
|2.0|3.0|  normal|  normal|notpresent|notpresent| no|yes| no| poor| no|yes|
|4.0|0.0|  normal|abnormal|   present|notpresent|yes| no| no| poor|yes|yes|

 

각 변수별로 suffix로 Index를 붙힌 새로운 변수를 만든다.

# Categorical 변수들을 StringIndexer 을 이용해 Numeric 변수로 변환함 
from pyspark.ml.feature import StringIndexer 

indexed = data
for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col + "Index")
    indexed = indexer.fit(indexed).transform(indexed)

예를 들어, 아래와 같이 새로운 컬럼이 생성된다. 

# 각 범주형 변수별로 Index 변수를 아래와 같이 만든다.
indexed.select(['id', 'pc', 'pcIndex']).show()
| id|      pc|pcIndex|
+---+--------+-------+
|  0|  normal|    0.0|
|  1|  normal|    0.0|
|  2|  normal|    0.0|
|  3|abnormal|    1.0|

 

레이블 변수 인코딩

분류 문제의 경우 레이블은 범주형 변수로 마찬가지로 인코딩이 필요하다. 실제 유니크 레이블들을 확인해보니 잘못 들어간 값이 존재하여 이를 제대로된 값으로 치환해준다. 

indexed.select('classification').distinct().show()
+--------------+
|classification|
+--------------+
|        notckd|
|           ckd|
|         ckd\t|
+--------------+

값 치환 

# 잘못된 값이 있기 때문에 이를 replace 해준다. 
indexed = indexed.withColumn("classification", \
              when(indexed["classification"] == 'ckd\t', 'ckd').otherwise(indexed["classification"]))

결과를 보면, notckd 를 1, ckd를 0으로 인코딩 해주었는데, 반대로 notckd를 0, ckd를 1로 바꾸어 인코딩해보자. 

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="classification", outputCol="label")
indexed = indexer.fit(indexed).transform(indexed)
indexed.select('label').distinct().show()
+--------------+-----+
|classification|label|
+--------------+-----+
|        notckd|  1.0|
|           ckd|  0.0|
+--------------+-----+
indexed = indexed.withColumn("label", when(indexed["label"] == 1, 0).otherwise(1))

 

2. Typecasting of Features

- Numeric 변수들을 Double 형으로 변환한다.
- 이 때 주의할점은, 만약 원래 numeric 변수인데, 오류로 인해 문자가 들어있는 경우, Double 형변환을 하면 값이 null 이 된다.
- 따라서 확인하고, imputation 을 한 번 더 수행해야할 수도 있다. 

 

numeric 변수들을 double 형으로 변환하기

from pyspark.sql.types import DoubleType
for col in numeric_cols:
    indexed = indexed.withColumn(col, indexed[col].cast(DoubleType()))
# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    indexed = indexed.withColumn(col, when(indexed[col].isNull() == True, 
    lit(mean)).otherwise(indexed[col]))

 

3. Assembling of Input Features

- feature 를 assemble 하여 하나의 필드로 변환 후, 이후 피쳐 변환이나 모델링 절차 등이 수행한다. 

 

피쳐들의 Type 재확인

-> 모두 double type 이라는 것을 알 수 있음

for col in indexed[feature_list].dtypes:
    print(col[0]+" , "+col[1])

 

VectorAssemble 활용

피쳐들을 assemble 한 features 라고 하는 컬럼을 생성한다. inputCols 파라미터에 피쳐명을 리스트 형태로 지정하고, outputCols 에 assemble 한 값을 넣을 컬럼명을 넣는다. 

# 주의 사항: Assemble 을 할 때, 데이터에 결측이 없어야함 
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=feature_list,
                                  outputCol="features")
                                  
features_vectorized = vectorAssembler.transform(indexed)
features_vectorized.select(['id', 'features']).show(5)
+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[1.0,0.0,0.0,0.0,...|
|  1|(24,[0,12,13,14,1...|
|  2|[2.0,2.0,0.0,0.0,...|
|  3|[4.0,0.0,0.0,1.0,...|
|  4|(24,[0,12,13,14,1...|
+---+--------------------+

 

 

4. Normalization of Input Features 

- 피쳐들은 모두 같은 스케일이 아니다. 
- 피쳐들을 같은 스케일로 변환하면, 모델링 단계에서 더 좋은 결과를 보여준다. 

- Scaling 을 할 수도 있고, Normalization 을 할 수도 있다. 이 둘을 interchangable 하게 말하기도 하지만, 정확히는 분포의 모양을 바꾸지 않는 경우 scaling, 분포를 바꾸는 경우 normalization 이라고 한다. (https://www.kaggle.com/code/alexisbcook/scaling-and-normalization/tutorial)


scaling 이나 normalization 은 왜 필요할까?

변수의 scale 이 모델 성능에 미치는 영향을 줄여줌. 피쳐들의 스케일까지 학습해야 하는 부담을 줄일 수 있다. 

ㄴ 만약 normalization 이 적절히 수행된다면, 단점은 없고 장점만 있기 때문에 일반적으로 많이 수행한다. 

 

Normalizer 를 활용한 standard scaling 

inputCols 로 assemble 한 컬럼을 넣어주고, outputCols 로 normalization 된 피쳐를 넣을 컬럼명을 지정해준다. p는 normalization 을 할 때, L1 norm 을 이용하는 것을 의미한다. (default 는 L2 norm 이다.) 본 포스팅에서 normalization 수식이나 변환 결과는 생략한다. 

from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
l1NormData = normalizer.transform(features_vectorized)
l1NormData.select(['features','features_norm']).show()
+--------------------+--------------------+
|            features|       features_norm|
+--------------------+--------------------+
|[1.0,0.0,0.0,0.0,...|[1.20525839810946...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
|[2.0,2.0,0.0,0.0,...|[2.40521254800404...|
|[4.0,0.0,0.0,1.0,...|[5.58159914210821...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
preprocessed_data = l1NormData

 

 

5. Spiliting into train and test set

#Split the dataset into training and testing dataset
splits = preprocessed_data.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

 

6. Train & Predict

pyspark 에서 학습하고 예측하는 일에 대해 fit, transform  이라는 용어를 사용한다. fit 은 학습 데이터를 입력 받아 모델을 만드는 작업업을 의미하며, transform 은 예측하고 싶은 데이터를 입력 받아, 학습된 모델로 그 데이터의 label을 예측한다. 

from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features_norm", numTrees=10)
rf_model = rf.fit(df_train)
prediction = rf_model.transform(df_train)

transform 을 수행하면, rawPrediction, probability, prediction 이라는 컬럼을 데이터에 추가해준다. 

- rawPrediction (list): random forest 의 각 tree 들이 각 클래스에 속한다고 예측한 값들의 합이다. 

- probabilty (list): rawPrediction 을 0~1 사이로 scaling 한 값 

- prediction (element): probabilty 를 토대로 가장 가능성이 높은 label 예측한 값 

prediction.select(['id', 'rawPrediction', 'probability', 'prediction']).show()
+---+--------------------+--------------------+----------+
| id|       rawPrediction|         probability|prediction|
+---+--------------------+--------------------+----------+
|  0|[0.94594594594594...|[0.09459459459459...|       1.0|
|  1|[2.06800093632958...|[0.20680009363295...|       1.0|
| 10|[0.01515151515151...|[0.00151515151515...|       1.0|
|100|[0.20461020461020...|[0.02046102046102...|       1.0|
|102|[4.91567118722564...|[0.49156711872256...|       1.0|

 

7. Evaluation

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'training accuracy: {binEval.evaluate(prediction)}')

prediction_test = rf_model.transform(df_test)
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'test accuracy: {binEval.evaluate(prediction_test)}')
training accuracy: 0.9900662251655629
test accuracy: 0.9489795918367347
 

참고문서

https://medium.com/@aieeshashafique/machine-learning-with-pysparkmlib-and-random-forest-solving-a-chronic-kidney-disease-problem-752287d5ffd0
https://www.silect.is/blog/random-forest-models-in-spark-ml/  

- https://dzone.com/articles/what-scalable-machine-learning