Tools/Python (38)

 

Pyspark export to delimited file

def myConcat(*cols):
    concat_columns = []
    for c in cols[:-1]:
        concat_columns.append(F.coalesce(c, F.lit("*")))
        concat_columns.append(F.lit("\t"))  
    concat_columns.append(F.coalesce(cols[-1], F.lit("*")))
    return F.concat(*concat_columns)

# combined column 에 모든 변수를 \t 로 concat 한 값 저장 
data_text = data.withColumn("combined", myConcat(*data.columns)).select("combined")
data_text.coalesce(1).write.format("text").option("header", "false").mode("overwrite").save(path)

출처 : https://stackoverflow.com/questions/17837871/how-to-copy-file-from-hdfs-to-the-local-file-system

 

Jupyter notebook에서 python 파일을 자동으로 갱신하는 방법

 

예를 들어 jupyter notebook 파일에서 model.py 라는 파일을 import 해서 사용하고 있다고 하자. 

만약 model.py 파일을 수정하면, 커널을 재시작한 후, 다시 import 를 해야한다. 

아래 구문을 통해 model.py 파일이 수정되는 즉시 적용되도록 만들 수 있다. 

%load_ext autoreload
%autoreload 2

Pyspark MLlib 를 활용한 모델링 기초 (w/ Random Forest)

/* DeepPlay 2022-09-20 */

 

개요

1) 배경: scalable machine learning 이란?

근래 빅데이터쪽에서 scalability 라는 개념이 buzzword 중 하나이다. scalability는 스케일 아웃에 관한 것이며, 스케일 아웃이란 "처리 능력을 향상시키기 위해 트래픽을 분산시킬 서버의 수를 늘리는 방법" 을 의미한다. 즉, scalable 머신러닝이란, 머신러닝을 빠르게 처리하기 위해 여러 컴퓨터에 분산시켜 일을 시키는 것으로 정의해볼 수 있다. 빅데이터를 어떻게 효율적으로 처리할 것인가? 와 관련한 논의 및 발전이 머신러닝 쪽에서도 이루어지고 있다고 볼 수 있겠다. 

 

2) 데이터

필요한 데이터 다운받기: Chronic Kidney Disease Dataset

https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

 

3) 요점 

Pyspark MLlib 를 사용할 때 아래 사항을 주의해야함

1. Assembling : 모든 피쳐를 하나의 컬럼에 리스트 형태로 몰아 넣는 것 

2. Type casting: Assembling 한 피쳐는 동일한 타입이어야함. 

3. Missing value handling: Assebling 한 피쳐는 결측이 없어야 함 (NA 가 아닌 0 또는 imputation 된 값이어야 함)

 

0. Load data 

# 데이터 다운로드 받기 
# https://www.kaggle.com/datasets/mansoordaku/ckdisease?resource=download
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")

1. Preprocessing

#### Imputation : Categorical variable

각 변수별 결측치 개수 확인하기

# 범주형 변수 리스트 
cat_cols = ['al', 'su', 'rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 
result

각 변수별 최빈값 Imputation

from pyspark.sql.functions import when, lit
def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    col_with_mode=[]
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = df.select(col).distinct().rdd.map(lambda x: x[0]).collect()
        unique_values_with_count=[]
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

col_with_mode = mode_of_pyspark_columns(data, cat_cols)

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 
    lit(mode)).otherwise(data[col]))

#### Imputation : Numeric variable

각 변수별 평균값 치환 코드

numeric_cols=['age', 'bp', 'sg','bgr', 'bu', 'sc', 'sod', 'pot', 'hemo', 'pcv', 'rc', 'wc']

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    col_with_mean=[]
    for col in numeric_cols:
        mean_value = df.select(avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row : row[avg_col]).collect()
        
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

col_with_mean = mean_of_pyspark_columns(data, numeric_cols)

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 
    lit(mean)).otherwise(data[col]))

 

2. Categorical variable encoding

범주형 피쳐 인코딩

데이터에 관한 지식을 활용해 모든 피쳐들이 binary 또는 ordinal 변수라는 것을 확인한다. 이 경우, Pyspark 의 StringIndexer 를 활용하여 encoding 해볼 수 있다. 예를 들어, normal/abnormal 의 경우, binary 변수이며, good/poor 등도 각 값들이 독립적이지 않고, 등급이 존재하는 ordinal 변수라는 것을 알 수 있다.

