반응형

PySpark DataFrame 을 이용한 탐색적 데이터 분석 및 처리 

/* DeepPlay 2022-08-26 */

 

* 본 포스팅에서는 pyspark 가 설치되어있음을 가정합니다. 

 

최초로 데이터를 받은 이후에, 가장 먼저할 작업은 탐색적 데이터 분석이다. 탐색적 데이터 분석을 통해 각 변수별로 처리 방법을 계획하여, 분석하기 쉬운 형태의 데이터로 만들게 된다. 이 과정에서 변수의 변환, 제거, 생성, 결측값 처리 등의 절차를 수행하게 된다. 본 포스팅은 pyspark 를 통해 데이터를 탐색적으로 확인하는 방법과 간단한 데이터 처리 방법 몇 가지를 다룬다. 

 

pyspark 를 통한 탐색적 분석 문법은 pandas 와 유사한 부분도 있고, 그렇지 않은 부분도 있다. 자주 등장하는 pyspark 의 특이적인 문법이 존재하는데, R, pandas 만 활용해온 사람이라면 이러한 부분에 점점 익숙해질 필요가 있을 것 같다. 

 

필요한 데이터 다운받기: Chronic Kidney Disease Dataset

https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

 

환경세팅

- MAC OS 에서 python 3.8 환경에 pyspark 를 설치하여 실습함 

# 데이터 다운로드 받기 
# https://www.kaggle.com/datasets/mansoordaku/ckdisease?resource=download
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)

 

0. 데이터 로드

pyspark 에서 csv 파일을 load 하는 것은 아래 문법으로 수행할 수 있다. 

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")

 

1. Schema: 데이터 스키마 확인하기

어떤 구조와 변수형으로 데이터가 들어가 있는지 먼저 파악한다. 

- 데이터를 직접 추출하여 만들었다면, 각 컬럼이 무엇을 의미하는지 알겠지만..
- 연습을 위한 토이 데이터이거나, 다른 사람한테 받은 데이터라면, 각 컬럼이 무엇을 의미하는지 이해하고 넘어갈 필요가 있다.

- kidney_disease 데이터의 변수명 의미 확인하기: https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

# 모든 필드가 string 타입으로 설정되어 있는 것을 확인할 수 있음  
data.printSchema()
root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- bp: string (nullable = true)
 |-- sg: string (nullable = true)
 |-- al: string (nullable = true)
 |-- su: string (nullable = true)
 |-- rbc: string (nullable = true)

 

2. Show data: 데이터 확인하기

- csv 파일에서 공백인 경우에는 null 값으로 채워졌다는 것을 알 수 있다.

- 각 변수들의 타입을 눈으로 한 번 확인한다.(연속형, 범주형) 

data.show() 
# data.head() 도 사용 가능함
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+----+-----+----+---+---+---+-----+---+---+--------------+
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod| pot|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+----+-----+----+---+---+---+-----+---+---+--------------+
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|

 

3. Size: 데이터의 규모 확인하기

# 400 X 26 size  
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 26

 

4. Univariate distribution (numeric): 단변수 분포 확인하기

#### string -> double 형변환

우선 연속형 변수를 double 형으로 변환해준다. 여기서도 withColumn 구문을 활용할 수 있다. 

# 연속형 변수들은 double 로 형변환 수행 
numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
for c in numeric_cols:
    data = data.withColumn(c, data[c].cast('double'))

#### describe 함수: 기본 통계치 추출 

data.describe(numeric_cols).show()
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|summary|              age|               bp|                  sg|                al|                 su|              bgr|                bu|                sc|               sod|              hemo|              pcv|                wc|                rc|
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|  count|              391|              388|                 353|               354|                351|              356|               381|               383|               313|               348|              329|               294|               269|
|   mean|51.48337595907928|76.46907216494846|  1.0174079320113256|1.0169491525423728|0.45014245014245013|148.0365168539326|57.425721784776904|3.0724543080939934|137.52875399361022|12.526436781609195|38.88449848024316| 8406.122448979591| 4.707434944237919|
| stddev|17.16971408926224|13.68363749352527|0.005716616974376756|1.3526789127628445|  1.099191251885407|79.28171423511773|50.503005849222504| 5.741126066859789|10.408752051798777| 2.912586608826765|8.990104814740933|2944.4741904103385|1.0253232655721791|
|    min|              2.0|             50.0|               1.005|               0.0|                0.0|             22.0|               1.5|               0.4|               4.5|               3.1|              9.0|            2200.0|               2.1|
|    max|             90.0|            180.0|               1.025|               5.0|                5.0|            490.0|             391.0|              76.0|             163.0|              17.8|             54.0|           26400.0|               8.0|
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+

