가끔 .gitignore 파일에 올리고 싶지 않은 파일이나 폴더를 지정후, remote repo 에 push 를 해도 적용되지 않을 때가 있다. 

이 때는 캐시를 지우고 시도한다. 

git rm --cached -r .
git add .




Jupyter notebook에서 python 파일을 자동으로 갱신하는 방법


예를 들어 jupyter notebook 파일에서 라는 파일을 import 해서 사용하고 있다고 하자. 

만약 파일을 수정하면, 커널을 재시작한 후, 다시 import 를 해야한다. 

아래 구문을 통해 파일이 수정되는 즉시 적용되도록 만들 수 있다. 

%load_ext autoreload
%autoreload 2

Pyspark MLlib 를 활용한 모델링 기초 (w/ Random Forest)

/* DeepPlay 2022-09-20 */



1) 배경: scalable machine learning 이란?

근래 빅데이터쪽에서 scalability 라는 개념이 buzzword 중 하나이다. scalability는 스케일 아웃에 관한 것이며, 스케일 아웃이란 "처리 능력을 향상시키기 위해 트래픽을 분산시킬 서버의 수를 늘리는 방법" 을 의미한다. 즉, scalable 머신러닝이란, 머신러닝을 빠르게 처리하기 위해 여러 컴퓨터에 분산시켜 일을 시키는 것으로 정의해볼 수 있다. 빅데이터를 어떻게 효율적으로 처리할 것인가? 와 관련한 논의 및 발전이 머신러닝 쪽에서도 이루어지고 있다고 볼 수 있겠다. 


2) 데이터

필요한 데이터 다운받기: Chronic Kidney Disease Dataset


3) 요점 

Pyspark MLlib 를 사용할 때 아래 사항을 주의해야함

1. Assembling : 모든 피쳐를 하나의 컬럼에 리스트 형태로 몰아 넣는 것 

2. Type casting: Assembling 한 피쳐는 동일한 타입이어야함. 

3. Missing value handling: Assebling 한 피쳐는 결측이 없어야 함 (NA 가 아닌 0 또는 imputation 된 값이어야 함)


0. Load data 

# 데이터 다운로드 받기 
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("", "") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data ="header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")

1. Preprocessing

#### Imputation : Categorical variable

각 변수별 결측치 개수 확인하기

# 범주형 변수 리스트 
cat_cols = ['al', 'su', 'rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})[0, 'variable'] = col[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 

각 변수별 최빈값 Imputation

from pyspark.sql.functions import when, lit
def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = x: x[0]).collect()
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

col_with_mode = mode_of_pyspark_columns(data, cat_cols)

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 

#### Imputation : Numeric variable

각 변수별 평균값 치환 코드

numeric_cols=['age', 'bp', 'sg','bgr', 'bu', 'sc', 'sod', 'pot', 'hemo', 'pcv', 'rc', 'wc']

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    for col in numeric_cols:
        mean_value =[col]))
        avg_col = mean_value.columns[0]
        res = row : row[avg_col]).collect()
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

col_with_mean = mean_of_pyspark_columns(data, numeric_cols)

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 


2. Categorical variable encoding

범주형 피쳐 인코딩

데이터에 관한 지식을 활용해 모든 피쳐들이 binary 또는 ordinal 변수라는 것을 확인한다. 이 경우, Pyspark 의 StringIndexer 를 활용하여 encoding 해볼 수 있다. 예를 들어, normal/abnormal 의 경우, binary 변수이며, good/poor 등도 각 값들이 독립적이지 않고, 등급이 존재하는 ordinal 변수라는 것을 알 수 있다.
| al| su|     rbc|      pc|       pcc|        ba|htn| dm|cad|appet| pe|ane|
|1.0|0.0|  normal|  normal|notpresent|notpresent|yes|yes| no| good| no| no|
|4.0|0.0|  normal|  normal|notpresent|notpresent| no| no| no| good| no| no|
|2.0|3.0|  normal|  normal|notpresent|notpresent| no|yes| no| poor| no|yes|
|4.0|0.0|  normal|abnormal|   present|notpresent|yes| no| no| poor|yes|yes|