data.select(cat_cols).show()
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
| al| su|     rbc|      pc|       pcc|        ba|htn| dm|cad|appet| pe|ane|
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
|1.0|0.0|  normal|  normal|notpresent|notpresent|yes|yes| no| good| no| no|
|4.0|0.0|  normal|  normal|notpresent|notpresent| no| no| no| good| no| no|
|2.0|3.0|  normal|  normal|notpresent|notpresent| no|yes| no| poor| no|yes|
|4.0|0.0|  normal|abnormal|   present|notpresent|yes| no| no| poor|yes|yes|

 

각 변수별로 suffix로 Index를 붙힌 새로운 변수를 만든다.

# Categorical 변수들을 StringIndexer 을 이용해 Numeric 변수로 변환함 
from pyspark.ml.feature import StringIndexer 

indexed = data
for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col + "Index")
    indexed = indexer.fit(indexed).transform(indexed)

예를 들어, 아래와 같이 새로운 컬럼이 생성된다. 

# 각 범주형 변수별로 Index 변수를 아래와 같이 만든다.
indexed.select(['id', 'pc', 'pcIndex']).show()
| id|      pc|pcIndex|
+---+--------+-------+
|  0|  normal|    0.0|
|  1|  normal|    0.0|
|  2|  normal|    0.0|
|  3|abnormal|    1.0|

 

레이블 변수 인코딩

분류 문제의 경우 레이블은 범주형 변수로 마찬가지로 인코딩이 필요하다. 실제 유니크 레이블들을 확인해보니 잘못 들어간 값이 존재하여 이를 제대로된 값으로 치환해준다. 

indexed.select('classification').distinct().show()
+--------------+
|classification|
+--------------+
|        notckd|
|           ckd|
|         ckd\t|
+--------------+

값 치환 

# 잘못된 값이 있기 때문에 이를 replace 해준다. 
indexed = indexed.withColumn("classification", \
              when(indexed["classification"] == 'ckd\t', 'ckd').otherwise(indexed["classification"]))

결과를 보면, notckd 를 1, ckd를 0으로 인코딩 해주었는데, 반대로 notckd를 0, ckd를 1로 바꾸어 인코딩해보자. 

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="classification", outputCol="label")
indexed = indexer.fit(indexed).transform(indexed)
indexed.select('label').distinct().show()
+--------------+-----+
|classification|label|
+--------------+-----+
|        notckd|  1.0|
|           ckd|  0.0|
+--------------+-----+
indexed = indexed.withColumn("label", when(indexed["label"] == 1, 0).otherwise(1))

 

2. Typecasting of Features

- Numeric 변수들을 Double 형으로 변환한다.
- 이 때 주의할점은, 만약 원래 numeric 변수인데, 오류로 인해 문자가 들어있는 경우, Double 형변환을 하면 값이 null 이 된다.
- 따라서 확인하고, imputation 을 한 번 더 수행해야할 수도 있다. 

 

numeric 변수들을 double 형으로 변환하기

from pyspark.sql.types import DoubleType
for col in numeric_cols:
    indexed = indexed.withColumn(col, indexed[col].cast(DoubleType()))
# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    indexed = indexed.withColumn(col, when(indexed[col].isNull() == True, 
    lit(mean)).otherwise(indexed[col]))

 

3. Assembling of Input Features

- feature 를 assemble 하여 하나의 필드로 변환 후, 이후 피쳐 변환이나 모델링 절차 등이 수행한다. 

 

피쳐들의 Type 재확인

-> 모두 double type 이라는 것을 알 수 있음

for col in indexed[feature_list].dtypes:
    print(col[0]+" , "+col[1])

 

VectorAssemble 활용

피쳐들을 assemble 한 features 라고 하는 컬럼을 생성한다. inputCols 파라미터에 피쳐명을 리스트 형태로 지정하고, outputCols 에 assemble 한 값을 넣을 컬럼명을 넣는다. 

# 주의 사항: Assemble 을 할 때, 데이터에 결측이 없어야함 
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=feature_list,
                                  outputCol="features")
                                  