위 결과를 pandas dataframe 으로 변환하고, transpose 를 시켜 보기 쉬운 데이터 형태로 변환한다. 

# 첫번째 로우를 header 로 변경 
numeric_dist = data.describe(numeric_cols).toPandas().T
new_header = numeric_dist.iloc[0] #grab the first row for the header
numeric_dist = numeric_dist[1:] #take the data less the header row
numeric_dist.columns = new_header #set the header row as the df header

# 인덱스컬럼의 값을 추출해서 새로운 컬럼으로 지정
numeric_dist['col'] = numeric_dist.index.values
numeric_dist.reset_index(drop=True, inplace=True)

 

#### agg 함수 사용: quantile 값 추출 

- 아래와 같이 agg 함수를 활용해 연속형 변수의 quantile 값을 추출할 수 있다. 

- pyspark.sql.functions 의 expr 함수를 활용해, agg 함수 내에 원하는 집계 방법을 string 형태로 넣어서 quantile 을 구할 수 있다. 

import pyspark.sql.functions as F

perc = pd.DataFrame({'col':[], '%25':[], '%50':[], '%75':[], '%90':[], '%99':[]})
for c in numeric_cols :
    result_new = data.agg(
             F.expr('percentile(' + c + ',array(0.25))')[0].alias('%25'),
             F.expr('percentile(' + c + ',array(0.50))')[0].alias('%50'),
             F.expr('percentile(' + c + ',array(0.75))')[0].alias('%75'),
             F.expr('percentile(' + c + ',array(0.90))')[0].alias('%90'),
             F.expr('percentile(' + c + ',array(0.99))')[0].alias('%99'),
    ).toPandas()
    result_new['col'] = c
    perc = pd.concat([perc, result_new], axis=0)

#### 두 결과 테이블 조인하기 

- 위 두 테이블을 조인하여 최종적으로 원하는 형태의 연속형 변수 분포 테이블을 만들어 볼 수 있다. 

numeric_dist_merge = pd.merge(numeric_dist, perc, how="inner", left_on="col", right_on="col")
numeric_dist_merge[['col', 'count', 'mean', '%25', '%50', '%75', '%90', '%99']]

5. Distribution: 단변수 분포 확인하기 - Categorical variable

#### 범주형 변수에서 unique 한 값 추출하기

- 아래 스크립트에서 rdd.map 이 pyspark 특이적인 문법이라고 볼 수 있다. 

ㄴ rdd 는 dataframe 을 rdd 형태로 다룰 것임을 의미하며, rdd 에 lambda 함수를 적용해 결과를 추출한다. 

ㄴ collect 함수는 결과를 list 형태로 만들어준다. 

ㄴ 이러한 문법은 pandas 나 R 에서는 잘 나오지 않기 때문에 익숙해질 필요가 있을 것 같다. 

from pyspark.sql.functions import when
from pyspark.sql import functions as F

# Label 변수 확인하기 
print(f"labels: {data.select('classification').distinct().rdd.map(lambda r: r[0]).collect()}")
# data_pd['classification'].unique() # pandas 의 경우는 이와 같이 한다.
labels: ['notckd', 'ckd', 'ckd\t']

#### 분포 확인하기 

- sql.function 내 agg 함수 내에 원하는 집계 (count) 함수를 넣어, 아래와 같이 그룹별 행수를 계산할 수 있다. 

result = data.select(['classification']).\
   groupBy('classification').\
   agg(F.count('classification').alias('user_count')).toPandas()
result

 

6. Drop useless columns: 컬럼 드랍하기 (Option)

- 필요 없는 컬럼이 있는 경우 drop 함수로 드랍한다.

data = data.drop('pot')
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 25

 

7. Subsetting: 원하는 행만 남기기 

- filter 함수를 통해 원하는 행만 남길 수 있다. 