각 변수별로 suffix로 Index를 붙힌 새로운 변수를 만든다.

# Categorical 변수들을 StringIndexer 을 이용해 Numeric 변수로 변환함 
from import StringIndexer 

indexed = data
for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col + "Index")
    indexed =

예를 들어, 아래와 같이 새로운 컬럼이 생성된다. 

# 각 범주형 변수별로 Index 변수를 아래와 같이 만든다.['id', 'pc', 'pcIndex']).show()
| id|      pc|pcIndex|
|  0|  normal|    0.0|
|  1|  normal|    0.0|
|  2|  normal|    0.0|
|  3|abnormal|    1.0|


레이블 변수 인코딩

분류 문제의 경우 레이블은 범주형 변수로 마찬가지로 인코딩이 필요하다. 실제 유니크 레이블들을 확인해보니 잘못 들어간 값이 존재하여 이를 제대로된 값으로 치환해준다.'classification').distinct().show()
|        notckd|
|           ckd|
|         ckd\t|

값 치환 

# 잘못된 값이 있기 때문에 이를 replace 해준다. 
indexed = indexed.withColumn("classification", \
              when(indexed["classification"] == 'ckd\t', 'ckd').otherwise(indexed["classification"]))

결과를 보면, notckd 를 1, ckd를 0으로 인코딩 해주었는데, 반대로 notckd를 0, ckd를 1로 바꾸어 인코딩해보자. 

from import StringIndexer

indexer = StringIndexer(inputCol="classification", outputCol="label")
indexed ='label').distinct().show()
|        notckd|  1.0|
|           ckd|  0.0|
indexed = indexed.withColumn("label", when(indexed["label"] == 1, 0).otherwise(1))


2. Typecasting of Features

- Numeric 변수들을 Double 형으로 변환한다.
- 이 때 주의할점은, 만약 원래 numeric 변수인데, 오류로 인해 문자가 들어있는 경우, Double 형변환을 하면 값이 null 이 된다.
- 따라서 확인하고, imputation 을 한 번 더 수행해야할 수도 있다. 


numeric 변수들을 double 형으로 변환하기

from pyspark.sql.types import DoubleType
for col in numeric_cols:
    indexed = indexed.withColumn(col, indexed[col].cast(DoubleType()))
# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    indexed = indexed.withColumn(col, when(indexed[col].isNull() == True, 


3. Assembling of Input Features

- feature 를 assemble 하여 하나의 필드로 변환 후, 이후 피쳐 변환이나 모델링 절차 등이 수행한다. 


피쳐들의 Type 재확인

-> 모두 double type 이라는 것을 알 수 있음

for col in indexed[feature_list].dtypes:
    print(col[0]+" , "+col[1])


VectorAssemble 활용

피쳐들을 assemble 한 features 라고 하는 컬럼을 생성한다. inputCols 파라미터에 피쳐명을 리스트 형태로 지정하고, outputCols 에 assemble 한 값을 넣을 컬럼명을 넣는다. 