features_vectorized = vectorAssembler.transform(indexed)
features_vectorized.select(['id', 'features']).show(5)
+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[1.0,0.0,0.0,0.0,...|
|  1|(24,[0,12,13,14,1...|
|  2|[2.0,2.0,0.0,0.0,...|
|  3|[4.0,0.0,0.0,1.0,...|
|  4|(24,[0,12,13,14,1...|
+---+--------------------+

 

 

4. Normalization of Input Features 

- 피쳐들은 모두 같은 스케일이 아니다. 
- 피쳐들을 같은 스케일로 변환하면, 모델링 단계에서 더 좋은 결과를 보여준다. 

- Scaling 을 할 수도 있고, Normalization 을 할 수도 있다. 이 둘을 interchangable 하게 말하기도 하지만, 정확히는 분포의 모양을 바꾸지 않는 경우 scaling, 분포를 바꾸는 경우 normalization 이라고 한다. (https://www.kaggle.com/code/alexisbcook/scaling-and-normalization/tutorial)


scaling 이나 normalization 은 왜 필요할까?

변수의 scale 이 모델 성능에 미치는 영향을 줄여줌. 피쳐들의 스케일까지 학습해야 하는 부담을 줄일 수 있다. 

ㄴ 만약 normalization 이 적절히 수행된다면, 단점은 없고 장점만 있기 때문에 일반적으로 많이 수행한다. 

 

Normalizer 를 활용한 standard scaling 

inputCols 로 assemble 한 컬럼을 넣어주고, outputCols 로 normalization 된 피쳐를 넣을 컬럼명을 지정해준다. p는 normalization 을 할 때, L1 norm 을 이용하는 것을 의미한다. (default 는 L2 norm 이다.) 본 포스팅에서 normalization 수식이나 변환 결과는 생략한다. 

from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
l1NormData = normalizer.transform(features_vectorized)
l1NormData.select(['features','features_norm']).show()
+--------------------+--------------------+
|            features|       features_norm|
+--------------------+--------------------+
|[1.0,0.0,0.0,0.0,...|[1.20525839810946...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
|[2.0,2.0,0.0,0.0,...|[2.40521254800404...|
|[4.0,0.0,0.0,1.0,...|[5.58159914210821...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
preprocessed_data = l1NormData

 

 

5. Spiliting into train and test set

#Split the dataset into training and testing dataset
splits = preprocessed_data.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

 

6. Train & Predict

pyspark 에서 학습하고 예측하는 일에 대해 fit, transform  이라는 용어를 사용한다. fit 은 학습 데이터를 입력 받아 모델을 만드는 작업업을 의미하며, transform 은 예측하고 싶은 데이터를 입력 받아, 학습된 모델로 그 데이터의 label을 예측한다. 

from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features_norm", numTrees=10)
rf_model = rf.fit(df_train)
prediction = rf_model.transform(df_train)

transform 을 수행하면, rawPrediction, probability, prediction 이라는 컬럼을 데이터에 추가해준다. 

- rawPrediction (list): random forest 의 각 tree 들이 각 클래스에 속한다고 예측한 값들의 합이다. 

- probabilty (list): rawPrediction 을 0~1 사이로 scaling 한 값 

- prediction (element): probabilty 를 토대로 가장 가능성이 높은 label 예측한 값 

prediction.select(['id', 'rawPrediction', 'probability', 'prediction']).show()
+---+--------------------+--------------------+----------+
| id|       rawPrediction|         probability|prediction|
+---+--------------------+--------------------+----------+
|  0|[0.94594594594594...|[0.09459459459459...|       1.0|
|  1|[2.06800093632958...|[0.20680009363295...|       1.0|
| 10|[0.01515151515151...|[0.00151515151515...|       1.0|
|100|[0.20461020461020...|[0.02046102046102...|       1.0|
|102|[4.91567118722564...|[0.49156711872256...|       1.0|

 

7. Evaluation

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'training accuracy: {binEval.evaluate(prediction)}')

prediction_test = rf_model.transform(df_test)
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'test accuracy: {binEval.evaluate(prediction_test)}')
training accuracy: 0.9900662251655629
test accuracy: 0.9489795918367347
 

참고문서

https://medium.com/@aieeshashafique/machine-learning-with-pysparkmlib-and-random-forest-solving-a-chronic-kidney-disease-problem-752287d5ffd0
https://www.silect.is/blog/random-forest-models-in-spark-ml/  

- https://dzone.com/articles/what-scalable-machine-learning

PySpark DataFrame 을 이용한 탐색적 데이터 분석 및 처리 

/* DeepPlay 2022-08-26 */

 

* 본 포스팅에서는 pyspark 가 설치되어있음을 가정합니다. 

 

최초로 데이터를 받은 이후에, 가장 먼저할 작업은 탐색적 데이터 분석이다. 탐색적 데이터 분석을 통해 각 변수별로 처리 방법을 계획하여, 분석하기 쉬운 형태의 데이터로 만들게 된다. 이 과정에서 변수의 변환, 제거, 생성, 결측값 처리 등의 절차를 수행하게 된다. 본 포스팅은 pyspark 를 통해 데이터를 탐색적으로 확인하는 방법과 간단한 데이터 처리 방법 몇 가지를 다룬다. 

 

pyspark 를 통한 탐색적 분석 문법은 pandas 와 유사한 부분도 있고, 그렇지 않은 부분도 있다. 자주 등장하는 pyspark 의 특이적인 문법이 존재하는데, R, pandas 만 활용해온 사람이라면 이러한 부분에 점점 익숙해질 필요가 있을 것 같다. 

 

필요한 데이터 다운받기: Chronic Kidney Disease Dataset

https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

 

환경세팅

- MAC OS 에서 python 3.8 환경에 pyspark 를 설치하여 실습함 

# 데이터 다운로드 받기 
# https://www.kaggle.com/datasets/mansoordaku/ckdisease?resource=download
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)

 

0. 데이터 로드

pyspark 에서 csv 파일을 load 하는 것은 아래 문법으로 수행할 수 있다. 

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")

 

1. Schema: 데이터 스키마 확인하기

어떤 구조와 변수형으로 데이터가 들어가 있는지 먼저 파악한다. 

- 데이터를 직접 추출하여 만들었다면, 각 컬럼이 무엇을 의미하는지 알겠지만..
- 연습을 위한 토이 데이터이거나, 다른 사람한테 받은 데이터라면, 각 컬럼이 무엇을 의미하는지 이해하고 넘어갈 필요가 있다.

- kidney_disease 데이터의 변수명 의미 확인하기: https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

# 모든 필드가 string 타입으로 설정되어 있는 것을 확인할 수 있음  
data.printSchema()
root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- bp: string (nullable = true)
 |-- sg: string (nullable = true)
 |-- al: string (nullable = true)
 |-- su: string (nullable = true)
 |-- rbc: string (nullable = true)

 

2. Show data: 데이터 확인하기

- csv 파일에서 공백인 경우에는 null 값으로 채워졌다는 것을 알 수 있다.

- 각 변수들의 타입을 눈으로 한 번 확인한다.(연속형, 범주형) 

data.show() 
# data.head() 도 사용 가능함
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+----+-----+----+---+---+---+-----+---+---+--------------+
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod| pot|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+----+-----+----+---+---+---+-----+---+---+--------------+
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|

 

3. Size: 데이터의 규모 확인하기

# 400 X 26 size  
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 26

 

4. Univariate distribution (numeric): 단변수 분포 확인하기

#### string -> double 형변환

우선 연속형 변수를 double 형으로 변환해준다. 여기서도 withColumn 구문을 활용할 수 있다. 

# 연속형 변수들은 double 로 형변환 수행 
numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
for c in numeric_cols:
    data = data.withColumn(c, data[c].cast('double'))

#### describe 함수: 기본 통계치 추출 

data.describe(numeric_cols).show()
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|summary|              age|               bp|                  sg|                al|                 su|              bgr|                bu|                sc|               sod|              hemo|              pcv|                wc|                rc|
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|  count|              391|              388|                 353|               354|                351|              356|               381|               383|               313|               348|              329|               294|               269|
|   mean|51.48337595907928|76.46907216494846|  1.0174079320113256|1.0169491525423728|0.45014245014245013|148.0365168539326|57.425721784776904|3.0724543080939934|137.52875399361022|12.526436781609195|38.88449848024316| 8406.122448979591| 4.707434944237919|
| stddev|17.16971408926224|13.68363749352527|0.005716616974376756|1.3526789127628445|  1.099191251885407|79.28171423511773|50.503005849222504| 5.741126066859789|10.408752051798777| 2.912586608826765|8.990104814740933|2944.4741904103385|1.0253232655721791|
|    min|              2.0|             50.0|               1.005|               0.0|                0.0|             22.0|               1.5|               0.4|               4.5|               3.1|              9.0|            2200.0|               2.1|
|    max|             90.0|            180.0|               1.025|               5.0|                5.0|            490.0|             391.0|              76.0|             163.0|              17.8|             54.0|           26400.0|               8.0|
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+

위 결과를 pandas dataframe 으로 변환하고, transpose 를 시켜 보기 쉬운 데이터 형태로 변환한다. 

# 첫번째 로우를 header 로 변경 
numeric_dist = data.describe(numeric_cols).toPandas().T
new_header = numeric_dist.iloc[0] #grab the first row for the header
numeric_dist = numeric_dist[1:] #take the data less the header row
numeric_dist.columns = new_header #set the header row as the df header

# 인덱스컬럼의 값을 추출해서 새로운 컬럼으로 지정
numeric_dist['col'] = numeric_dist.index.values
numeric_dist.reset_index(drop=True, inplace=True)

 

#### agg 함수 사용: quantile 값 추출 

- 아래와 같이 agg 함수를 활용해 연속형 변수의 quantile 값을 추출할 수 있다. 

- pyspark.sql.functions 의 expr 함수를 활용해, agg 함수 내에 원하는 집계 방법을 string 형태로 넣어서 quantile 을 구할 수 있다. 

import pyspark.sql.functions as F

perc = pd.DataFrame({'col':[], '%25':[], '%50':[], '%75':[], '%90':[], '%99':[]})
for c in numeric_cols :
    result_new = data.agg(
             F.expr('percentile(' + c + ',array(0.25))')[0].alias('%25'),
             F.expr('percentile(' + c + ',array(0.50))')[0].alias('%50'),
             F.expr('percentile(' + c + ',array(0.75))')[0].alias('%75'),
             F.expr('percentile(' + c + ',array(0.90))')[0].alias('%90'),
             F.expr('percentile(' + c + ',array(0.99))')[0].alias('%99'),
    ).toPandas()
    result_new['col'] = c
    perc = pd.concat([perc, result_new], axis=0)

#### 두 결과 테이블 조인하기 

- 위 두 테이블을 조인하여 최종적으로 원하는 형태의 연속형 변수 분포 테이블을 만들어 볼 수 있다. 

numeric_dist_merge = pd.merge(numeric_dist, perc, how="inner", left_on="col", right_on="col")
numeric_dist_merge[['col', 'count', 'mean', '%25', '%50', '%75', '%90', '%99']]

5. Distribution: 단변수 분포 확인하기 - Categorical variable

#### 범주형 변수에서 unique 한 값 추출하기

- 아래 스크립트에서 rdd.map 이 pyspark 특이적인 문법이라고 볼 수 있다. 

ㄴ rdd 는 dataframe 을 rdd 형태로 다룰 것임을 의미하며, rdd 에 lambda 함수를 적용해 결과를 추출한다. 

ㄴ collect 함수는 결과를 list 형태로 만들어준다. 

ㄴ 이러한 문법은 pandas 나 R 에서는 잘 나오지 않기 때문에 익숙해질 필요가 있을 것 같다. 

from pyspark.sql.functions import when
from pyspark.sql import functions as F

# Label 변수 확인하기 
print(f"labels: {data.select('classification').distinct().rdd.map(lambda r: r[0]).collect()}")
# data_pd['classification'].unique() # pandas 의 경우는 이와 같이 한다.
labels: ['notckd', 'ckd', 'ckd\t']

#### 분포 확인하기 

- sql.function 내 agg 함수 내에 원하는 집계 (count) 함수를 넣어, 아래와 같이 그룹별 행수를 계산할 수 있다. 

result = data.select(['classification']).\
   groupBy('classification').\
   agg(F.count('classification').alias('user_count')).toPandas()
result

 

6. Drop useless columns: 컬럼 드랍하기 (Option)

- 필요 없는 컬럼이 있는 경우 drop 함수로 드랍한다.

data = data.drop('pot')
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 25

 

7. Subsetting: 원하는 행만 남기기 

- filter 함수를 통해 원하는 행만 남길 수 있다. 

data_ckd = data.filter(data['classification'] == 'ckd')
data_ckd.show()
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+-----+----+---+---+---+-----+---+---+--------------+
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+-----+----+---+---+---+-----+---+---+--------------+
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|

 

8. Check missing values: 컬럼별 결측값 개수/결측률 확인하기

- 이 데이터에서는 결측값이 null 로 들어가 있다. 

- 컬럼별로 결측값 개수를 구하는 것은 다양한 방식으로 구현할 수 있지만, 가장 간단하게 for 을 활용하는 방법은 아래와 같다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산
# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)

 

9. Replace: 값 치환하기 

- 데이터를 다루다보면 종종 값을 치환해야할 경우가 있다. (값이 잘못 들어가 있는 경우, 카테고리를 합치는 경우 등..)

- withColumn 구문과 when 구문을 합쳐여 값 치환을 구현해볼 수 있다. 

 

# label 에 'ckd\t' 로 데이터가 잘못 들어가 있는 것을 확인할 수 있다. 
# 이를 when 구문을 통해 원하는 값으로 바꾸어준다. 
data = data.withColumn("classification", \
              when(data["classification"] == 'ckd\t', 'ckd').otherwise(data["classification"]))

# 재출력하기 
print(f"처리후 labels: {data.select('classification').distinct().rdd.map(lambda r: r[0]).collect()}")
처리후 labels: ['notckd', 'ckd']

 

10. Imputation: 평균대치법 (for numeric variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 평균 대치법으로 결측값을 처리하자. 

- 먼저, 특정 컬럼의 평균을 구하는 함수를 작성한다. 

ㄴ pyspark 에서는 이와 관련한 readymade 함수가 없기 때문에 직접 작성한다. 

- 여기서도 df.select(avg(df[col]))) 이나 rdd.map 같은 pyspark 특이적 문법이 등장한다. 

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    col_with_mean=[]
    for col in numeric_cols:
        mean_value = df.select(avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row : row[avg_col]).collect()
        
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

각 컬럼들에 대해 평균값을 구한다. 

numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
col_with_mean = mean_of_pyspark_columns(data, numeric_cols)
col_with_mean
[['age', 51.48337595907928],
 ['bp', 76.46907216494846],
 ['sg', 1.0174079320113256],
 ['al', 1.0169491525423728],
 ['su', 0.45014245014245013],
 ['bgr', 148.0365168539326],
 ['bu', 57.425721784776904],
 ['sc', 3.0724543080939934],
 ['sod', 137.52875399361022],
 ['hemo', 12.526436781609195],
 ['pcv', 38.88449848024316],
 ['wc', 8406.122448979591],
 ['rc', 4.707434944237919]]

각 컬럼별로 반복문을 돌면서, null 인 경우 평균으로 replace 를 수행한다. 

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 
    lit(mean)).otherwise(data[col]))

 

11. Imputation: Mode 대치 (for categorical variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 최빈값 대치법으로 결측값을 처리해보자. 

- 마찬가지로 컬럼별로 최빈값을 구하는 함수를 작성한다. 

def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    col_with_mode=[]
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = df.select(col).distinct().rdd.map(lambda x: x[0]).collect()
        unique_values_with_count=[]
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

각 컬럼들에 대해 최빈값을 구한다. 

cat_cols = ['rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']
col_with_mode = mode_of_pyspark_columns(data, cat_cols)
col_with_mode
[['rbc', 'normal'],
 ['pc', 'normal'],
 ['pcc', 'notpresent'],
 ['ba', 'notpresent'],
 ['htn', 'no'],
 ['dm', 'no'],
 ['cad', 'no'],
 ['appet', 'good'],
 ['pe', 'no'],
 ['ane', 'no']]

각 컬럼별로 반복문을 돌면서, null 인 경우 최빈값으로 replace 를 수행한다. 

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 
    lit(mode)).otherwise(data[col]))

결측값 치환이 잘 되었는지를 확인한다. 결측값이 없는 데이터가 되었음을 확인할 수 있다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 

# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)

 

참고 자료

 

A Brief Introduction to PySpark

PySpark is a great language for performing exploratory data analysis at scale, building machine learning pipelines, and creating ETLs for…

towardsdatascience.com

 

 

 

 

 

 


PySpark DataFrame 을 사용하는 이유와 pandas 와의 차이점

/* DeepPlay 2022-08-26 */

 

왜 PySpark 인가?

 

한 마디로 빅데이터 환경에서 전통적인 데이터 처리 툴들 (R, pandas) 을 활용하기 어렵기 때문이다. 토이 데이터의 경우, 10GB 를 넘는 경우가 드물지만, 실제 회사의 빅데이터 환경에서는 하나의 데이터셋이 10GB 를 넘는 경우가 많으며, 크게는 10TB를 넘는 경우도 있다. 기본적으로 R과 python pandas 는 in-memory 처리 방식이다. 모든 데이터를 메모리에 적재한 후, 처리한다. 만약 램이 8GB 인 머신을 사용한다고 하면, 이러한 데이터들을 로드조차 하지 못하고, out-of-memory 에러로 커널이 죽는 모습을 확인할 수 있게 된다.

 

pyspark 환경에서는 메모리 사용량을 최소화하는 방식으로 용량이 크고, 포맷이 다양한 데이터들을 "특정 데이터 구조" 로 로드하고 처리하는 것이 가능하다. 즉, pyspark 는 시간 및 컴퓨팅 자원 측면에서 효율적으로 데이터 처리/분석을 할 수 있도록 도와준다.

 

만약, 데이터 분석 공부를 하거나, 큰 데이터를 접할 일이 없는 도메인에서 일을 하는 경우에는 R 또는 pandas 만 사용하여도 괜찮다. 하지만 빅데이터 환경에서 데이터를 처리하고 분석하는 것이 필요하다면, pyspark 를 활용하는 것이 더욱 효율적이며, 이것이 pyspark 를 배우면 좋은 이유이다. 

 

PySpark DataFrame

https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

 

pyspark 의 핵심 데이터 타입은 dataframe 이다. pyspark dataframe 은 쉽게 말해 여러 클러스터에 분산 되어 있는 테이블이라고 할 수 있다. 그 이름처럼 R이나 pandas의 dataframe 과 비슷한 함수들을 갖고 있다. 분산 실행을 위해서는 다른 데이터 타입이 아닌 pyspark 의 dataframe 객체를 이용하면 된다. pyspark dataframe을 활용하는 것은 R 과 pandas 와 거의 비슷하기 때문에, 둘 중 하나에 익숙한 경우 쉽게 활용할 수 있다. 

 

pyspark 와 pandas 의 큰 차이점 중 하나는 pyspark 는 lazy 하고, pandas 는 eager 하다는 것이다. pyspark 에서는 실제 결과가 필요할 때까지 실행을 유보한다 (lazy evaluation). 예를 들어, hive 환경에 있는 테이블을 읽어온 후, 특정 변수를 변환하는 코드를 짰다고 하자. 하지만 이 코드를 실행하는 즉시, 이 작업이 실행되지 않는다. 실제 데이터가 필요한 경우에만 이 작업이 실행횐다 예를 들어, 변환된 테이블을 다시 hive 환경에 파일로 저장하는 등의 경우를 들 수 있다. 이러한 방식이 좋은 점은 전체 데이터를 메모리에 저장하지 않아도 되기 때문에, 효율적으로 데이터를 처리할 수 있다. 반면, pandas 에서는 함수가 호출되는 즉시 실행되며 (eager evaluation), 모든 것은 메모리에 저장된다. pyspark 환경에서의 데이터 처리를 한다고 했을 때, eager operation 은 사용하는 것은 지양하는 것이, 리소스 절감 측면에서 바람직하다고 할 수 있다. 

 

 

참고 자료

 

아나콘다 설치 이후 프로세스

ㄴ 파이썬 3.8 버전 설치 가정

 

1. python3.8 가상환경 생성

conda create -n py38 python=3.8

 

2. 가상환경 활성화

source activate py38

 

3. ipykernel 설치

pip install ipykernel

 

4. 가상환경을 ipykernel 에 등록

python -m ipykernel install --user --name py38 --display-name py38

 

5. 주피터 노트북 or 주피터랩 실행

conda install jupyter
jupyter lab

Ubuntu 에 Python 새로운 버전 설치하기


sudo apt update
sudo apt install build-essential zlib1g-dev libncurses5-dev libgdbm-dev libnss3-dev libssl-dev libreadline-dev libffi-dev wget

필요한 패키지를 받습니다. 


cd /tmp wget https://www.python.org/ftp/python/3.7.2/Python-3.7.2.tar.xz


설치파일을 다운로드 받습니다. 


tar -xf Python-3.7.2.tar.xz cd Python-3.7.2 ./configure --enable-optimizations


설치 파일의 압축을 풀고, 인스톨 준비를 합니다. 


make -j 1
sudo make altinstall

-j 1 은 1 개의 CPU 를 이용해서 build 하겠다는 것입니다.포인트는 sudo make altinstall 을 통해 버전을 따로 관리하는 것입니다. sudo make install 을 하면, 기존 파이썬을 덮어써버리게 되므로 주의해야합니다. 이후, 커맨드 창에 python3.7 을 입력해 잘 설치되었는지 확인합니다. 


특정 버전에 pip 를 통해 패키지 설치하는 법 

예를 들어, beatifulsoup4 패키지를 설치하려면 아래와 같이 합니다. pip 자체가 파이썬 코드이기 때문에 이런식으로 원하는 파이썬 버전을 통해 pip 를 실행시켜주면 됩니다. 

python3.7 -m pip install beautifulsoup4

References
https://websiteforstudents.com/installing-the-latest-python-3-7-on-ubuntu-16-04-18-04/





Cookiecutter 패키지는 프로젝트 템플릿을 쉽게 생성해주는 파이썬 패키지입니다. 1) 자신이 만든 템플릿을 저장해서 reproducible 하게 사용할 수 있고, 2) 이미 만들어져 있는 템플릿을 불러와서 거기서부터 새로 프로젝트를 생성할 수 있습니다. 특히, cookiecutter-pypackage/ 를 많이 사용하는듯합니다. 이 템플릿은 PyPI 등록을 위한 파이썬 패키지를 위한 템플릿을 제공합니다. 기본적인 PyPI 등록을 위한 패키지 구조와 기본 파일 (setup.py 등), nox, tox, Click, travis 등과 같은 파이썬 패키지 관련 코드를 담은 파일이 탑재되어 있기 때문에 패키지 개발의 start point 로서 유용하고 실제로 많이 사용하고 있는 패키지입니다. 


homepage

https://cookiecutter.readthedocs.io


tutorial

https://cookiecutter.readthedocs.io/en/latest/tutorial1.html

https://cookiecutter-pypackage.readthedocs.io/en/latest/tutorial.html


다른 컴퓨터로 conda 가상환경 옮기는 방법


참고

conda-cheatsheet.pdf


기존에 사용하던 컴퓨터 A 에서 컴퓨터 B 로 conda 가상환경을 옮겨야할 때가 있다. 


컴퓨터 A 에서 해야할 일 


1. 가상환경 켜기


source activate [이름]


2. 가상환경이 켜진 상태에서 아래 명령어로 dependency 를 export 할수 있다.


conda env export > environment.yaml

environment.yaml 파일을 열어보면 아래와 같이 잘 export 되었다는 것을 확인할 수 있다. 



3. 현재 환경의 python 버전 체크


현재 사용하고 있는 가상환경에서 사용하고 있는 python version 을 체크한다. 


python --version
Python 3.6.6


컴퓨터 B 에서 해야할 일 


Requirements! 

  • 컴퓨터 B 에서는 컴퓨터 A 에서와 같은 anaconda (python 2 또는 python3) 를 사용해야한다. 만약 anaconda 버전이 하위버전이면 잘 안돌아갈 수도 있을듯 하다. 

4. 가상환경 생성 


conda create --name [이름] python=3.6


5. prefix 변경


environment.yaml 을 열고 원하는 경우, 가상 환경의 이름을 바꾸고, prefix 를 경로에 맞게 바꾸어준다. 예를 들어, 


prefix: C:\Users\[사용자이름] \Anaconda3\envs\[가상환경이름]


5. yaml 파일을 통한 가상환경 생성 


conda env create --file environment.yaml


이 때, 


Solving environment: failed

ResolvePackageNotFound:


에러가 뜰 수 있다. 이것은 A 컴퓨터에서 설치된 라이브러리가 B 컴퓨터에서 설치가 불가능한 것인데, 컴퓨터의 운영체제가 다른 경우에 발생하는 것으로 보인다. 해결 방법은 수동으로 ResoevePackageNotFound 에서 출력된 리스트를 environment.yaml 파일에서 지운 후, 다시 시도하면 된다. (참고)



6. 주피터를 사용하는 경우, 커널에 가상환경 등록


source activate myenv python -m ipykernel install --user --name myenv --display-name "Python (myenv)"



Jupyter 유용한 확장기능 - lab_black


설치


pip install nb_black


사용법


notebook 사용자의 경우, 첫번째 셀에 아래 코드 실행 


%load_ext nb_black


lab 사용자의 경우, 첫번째 셀에 아래 코드 실행 


%load_ext lab_black


이후 코드를 실행하면, 자동으로 black format 으로 포매팅이 되는 것을 볼 수 있다. 따로 command 를 이용해서 formatting 을 하지 않아도 실행하는 즉시 formatting 이 되기 때문에 매우 유용하다!


링크 

https://github.com/dnanhkhoa/nb_black?source=your_stories_page