data_ckd = data.filter(data['classification'] == 'ckd')
data_ckd.show()
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+-----+----+---+---+---+-----+---+---+--------------+
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+-----+----+---+---+---+-----+---+---+--------------+
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|

 

8. Check missing values: 컬럼별 결측값 개수/결측률 확인하기

- 이 데이터에서는 결측값이 null 로 들어가 있다. 

- 컬럼별로 결측값 개수를 구하는 것은 다양한 방식으로 구현할 수 있지만, 가장 간단하게 for 을 활용하는 방법은 아래와 같다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산
# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)

 

9. Replace: 값 치환하기 

- 데이터를 다루다보면 종종 값을 치환해야할 경우가 있다. (값이 잘못 들어가 있는 경우, 카테고리를 합치는 경우 등..)

- withColumn 구문과 when 구문을 합쳐여 값 치환을 구현해볼 수 있다. 

 

# label 에 'ckd\t' 로 데이터가 잘못 들어가 있는 것을 확인할 수 있다. 
# 이를 when 구문을 통해 원하는 값으로 바꾸어준다. 
data = data.withColumn("classification", \
              when(data["classification"] == 'ckd\t', 'ckd').otherwise(data["classification"]))

# 재출력하기 
print(f"처리후 labels: {data.select('classification').distinct().rdd.map(lambda r: r[0]).collect()}")
처리후 labels: ['notckd', 'ckd']

 

10. Imputation: 평균대치법 (for numeric variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 평균 대치법으로 결측값을 처리하자. 

- 먼저, 특정 컬럼의 평균을 구하는 함수를 작성한다. 

ㄴ pyspark 에서는 이와 관련한 readymade 함수가 없기 때문에 직접 작성한다. 

- 여기서도 df.select(avg(df[col]))) 이나 rdd.map 같은 pyspark 특이적 문법이 등장한다. 

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    col_with_mean=[]
    for col in numeric_cols:
        mean_value = df.select(avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row : row[avg_col]).collect()
        
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

각 컬럼들에 대해 평균값을 구한다. 

numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
col_with_mean = mean_of_pyspark_columns(data, numeric_cols)
col_with_mean
[['age', 51.48337595907928],
 ['bp', 76.46907216494846],
 ['sg', 1.0174079320113256],
 ['al', 1.0169491525423728],
 ['su', 0.45014245014245013],
 ['bgr', 148.0365168539326],
 ['bu', 57.425721784776904],
 ['sc', 3.0724543080939934],
 ['sod', 137.52875399361022],
 ['hemo', 12.526436781609195],
 ['pcv', 38.88449848024316],
 ['wc', 8406.122448979591],
 ['rc', 4.707434944237919]]

각 컬럼별로 반복문을 돌면서, null 인 경우 평균으로 replace 를 수행한다. 

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 
    lit(mean)).otherwise(data[col]))

 

11. Imputation: Mode 대치 (for categorical variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 최빈값 대치법으로 결측값을 처리해보자. 

- 마찬가지로 컬럼별로 최빈값을 구하는 함수를 작성한다. 

def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    col_with_mode=[]
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = df.select(col).distinct().rdd.map(lambda x: x[0]).collect()
        unique_values_with_count=[]
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

각 컬럼들에 대해 최빈값을 구한다. 

cat_cols = ['rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']
col_with_mode = mode_of_pyspark_columns(data, cat_cols)
col_with_mode
[['rbc', 'normal'],
 ['pc', 'normal'],
 ['pcc', 'notpresent'],
 ['ba', 'notpresent'],
 ['htn', 'no'],
 ['dm', 'no'],
 ['cad', 'no'],
 ['appet', 'good'],
 ['pe', 'no'],
 ['ane', 'no']]

각 컬럼별로 반복문을 돌면서, null 인 경우 최빈값으로 replace 를 수행한다. 

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 
    lit(mode)).otherwise(data[col]))

결측값 치환이 잘 되었는지를 확인한다. 결측값이 없는 데이터가 되었음을 확인할 수 있다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 

# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)

 

참고 자료

 

A Brief Introduction to PySpark

PySpark is a great language for performing exploratory data analysis at scale, building machine learning pipelines, and creating ETLs for…

towardsdatascience.com

 

 

 

 

 

 

반응형