# 주의 사항: Assemble 을 할 때, 데이터에 결측이 없어야함 
from import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=feature_list,
features_vectorized = vectorAssembler.transform(indexed)['id', 'features']).show(5)
| id|            features|
|  0|[1.0,0.0,0.0,0.0,...|
|  1|(24,[0,12,13,14,1...|
|  2|[2.0,2.0,0.0,0.0,...|
|  3|[4.0,0.0,0.0,1.0,...|
|  4|(24,[0,12,13,14,1...|



4. Normalization of Input Features 

- 피쳐들은 모두 같은 스케일이 아니다. 
- 피쳐들을 같은 스케일로 변환하면, 모델링 단계에서 더 좋은 결과를 보여준다. 

- Scaling 을 할 수도 있고, Normalization 을 할 수도 있다. 이 둘을 interchangable 하게 말하기도 하지만, 정확히는 분포의 모양을 바꾸지 않는 경우 scaling, 분포를 바꾸는 경우 normalization 이라고 한다. (

scaling 이나 normalization 은 왜 필요할까?

변수의 scale 이 모델 성능에 미치는 영향을 줄여줌. 피쳐들의 스케일까지 학습해야 하는 부담을 줄일 수 있다. 

ㄴ 만약 normalization 이 적절히 수행된다면, 단점은 없고 장점만 있기 때문에 일반적으로 많이 수행한다. 


Normalizer 를 활용한 standard scaling 

inputCols 로 assemble 한 컬럼을 넣어주고, outputCols 로 normalization 된 피쳐를 넣을 컬럼명을 지정해준다. p는 normalization 을 할 때, L1 norm 을 이용하는 것을 의미한다. (default 는 L2 norm 이다.) 본 포스팅에서 normalization 수식이나 변환 결과는 생략한다. 

from import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
l1NormData = normalizer.transform(features_vectorized)['features','features_norm']).show()
|            features|       features_norm|
preprocessed_data = l1NormData



5. Spiliting into train and test set

#Split the dataset into training and testing dataset
splits = preprocessed_data.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]


6. Train & Predict

pyspark 에서 학습하고 예측하는 일에 대해 fit, transform  이라는 용어를 사용한다. fit 은 학습 데이터를 입력 받아 모델을 만드는 작업업을 의미하며, transform 은 예측하고 싶은 데이터를 입력 받아, 학습된 모델로 그 데이터의 label을 예측한다. 

from import RandomForestClassifier

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features_norm", numTrees=10)
rf_model =
prediction = rf_model.transform(df_train)

transform 을 수행하면, rawPrediction, probability, prediction 이라는 컬럼을 데이터에 추가해준다. 

- rawPrediction (list): random forest 의 각 tree 들이 각 클래스에 속한다고 예측한 값들의 합이다. 

- probabilty (list): rawPrediction 을 0~1 사이로 scaling 한 값 

- prediction (element): probabilty 를 토대로 가장 가능성이 높은 label 예측한 값['id', 'rawPrediction', 'probability', 'prediction']).show()
| id|       rawPrediction|         probability|prediction|
|  0|[0.94594594594594...|[0.09459459459459...|       1.0|
|  1|[2.06800093632958...|[0.20680009363295...|       1.0|
| 10|[0.01515151515151...|[0.00151515151515...|       1.0|
|100|[0.20461020461020...|[0.02046102046102...|       1.0|
|102|[4.91567118722564...|[0.49156711872256...|       1.0|


7. Evaluation

from import MulticlassClassificationEvaluator

binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'training accuracy: {binEval.evaluate(prediction)}')

prediction_test = rf_model.transform(df_test)
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'test accuracy: {binEval.evaluate(prediction_test)}')
training accuracy: 0.9900662251655629
test accuracy: 0.9489795918367347




R - 컬럼별 동일한 함수 적용을 위한 lapply 테크닉

/* DeepPlay 2022-09-05 */

이전 포스트: apply 계열의 R 함수 정리 (포스팅)


데이터 처리 중, 각 컬럼별로 동일한 함수를 적용시키고 싶을 때가 있다. 예를 들면, string 형태로 저장된 컬럼들을 일괄적으로 numeric으로 바꾸고 싶다고 하자. 이 때, lapply 를 유용하게 사용할 수 있다. 


정보) lapply 는 vector, list 를 인풋으로 받아 list 를 아웃풋으로 내보낸다.   


아래 함수는 vars 에 지정된 컬럼들을 lapply 함수를 활용해 일괄적으로  numeric 형으로 변환하는 코드이다.  

# vars 에 numeric 으로 변환하고 싶은 컬럼 
data[,vars] <- lapply(vars, function(x){
  as.numeric(unlist(data[,x])) # [,x] 방식의 컬럼 선택은 output 을 list 형태로 반환한다. 

위 코드를 설명하면 우선 각 컬럼 x 별로 as.numeric 함수를 적용시켜 이 값을 list of vectors 로 반환한다. 그리고 이 반환값이 dataframe 의 컬럼값을 지정하도록 수행된다. (이러한 코드가 가능한 이유는 dataframe 이 기본적으로 list 의 결합이기 때문이다.)


PySpark DataFrame 을 이용한 탐색적 데이터 분석 및 처리 

/* DeepPlay 2022-08-26 */


* 본 포스팅에서는 pyspark 가 설치되어있음을 가정합니다. 


최초로 데이터를 받은 이후에, 가장 먼저할 작업은 탐색적 데이터 분석이다. 탐색적 데이터 분석을 통해 각 변수별로 처리 방법을 계획하여, 분석하기 쉬운 형태의 데이터로 만들게 된다. 이 과정에서 변수의 변환, 제거, 생성, 결측값 처리 등의 절차를 수행하게 된다. 본 포스팅은 pyspark 를 통해 데이터를 탐색적으로 확인하는 방법과 간단한 데이터 처리 방법 몇 가지를 다룬다. 


pyspark 를 통한 탐색적 분석 문법은 pandas 와 유사한 부분도 있고, 그렇지 않은 부분도 있다. 자주 등장하는 pyspark 의 특이적인 문법이 존재하는데, R, pandas 만 활용해온 사람이라면 이러한 부분에 점점 익숙해질 필요가 있을 것 같다. 


필요한 데이터 다운받기: Chronic Kidney Disease Dataset



- MAC OS 에서 python 3.8 환경에 pyspark 를 설치하여 실습함 

# 데이터 다운로드 받기 
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("", "") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)


0. 데이터 로드

pyspark 에서 csv 파일을 load 하는 것은 아래 문법으로 수행할 수 있다. 

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data ="header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")


1. Schema: 데이터 스키마 확인하기

어떤 구조와 변수형으로 데이터가 들어가 있는지 먼저 파악한다. 

- 데이터를 직접 추출하여 만들었다면, 각 컬럼이 무엇을 의미하는지 알겠지만..
- 연습을 위한 토이 데이터이거나, 다른 사람한테 받은 데이터라면, 각 컬럼이 무엇을 의미하는지 이해하고 넘어갈 필요가 있다.

- kidney_disease 데이터의 변수명 의미 확인하기:

# 모든 필드가 string 타입으로 설정되어 있는 것을 확인할 수 있음  
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- bp: string (nullable = true)
 |-- sg: string (nullable = true)
 |-- al: string (nullable = true)
 |-- su: string (nullable = true)
 |-- rbc: string (nullable = true)


2. Show data: 데이터 확인하기

- csv 파일에서 공백인 경우에는 null 값으로 채워졌다는 것을 알 수 있다.

- 각 변수들의 타입을 눈으로 한 번 확인한다.(연속형, 범주형) 
# data.head() 도 사용 가능함
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod| pot|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|


3. Size: 데이터의 규모 확인하기

# 400 X 26 size  
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 26


4. Univariate distribution (numeric): 단변수 분포 확인하기

#### string -> double 형변환

우선 연속형 변수를 double 형으로 변환해준다. 여기서도 withColumn 구문을 활용할 수 있다. 

# 연속형 변수들은 double 로 형변환 수행 
numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
for c in numeric_cols:
    data = data.withColumn(c, data[c].cast('double'))

#### describe 함수: 기본 통계치 추출 

|summary|              age|               bp|                  sg|                al|                 su|              bgr|                bu|                sc|               sod|              hemo|              pcv|                wc|                rc|
|  count|              391|              388|                 353|               354|                351|              356|               381|               383|               313|               348|              329|               294|               269|
|   mean|51.48337595907928|76.46907216494846|  1.0174079320113256|1.0169491525423728|0.45014245014245013|148.0365168539326|57.425721784776904|3.0724543080939934|137.52875399361022|12.526436781609195|38.88449848024316| 8406.122448979591| 4.707434944237919|
| stddev|17.16971408926224|13.68363749352527|0.005716616974376756|1.3526789127628445|  1.099191251885407|79.28171423511773|50.503005849222504| 5.741126066859789|10.408752051798777| 2.912586608826765|8.990104814740933|2944.4741904103385|1.0253232655721791|
|    min|              2.0|             50.0|               1.005|               0.0|                0.0|             22.0|               1.5|               0.4|               4.5|               3.1|              9.0|            2200.0|               2.1|
|    max|             90.0|            180.0|               1.025|               5.0|                5.0|            490.0|             391.0|              76.0|             163.0|              17.8|             54.0|           26400.0|               8.0|

위 결과를 pandas dataframe 으로 변환하고, transpose 를 시켜 보기 쉬운 데이터 형태로 변환한다. 

# 첫번째 로우를 header 로 변경 
numeric_dist = data.describe(numeric_cols).toPandas().T
new_header = numeric_dist.iloc[0] #grab the first row for the header
numeric_dist = numeric_dist[1:] #take the data less the header row
numeric_dist.columns = new_header #set the header row as the df header

# 인덱스컬럼의 값을 추출해서 새로운 컬럼으로 지정
numeric_dist['col'] = numeric_dist.index.values
numeric_dist.reset_index(drop=True, inplace=True)


#### agg 함수 사용: quantile 값 추출 

- 아래와 같이 agg 함수를 활용해 연속형 변수의 quantile 값을 추출할 수 있다. 

- pyspark.sql.functions 의 expr 함수를 활용해, agg 함수 내에 원하는 집계 방법을 string 형태로 넣어서 quantile 을 구할 수 있다. 

import pyspark.sql.functions as F

perc = pd.DataFrame({'col':[], '%25':[], '%50':[], '%75':[], '%90':[], '%99':[]})
for c in numeric_cols :
    result_new = data.agg(
             F.expr('percentile(' + c + ',array(0.25))')[0].alias('%25'),
             F.expr('percentile(' + c + ',array(0.50))')[0].alias('%50'),
             F.expr('percentile(' + c + ',array(0.75))')[0].alias('%75'),
             F.expr('percentile(' + c + ',array(0.90))')[0].alias('%90'),
             F.expr('percentile(' + c + ',array(0.99))')[0].alias('%99'),
    result_new['col'] = c
    perc = pd.concat([perc, result_new], axis=0)

#### 두 결과 테이블 조인하기 

- 위 두 테이블을 조인하여 최종적으로 원하는 형태의 연속형 변수 분포 테이블을 만들어 볼 수 있다. 

numeric_dist_merge = pd.merge(numeric_dist, perc, how="inner", left_on="col", right_on="col")
numeric_dist_merge[['col', 'count', 'mean', '%25', '%50', '%75', '%90', '%99']]

5. Distribution: 단변수 분포 확인하기 - Categorical variable

#### 범주형 변수에서 unique 한 값 추출하기

- 아래 스크립트에서 이 pyspark 특이적인 문법이라고 볼 수 있다. 

ㄴ rdd 는 dataframe 을 rdd 형태로 다룰 것임을 의미하며, rdd 에 lambda 함수를 적용해 결과를 추출한다. 

ㄴ collect 함수는 결과를 list 형태로 만들어준다. 

ㄴ 이러한 문법은 pandas 나 R 에서는 잘 나오지 않기 때문에 익숙해질 필요가 있을 것 같다. 

from pyspark.sql.functions import when
from pyspark.sql import functions as F

# Label 변수 확인하기 
print(f"labels: {'classification').distinct() r: r[0]).collect()}")
# data_pd['classification'].unique() # pandas 의 경우는 이와 같이 한다.
labels: ['notckd', 'ckd', 'ckd\t']

#### 분포 확인하기 

- sql.function 내 agg 함수 내에 원하는 집계 (count) 함수를 넣어, 아래와 같이 그룹별 행수를 계산할 수 있다. 

result =['classification']).\


6. Drop useless columns: 컬럼 드랍하기 (Option)

- 필요 없는 컬럼이 있는 경우 drop 함수로 드랍한다.

data = data.drop('pot')
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 25


7. Subsetting: 원하는 행만 남기기 

- filter 함수를 통해 원하는 행만 남길 수 있다. 

data_ckd = data.filter(data['classification'] == 'ckd')
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|


8. Check missing values: 컬럼별 결측값 개수/결측률 확인하기

- 이 데이터에서는 결측값이 null 로 들어가 있다. 

- 컬럼별로 결측값 개수를 구하는 것은 다양한 방식으로 구현할 수 있지만, 가장 간단하게 for 을 활용하는 방법은 아래와 같다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})[0, 'variable'] = col[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산
# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)


9. Replace: 값 치환하기 

- 데이터를 다루다보면 종종 값을 치환해야할 경우가 있다. (값이 잘못 들어가 있는 경우, 카테고리를 합치는 경우 등..)

- withColumn 구문과 when 구문을 합쳐여 값 치환을 구현해볼 수 있다. 


# label 에 'ckd\t' 로 데이터가 잘못 들어가 있는 것을 확인할 수 있다. 
# 이를 when 구문을 통해 원하는 값으로 바꾸어준다. 
data = data.withColumn("classification", \
              when(data["classification"] == 'ckd\t', 'ckd').otherwise(data["classification"]))

# 재출력하기 
print(f"처리후 labels: {'classification').distinct() r: r[0]).collect()}")
처리후 labels: ['notckd', 'ckd']


10. Imputation: 평균대치법 (for numeric variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 평균 대치법으로 결측값을 처리하자. 

- 먼저, 특정 컬럼의 평균을 구하는 함수를 작성한다. 

ㄴ pyspark 에서는 이와 관련한 readymade 함수가 없기 때문에 직접 작성한다. 

- 여기서도[col]))) 이나 같은 pyspark 특이적 문법이 등장한다. 

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    for col in numeric_cols:
        mean_value =[col]))
        avg_col = mean_value.columns[0]
        res = row : row[avg_col]).collect()
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

각 컬럼들에 대해 평균값을 구한다. 

numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
col_with_mean = mean_of_pyspark_columns(data, numeric_cols)
[['age', 51.48337595907928],
 ['bp', 76.46907216494846],
 ['sg', 1.0174079320113256],
 ['al', 1.0169491525423728],
 ['su', 0.45014245014245013],
 ['bgr', 148.0365168539326],
 ['bu', 57.425721784776904],
 ['sc', 3.0724543080939934],
 ['sod', 137.52875399361022],
 ['hemo', 12.526436781609195],
 ['pcv', 38.88449848024316],
 ['wc', 8406.122448979591],
 ['rc', 4.707434944237919]]

각 컬럼별로 반복문을 돌면서, null 인 경우 평균으로 replace 를 수행한다. 

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 


11. Imputation: Mode 대치 (for categorical variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 최빈값 대치법으로 결측값을 처리해보자. 

- 마찬가지로 컬럼별로 최빈값을 구하는 함수를 작성한다. 

def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = x: x[0]).collect()
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

각 컬럼들에 대해 최빈값을 구한다. 

cat_cols = ['rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']
col_with_mode = mode_of_pyspark_columns(data, cat_cols)
[['rbc', 'normal'],
 ['pc', 'normal'],
 ['pcc', 'notpresent'],
 ['ba', 'notpresent'],
 ['htn', 'no'],
 ['dm', 'no'],
 ['cad', 'no'],
 ['appet', 'good'],
 ['pe', 'no'],
 ['ane', 'no']]

각 컬럼별로 반복문을 돌면서, null 인 경우 최빈값으로 replace 를 수행한다. 

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 

결측값 치환이 잘 되었는지를 확인한다. 결측값이 없는 데이터가 되었음을 확인할 수 있다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})[0, 'variable'] = col[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 

# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)


참고 자료


A Brief Introduction to PySpark

PySpark is a great language for performing exploratory data analysis at scale, building machine learning pipelines, and creating ETLs for…








PySpark DataFrame 을 사용하는 이유와 pandas 와의 차이점

/* DeepPlay 2022-08-26 */


왜 PySpark 인가?


한 마디로 빅데이터 환경에서 전통적인 데이터 처리 툴들 (R, pandas) 을 활용하기 어렵기 때문이다. 토이 데이터의 경우, 10GB 를 넘는 경우가 드물지만, 실제 회사의 빅데이터 환경에서는 하나의 데이터셋이 10GB 를 넘는 경우가 많으며, 크게는 10TB를 넘는 경우도 있다. 기본적으로 R과 python pandas 는 in-memory 처리 방식이다. 모든 데이터를 메모리에 적재한 후, 처리한다. 만약 램이 8GB 인 머신을 사용한다고 하면, 이러한 데이터들을 로드조차 하지 못하고, out-of-memory 에러로 커널이 죽는 모습을 확인할 수 있게 된다.


pyspark 환경에서는 메모리 사용량을 최소화하는 방식으로 용량이 크고, 포맷이 다양한 데이터들을 "특정 데이터 구조" 로 로드하고 처리하는 것이 가능하다. 즉, pyspark 는 시간 및 컴퓨팅 자원 측면에서 효율적으로 데이터 처리/분석을 할 수 있도록 도와준다.


만약, 데이터 분석 공부를 하거나, 큰 데이터를 접할 일이 없는 도메인에서 일을 하는 경우에는 R 또는 pandas 만 사용하여도 괜찮다. 하지만 빅데이터 환경에서 데이터를 처리하고 분석하는 것이 필요하다면, pyspark 를 활용하는 것이 더욱 효율적이며, 이것이 pyspark 를 배우면 좋은 이유이다. 


PySpark DataFrame


pyspark 의 핵심 데이터 타입은 dataframe 이다. pyspark dataframe 은 쉽게 말해 여러 클러스터에 분산 되어 있는 테이블이라고 할 수 있다. 그 이름처럼 R이나 pandas의 dataframe 과 비슷한 함수들을 갖고 있다. 분산 실행을 위해서는 다른 데이터 타입이 아닌 pyspark 의 dataframe 객체를 이용하면 된다. pyspark dataframe을 활용하는 것은 R 과 pandas 와 거의 비슷하기 때문에, 둘 중 하나에 익숙한 경우 쉽게 활용할 수 있다. 


pyspark 와 pandas 의 큰 차이점 중 하나는 pyspark 는 lazy 하고, pandas 는 eager 하다는 것이다. pyspark 에서는 실제 결과가 필요할 때까지 실행을 유보한다 (lazy evaluation). 예를 들어, hive 환경에 있는 테이블을 읽어온 후, 특정 변수를 변환하는 코드를 짰다고 하자. 하지만 이 코드를 실행하는 즉시, 이 작업이 실행되지 않는다. 실제 데이터가 필요한 경우에만 이 작업이 실행횐다 예를 들어, 변환된 테이블을 다시 hive 환경에 파일로 저장하는 등의 경우를 들 수 있다. 이러한 방식이 좋은 점은 전체 데이터를 메모리에 저장하지 않아도 되기 때문에, 효율적으로 데이터를 처리할 수 있다. 반면, pandas 에서는 함수가 호출되는 즉시 실행되며 (eager evaluation), 모든 것은 메모리에 저장된다. pyspark 환경에서의 데이터 처리를 한다고 했을 때, eager operation 은 사용하는 것은 지양하는 것이, 리소스 절감 측면에서 바람직하다고 할 수 있다. 



참고 자료


여러 테이블을 Full outer join 하기

/* 2022-08-25 by DeepPlay */


결론: full outer join 은 3개 이상의 테이블에 대해서는 웬만해서는 쓰지 않는 것이 좋다.

만약 3개 이상의 테이블에 대해 full outer join 을 하려고 한다면, 다른 join 방법을 고려해보자. 


여러 테이블을 종합해서 하나의 테이블로 만들고 싶은데, 각 테이블들간 포함관계가 없어 full outer join을 해야하는 상황. 

예를 들면, 아래와 같은 4개의 테이블을 full outer join 을 해서 personid 와 row 개수가 1:1인 테이블을 만들고 싶은 상황이다. 

| employee1.personid  |  |
| 111                 | aaa             |
| 222                 | bbb             |   
| 333                 | ccc             | 
| employee2.personid  | employee2.sal  |
| 111                 | 2              |
| 200                 | 3              |
| employee3.personid  |  |
| 111                 | bbsr             |
| 300                 | atl              |
| 200                 | ny               |
| employee4.personid  | employee4.dt  |
| 111                 | 2019-02-21    |
| 300                 | 2019-03-18    |
| 400                 | 2019-03-18    |


원하는 병합 테이블

| personid  |  | u.sal  |  |   v_in.dt   |
| 111       | aaa     | 2      | bbsr     | 2019-02-21  |
| 200       | NULL    | 3      | ny       | NULL        |
| 222       | bbb     | NULL   | NULL     | NULL        |
| 300       | NULL    | NULL   | atl      | 2019-03-18  |
| 333       | ccc     | NULL   | NULL     | NULL        |
| 400       | NULL    | NULL   | NULL     | 2019-03-18  |


해결 방법

가장 깔끔한 방법은 유니크한 personid 를 갖는 병합 테이블을 하나 만들고, 이 테이블에 left outer join 을 하는 것이다. 

create table total_user as 
select ditinct account_id from (
    select distinct account_id from employee1
    union all 
    select distinct account_id from employee2
    union all 
    select distinct account_id from employee3
    union all 
    select distinct account_id from employee4

select t0.personid,
`name`, sal, place, dt
from total_user t0
left outer join employee1 t1 on t0.personid = t1.personid
left outer join employee1 t2 on t0.personid = t2.personid
left outer join employee1 t3 on t0.personid = t3.personid
left outer join employee1 t4 on t0.personid = t4.personid


참고) 잘못된 SQL

-> 이렇게 하게 되면, 테이블1과 계속해서 full outer join 을 하기 때문에, 중복된 personid 가 생기게 된다.

select coalesce(f.personid, u.personid, v.personid, v_in.personid) as personid,,u.sal,,v_in.dt
from employee1 f FULL OUTER JOIN employee2 u on f.personid=u.personid
FULL OUTER JOIN employee3 v on f.personid=v.personid
FULL OUTER JOIN employee4 v_in on f.personid=v_in.personid;




R - aggregate / separate_rows


데이터 처리 도중, 특정 key 를 기준으로, 문자를 리스트형태로 바꾸고 싶을 때가 있다. 

이 때, R 에서 사용할 수 있는 함수가 aggregate 와 separate_rows 이다. 

(hive 에서는 collect_set 과 explode 함수가 비슷한 기능을 함)


왼쪽 테이블 -> 오른쪽 테이블로: separate_rows

오른쪽 테이블 -> 왼쪽 테이블로: aggregate

df <- data.frame(Family_ID = 1:2,
  name = c("Smith, John", "Walker, Mike"),
  stringsAsFactors = FALSE)
df2 <- df %>% separate_rows(name)
#   Family_ID name  
#      1 Smith 
#      1 John  
#      2 Walker
#      2 Mike  

df3 <- aggregate(name ~ Family_ID, df2, toString)
#  Family_ID name
#      1  Smith, John
#      2 Walker, Mike

R - 난수 생성을 일별로 변경하기


R 에서 난수를 생성 또는 랜덤 샘플링 작업의 결과가 일별로 바뀌도록 하고 싶을 때가 있다. 

방법은 간단하게 일별로 random seed 를 동일하게 맞춰주면 된다. 

특정 날짜 '01/06/2022' 를 integer 형으로 변환하면 일별로 동일한 숫자가 나오도록 구현할 수 있다. 


dayYear <- as.Date(Sys.Date(),format='%d/%m/%Y') %>% lubridate::yday() %>% as.integer()

sample(nrow(10)) # 같은 날에는 동일한 순서의 숫자 10개가 나온다.


주의할점은 값이 정수를 갖도록 as.integer 함수를 통해 변환해주어야한다.

(만약 double 인 경우, 실제 시드는 매번 달라진다. 이는 컴퓨터가 double 형을 메모리에 저장하는 방식 때문일듯하다.)



아나콘다 설치 이후 프로세스

ㄴ 파이썬 3.8 버전 설치 가정


1. python3.8 가상환경 생성

conda create -n py38 python=3.8


2. 가상환경 활성화

source activate py38


3. ipykernel 설치

pip install ipykernel


4. 가상환경을 ipykernel 에 등록

python -m ipykernel install --user --name py38 --display-name py38


5. 주피터 노트북 or 주피터랩 실행

conda install jupyter
jupyter lab
