Pyspark export to delimited file

def myConcat(*cols):
    concat_columns = []
    for c in cols[:-1]:
        concat_columns.append(F.coalesce(c, F.lit("*")))
        concat_columns.append(F.lit("\t"))  
    concat_columns.append(F.coalesce(cols[-1], F.lit("*")))
    return F.concat(*concat_columns)

# combined column 에 모든 변수를 \t 로 concat 한 값 저장 
data_text = data.withColumn("combined", myConcat(*data.columns)).select("combined")
data_text.coalesce(1).write.format("text").option("header", "false").mode("overwrite").save(path)

출처 : https://stackoverflow.com/questions/17837871/how-to-copy-file-from-hdfs-to-the-local-file-system

가끔 .gitignore 파일에 올리고 싶지 않은 파일이나 폴더를 지정후, remote repo 에 push 를 해도 적용되지 않을 때가 있다. 

이 때는 캐시를 지우고 시도한다. 

git rm --cached -r .
git add .

 

 

Jupyter notebook에서 python 파일을 자동으로 갱신하는 방법

 

예를 들어 jupyter notebook 파일에서 model.py 라는 파일을 import 해서 사용하고 있다고 하자. 

만약 model.py 파일을 수정하면, 커널을 재시작한 후, 다시 import 를 해야한다. 

아래 구문을 통해 model.py 파일이 수정되는 즉시 적용되도록 만들 수 있다. 

%load_ext autoreload
%autoreload 2

Pyspark MLlib 를 활용한 모델링 기초 (w/ Random Forest)

/* DeepPlay 2022-09-20 */

 

개요

1) 배경: scalable machine learning 이란?

근래 빅데이터쪽에서 scalability 라는 개념이 buzzword 중 하나이다. scalability는 스케일 아웃에 관한 것이며, 스케일 아웃이란 "처리 능력을 향상시키기 위해 트래픽을 분산시킬 서버의 수를 늘리는 방법" 을 의미한다. 즉, scalable 머신러닝이란, 머신러닝을 빠르게 처리하기 위해 여러 컴퓨터에 분산시켜 일을 시키는 것으로 정의해볼 수 있다. 빅데이터를 어떻게 효율적으로 처리할 것인가? 와 관련한 논의 및 발전이 머신러닝 쪽에서도 이루어지고 있다고 볼 수 있겠다. 

 

2) 데이터

필요한 데이터 다운받기: Chronic Kidney Disease Dataset

https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

 

3) 요점 

Pyspark MLlib 를 사용할 때 아래 사항을 주의해야함

1. Assembling : 모든 피쳐를 하나의 컬럼에 리스트 형태로 몰아 넣는 것 

2. Type casting: Assembling 한 피쳐는 동일한 타입이어야함. 

3. Missing value handling: Assebling 한 피쳐는 결측이 없어야 함 (NA 가 아닌 0 또는 imputation 된 값이어야 함)

 

0. Load data 

# 데이터 다운로드 받기 
# https://www.kaggle.com/datasets/mansoordaku/ckdisease?resource=download
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")

1. Preprocessing

#### Imputation : Categorical variable

각 변수별 결측치 개수 확인하기

# 범주형 변수 리스트 
cat_cols = ['al', 'su', 'rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 
result

각 변수별 최빈값 Imputation

from pyspark.sql.functions import when, lit
def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    col_with_mode=[]
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = df.select(col).distinct().rdd.map(lambda x: x[0]).collect()
        unique_values_with_count=[]
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

col_with_mode = mode_of_pyspark_columns(data, cat_cols)

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 
    lit(mode)).otherwise(data[col]))

#### Imputation : Numeric variable

각 변수별 평균값 치환 코드

numeric_cols=['age', 'bp', 'sg','bgr', 'bu', 'sc', 'sod', 'pot', 'hemo', 'pcv', 'rc', 'wc']

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    col_with_mean=[]
    for col in numeric_cols:
        mean_value = df.select(avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row : row[avg_col]).collect()
        
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

col_with_mean = mean_of_pyspark_columns(data, numeric_cols)

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 
    lit(mean)).otherwise(data[col]))

 

2. Categorical variable encoding

범주형 피쳐 인코딩

데이터에 관한 지식을 활용해 모든 피쳐들이 binary 또는 ordinal 변수라는 것을 확인한다. 이 경우, Pyspark 의 StringIndexer 를 활용하여 encoding 해볼 수 있다. 예를 들어, normal/abnormal 의 경우, binary 변수이며, good/poor 등도 각 값들이 독립적이지 않고, 등급이 존재하는 ordinal 변수라는 것을 알 수 있다.

data.select(cat_cols).show()
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
| al| su|     rbc|      pc|       pcc|        ba|htn| dm|cad|appet| pe|ane|
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
|1.0|0.0|  normal|  normal|notpresent|notpresent|yes|yes| no| good| no| no|
|4.0|0.0|  normal|  normal|notpresent|notpresent| no| no| no| good| no| no|
|2.0|3.0|  normal|  normal|notpresent|notpresent| no|yes| no| poor| no|yes|
|4.0|0.0|  normal|abnormal|   present|notpresent|yes| no| no| poor|yes|yes|

 

각 변수별로 suffix로 Index를 붙힌 새로운 변수를 만든다.

# Categorical 변수들을 StringIndexer 을 이용해 Numeric 변수로 변환함 
from pyspark.ml.feature import StringIndexer 

indexed = data
for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col + "Index")
    indexed = indexer.fit(indexed).transform(indexed)

예를 들어, 아래와 같이 새로운 컬럼이 생성된다. 

# 각 범주형 변수별로 Index 변수를 아래와 같이 만든다.
indexed.select(['id', 'pc', 'pcIndex']).show()
| id|      pc|pcIndex|
+---+--------+-------+
|  0|  normal|    0.0|
|  1|  normal|    0.0|
|  2|  normal|    0.0|
|  3|abnormal|    1.0|

 

레이블 변수 인코딩

분류 문제의 경우 레이블은 범주형 변수로 마찬가지로 인코딩이 필요하다. 실제 유니크 레이블들을 확인해보니 잘못 들어간 값이 존재하여 이를 제대로된 값으로 치환해준다. 

indexed.select('classification').distinct().show()
+--------------+
|classification|
+--------------+
|        notckd|
|           ckd|
|         ckd\t|
+--------------+

값 치환 

# 잘못된 값이 있기 때문에 이를 replace 해준다. 
indexed = indexed.withColumn("classification", \
              when(indexed["classification"] == 'ckd\t', 'ckd').otherwise(indexed["classification"]))

결과를 보면, notckd 를 1, ckd를 0으로 인코딩 해주었는데, 반대로 notckd를 0, ckd를 1로 바꾸어 인코딩해보자. 

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="classification", outputCol="label")
indexed = indexer.fit(indexed).transform(indexed)
indexed.select('label').distinct().show()
+--------------+-----+
|classification|label|
+--------------+-----+
|        notckd|  1.0|
|           ckd|  0.0|
+--------------+-----+
indexed = indexed.withColumn("label", when(indexed["label"] == 1, 0).otherwise(1))

 

2. Typecasting of Features

- Numeric 변수들을 Double 형으로 변환한다.
- 이 때 주의할점은, 만약 원래 numeric 변수인데, 오류로 인해 문자가 들어있는 경우, Double 형변환을 하면 값이 null 이 된다.
- 따라서 확인하고, imputation 을 한 번 더 수행해야할 수도 있다. 

 

numeric 변수들을 double 형으로 변환하기

from pyspark.sql.types import DoubleType
for col in numeric_cols:
    indexed = indexed.withColumn(col, indexed[col].cast(DoubleType()))
# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    indexed = indexed.withColumn(col, when(indexed[col].isNull() == True, 
    lit(mean)).otherwise(indexed[col]))

 

3. Assembling of Input Features

- feature 를 assemble 하여 하나의 필드로 변환 후, 이후 피쳐 변환이나 모델링 절차 등이 수행한다. 

 

피쳐들의 Type 재확인

-> 모두 double type 이라는 것을 알 수 있음

for col in indexed[feature_list].dtypes:
    print(col[0]+" , "+col[1])

 

VectorAssemble 활용

피쳐들을 assemble 한 features 라고 하는 컬럼을 생성한다. inputCols 파라미터에 피쳐명을 리스트 형태로 지정하고, outputCols 에 assemble 한 값을 넣을 컬럼명을 넣는다. 

# 주의 사항: Assemble 을 할 때, 데이터에 결측이 없어야함 
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=feature_list,
                                  outputCol="features")
                                  
features_vectorized = vectorAssembler.transform(indexed)
features_vectorized.select(['id', 'features']).show(5)
+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[1.0,0.0,0.0,0.0,...|
|  1|(24,[0,12,13,14,1...|
|  2|[2.0,2.0,0.0,0.0,...|
|  3|[4.0,0.0,0.0,1.0,...|
|  4|(24,[0,12,13,14,1...|
+---+--------------------+

 

 

4. Normalization of Input Features 

- 피쳐들은 모두 같은 스케일이 아니다. 
- 피쳐들을 같은 스케일로 변환하면, 모델링 단계에서 더 좋은 결과를 보여준다. 

- Scaling 을 할 수도 있고, Normalization 을 할 수도 있다. 이 둘을 interchangable 하게 말하기도 하지만, 정확히는 분포의 모양을 바꾸지 않는 경우 scaling, 분포를 바꾸는 경우 normalization 이라고 한다. (https://www.kaggle.com/code/alexisbcook/scaling-and-normalization/tutorial)


scaling 이나 normalization 은 왜 필요할까?

변수의 scale 이 모델 성능에 미치는 영향을 줄여줌. 피쳐들의 스케일까지 학습해야 하는 부담을 줄일 수 있다. 

ㄴ 만약 normalization 이 적절히 수행된다면, 단점은 없고 장점만 있기 때문에 일반적으로 많이 수행한다. 

 

Normalizer 를 활용한 standard scaling 

inputCols 로 assemble 한 컬럼을 넣어주고, outputCols 로 normalization 된 피쳐를 넣을 컬럼명을 지정해준다. p는 normalization 을 할 때, L1 norm 을 이용하는 것을 의미한다. (default 는 L2 norm 이다.) 본 포스팅에서 normalization 수식이나 변환 결과는 생략한다. 

from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
l1NormData = normalizer.transform(features_vectorized)
l1NormData.select(['features','features_norm']).show()
+--------------------+--------------------+
|            features|       features_norm|
+--------------------+--------------------+
|[1.0,0.0,0.0,0.0,...|[1.20525839810946...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
|[2.0,2.0,0.0,0.0,...|[2.40521254800404...|
|[4.0,0.0,0.0,1.0,...|[5.58159914210821...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
preprocessed_data = l1NormData

 

 

5. Spiliting into train and test set

#Split the dataset into training and testing dataset
splits = preprocessed_data.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

 

6. Train & Predict

pyspark 에서 학습하고 예측하는 일에 대해 fit, transform  이라는 용어를 사용한다. fit 은 학습 데이터를 입력 받아 모델을 만드는 작업업을 의미하며, transform 은 예측하고 싶은 데이터를 입력 받아, 학습된 모델로 그 데이터의 label을 예측한다. 

from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features_norm", numTrees=10)
rf_model = rf.fit(df_train)
prediction = rf_model.transform(df_train)

transform 을 수행하면, rawPrediction, probability, prediction 이라는 컬럼을 데이터에 추가해준다. 

- rawPrediction (list): random forest 의 각 tree 들이 각 클래스에 속한다고 예측한 값들의 합이다. 

- probabilty (list): rawPrediction 을 0~1 사이로 scaling 한 값 

- prediction (element): probabilty 를 토대로 가장 가능성이 높은 label 예측한 값 

prediction.select(['id', 'rawPrediction', 'probability', 'prediction']).show()
+---+--------------------+--------------------+----------+
| id|       rawPrediction|         probability|prediction|
+---+--------------------+--------------------+----------+
|  0|[0.94594594594594...|[0.09459459459459...|       1.0|
|  1|[2.06800093632958...|[0.20680009363295...|       1.0|
| 10|[0.01515151515151...|[0.00151515151515...|       1.0|
|100|[0.20461020461020...|[0.02046102046102...|       1.0|
|102|[4.91567118722564...|[0.49156711872256...|       1.0|

 

7. Evaluation

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'training accuracy: {binEval.evaluate(prediction)}')

prediction_test = rf_model.transform(df_test)
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'test accuracy: {binEval.evaluate(prediction_test)}')
training accuracy: 0.9900662251655629
test accuracy: 0.9489795918367347
 

참고문서

https://medium.com/@aieeshashafique/machine-learning-with-pysparkmlib-and-random-forest-solving-a-chronic-kidney-disease-problem-752287d5ffd0
https://www.silect.is/blog/random-forest-models-in-spark-ml/  

- https://dzone.com/articles/what-scalable-machine-learning

t-SNE 의 개념 및 알고리즘 설명

/* DeepPlay 2022-09-11 */

 

t-SNE (t-distributed Stochastic Neighbor Embedding) 는 고차원 데이터를 저차원 데이터로 변환하는 차원 축소 (dimensionality reduction) 기법이며, 대표적이며, 좋은 성능을 보이는 기법이다. 

 

차원 축소을 하는 목적은 시각화, 클러스터링, 예측 모델의 일반화 성능 향상 등의 목적을 들 수 있다. t-SNE 의 경우, 고차원 공간상의 데이터 포인트들의 위치를 저차원 공간상에서의 극적으로 표현을 해주기 때문에 데이터에 존재할 수 있는 군집들을 시각화해서 표현해주는데 강점을 갖고, 시각화에 주로 사용된다. t-SNE 는 직접적으로 클러스터를 만들어서 레이블링까지 해주는 클러스터링 알고리즘은 아니다. 따라서 클러스터링에 직접적으로 활용되기 보다는 t-SNE 의 결과에 다시 k-means 와 같은 알고리즘을 적용하는 방식으로 클러스터링을 수행하기도 한다 (참고: https://www.quora.com/Can-TSNE-be-used-for-clustering). t-SNE 는 PCA 와 같이 저차원에서 요약된 변수에 의미가 있는 것은 아니다. (PCA 의 경우, 저차원 공간의 변수가 고차원 공간상의 변수들의 선형 결합이라는 의미가 있다.)

 

차원 축소의 3가지 카테고리

1) feature selection: univariate association test, ensemble feature selection, step-wise regression 등

2) matrix factorization: SVD (singluar vector decomposition)

3) neighbor graphs: t-sne, UMAP (Uniform Manifold Approximation and Projection) 등

 

우선, t-sne 는 비선형 차원 축소 (nonlinear dimensionality reduction) 기법이다. 따라서 아래와 같은 데이터에 대해서도 적용할 수 있다. 반면 PCA (principle component analysis) 와 같은 선형 차원 축소 방법의 경우 아래 데이터에 적용하여 유의미한 결과를 내기는 어렵다.

 

선형 차원 축소 기법으로 차원 축소가 어려운 형태의 데이터

 

t-SNE 알고리즘

 

sne, t-sne, UMAP 과 같은 차원 축소 방법은 아래의 공통된 절차를 수행한다.

원래 데이터가 있는 공간을 high dimension, 축소된 공간을 low dimension 이라고 하자. 

 

------------------------------------------------------------------------------------------------

1) high dimensional probabilities p 를 계산한다.
2) low dimensional probabilities q 를 계산한다.
3) 두 분포의 차이를 반영하는 cos function C(p,q) 를 정의한다. 
4) Cost function 이 최소화 되도록 저차원 공간상의 데이터를 변환한다. 

------------------------------------------------------------------------------------------------

 

대략적인 절차는 매우 심플하다. t-sne 에서는 각 절차를 실제로 어떻게 수행하는지 알아보자. 

 

1) high dimensional probabilities p 를 계산한다.

p_(i,j) 를 어떤 데이터 포인트 i,j의 similarity 를 반영하는 스코어라고 하자. 두 포인트가 가까이 위치할 수록 p_(i,j) 의 값은 커지게 된다. 그리고 i,j 의 euclidean distance 를 e_(i,j) 라고 하자. 다른 데이터들에 대해서도 서로의 eucliean distance 를 계산할 수가 있고, 그 값들이 어떤 분포 g 를 따른다고 가정하자. p_(i,j) 는 그 분포상에서의 likelihood 라고 할 수 있는, g(e(i,j)) 로 정의해보자. 

예를 들어 설명하면, 위 두 데이터 포인트를 각각 (2,9) 와 (3,10) 이라고 하자. e= sqrt((3-2)^2 + (10-9)^2)=sqrt(2) = 1.41 이다. 

 

t-sne 에서 사용하는 g 확률 분포는 아래와 같다.

 

$$ g(x) = exp(-x^2 / 2\sigma_i^2) $$ 

$$ g(e(i,j)) = exp(-||x_i-x_j||^2 / 2\sigma_i^2) $$ 

 

모든 g 값의 합이 1이 되도록 아래 식으로 변환한다. 

 

$$ p_{j|i} = \frac{exp(-||x_i-x_j||^2 / 2\sigma_i^2)}{\sum_{k \neq l} exp(-||x_k-x_l||^2 / 2\sigma_i^2)} $$

 

그러면, p(j|i) 의 값이 p(i|j) 의 값은 다른데, 최종적으로 두 값의 평균을 취하고, 마찬가지로 모든 값의 합이 1이 되도록 하기 위하여 최종적인 i,j 의 similarity score p(i,j) 를 아래와 같이 계산한다 (N은 계산 가능한 쌍의 수). 이러면, p 를 이산확률분포처럼 다룰 수 있게 된다. 

 

$$ p_{i,j} = \frac{p(j|i) + p(i|j)}{2N} $$

 

2) low dimensional probabilities q 를 계산한다.

마찬가지의 방법으로 q(i,j) 를 다음과 같이 구한다. 

 

$$ q_{j|i} = \frac{(1+||y_i - y_j||^2)^{-1})}{\sum_{k \neq l} (1+||y_k - y_l||^2)^{-1})  } $$ 

$$ q_{i,j} = \frac{q(j|i) + q(i|j)}{2N} $$ 

 

σ 는 어떻게 정해지는가?

이를 위해하기 위해 우선, entropy 와 perplexity 라는 개념에 대한 설명이 필요하다. perplexity = 2^entropy 로 정의되며, entropy 는 '어떠한 확률 분포에 대하여, 관측값을 예측하기 어려운 정도' 를 의미하는 수치이다. 어떤 분포 q 에대한 entropy 는 아래와 같다. 

 

$$ H(q) = -\sum_{c=1}^{C} q(y_c)log(q(y_c)) $$ 

 

entropy 를 설명하기 위해, 빨간공과 녹색공이 20:80 으로 들어 있는 가방에서 1개의 공을 꺼내서 관찰 값을 확인하는 이산 확률 분포를 예로 들어보자. 그 확률 분포 q의 entropy 는 H(q)=-(0.2log(0.2)+0.8log(0.8))=0.5 이다. 그리고, perplexity = 2^0.5 = 1.41 이다. 

 

perplexity 값에 따라 t-SNE 의 결과가 민감하게 반응하기 때문에 perplexity 는 중요한 파라미터이다. 보통 t-SNE 는 입력받은 perplexity 를 맞추는 σ 를 찾기 위하여 binary search 를 수행한다. 일반적으로 perplexity 를 조정하면서 시각화를 해보고, 가장 군집을 잘 보여주는 값을 최종적으로 선정하는 방법을 택한다. 

 

왜 p, q 분포는 위와 같이 정해지는가?

p분포는 정규 분포와 유사하며, q분포는 t분포와 유사한 형태를 띈다. q 분포의 경우, p 분포 대비 빠르게 하락하고, 꼬리가 두터운 형태의 분포를 갖는다.  q분포를 썼을 때의 효과는 한 점에 데이터가 뭉치는 crowding problem 을 완화시킨다는데 있다. 따라서, 시각화시 저차원 공간상에서 너무 한 점에 뭉치지 않도록 하는 효과가 있기 때문에 p 분포를 썼을 때보다 이점이 있다. (이는 개인적인 이해를 위한 해석이며, 이와 관련한 좀 더 디테일한 설명은 original paper 를 참고) 

 

구현 레벨에서의 최적화

t-SNE 는 데이터가 커질수록 연산량이 기하급수적으로 늘어나는 O(n^2) 의 시간 복잡도를 갖는다. 실제 구현 레벨에서는 Barnes hut t-SNE 라는 방법을 통해 더 계산 효율적인 구현 방식을 택한다. scikit-learn 의 t-sne 구현체는 이 방식을 활용한다.

 

t-SNE 의 optimization

t-SNE 에서의 optimization 이란 고차원 공간상에서의 p분포 (high dimensional probabilities p) 와 저차원 공간상의 q분포 (low dimensional probabilities q ) 의 차이를 줄이는 것이다. 이 때, cost function 을 정의하고, 이를 최소화하는 방식으로 optimization 이 수행된다. 

 

3) 두 분포의 차이를 반영하는 cos function C(p,q) 를 정의한다. 

cost function C(p,q) 로는 Kullback-Leibler divergence 를 사용한다. p,q는 이산확률분포이고, KL divergence의 식에 적용하면 cost 를 실제로 구해볼 수도 있다. 

 

4) Cost function 이 최소화 되도록 저차원 공간상의 데이터를 변환한다. 

KL divergence 을 최소화 시키는 저차원 공간상의 데이터의 위치를 gradient optimization 방식을 통해 구할 수 있다. 설명하자면, 결국 저차원 공간상에 랜덤하게 뿌려진 데이터 포인트들이 각각 어떤 방향으로 가야지 cost function 을 줄일 수 있을지 알아야 하는 것인데, 이는 cost function 을 미분한 뒤에 각 데이터 포인트 별로 gradient 를 구함으로써 알 수 있다. 

 

참고자료

R - 컬럼별 동일한 함수 적용을 위한 lapply 테크닉

/* DeepPlay 2022-09-05 */

이전 포스트: apply 계열의 R 함수 정리 (포스팅)

 

데이터 처리 중, 각 컬럼별로 동일한 함수를 적용시키고 싶을 때가 있다. 예를 들면, string 형태로 저장된 컬럼들을 일괄적으로 numeric으로 바꾸고 싶다고 하자. 이 때, lapply 를 유용하게 사용할 수 있다. 

 

정보) lapply 는 vector, list 를 인풋으로 받아 list 를 아웃풋으로 내보낸다.   

 

아래 함수는 vars 에 지정된 컬럼들을 lapply 함수를 활용해 일괄적으로  numeric 형으로 변환하는 코드이다.  

# vars 에 numeric 으로 변환하고 싶은 컬럼 
data[,vars] <- lapply(vars, function(x){
  as.numeric(unlist(data[,x])) # [,x] 방식의 컬럼 선택은 output 을 list 형태로 반환한다. 
})

위 코드를 설명하면 우선 각 컬럼 x 별로 as.numeric 함수를 적용시켜 이 값을 list of vectors 로 반환한다. 그리고 이 반환값이 dataframe 의 컬럼값을 지정하도록 수행된다. (이러한 코드가 가능한 이유는 dataframe 이 기본적으로 list 의 결합이기 때문이다.)

PySpark DataFrame 을 이용한 탐색적 데이터 분석 및 처리 

/* DeepPlay 2022-08-26 */

 

* 본 포스팅에서는 pyspark 가 설치되어있음을 가정합니다. 

 

최초로 데이터를 받은 이후에, 가장 먼저할 작업은 탐색적 데이터 분석이다. 탐색적 데이터 분석을 통해 각 변수별로 처리 방법을 계획하여, 분석하기 쉬운 형태의 데이터로 만들게 된다. 이 과정에서 변수의 변환, 제거, 생성, 결측값 처리 등의 절차를 수행하게 된다. 본 포스팅은 pyspark 를 통해 데이터를 탐색적으로 확인하는 방법과 간단한 데이터 처리 방법 몇 가지를 다룬다. 

 

pyspark 를 통한 탐색적 분석 문법은 pandas 와 유사한 부분도 있고, 그렇지 않은 부분도 있다. 자주 등장하는 pyspark 의 특이적인 문법이 존재하는데, R, pandas 만 활용해온 사람이라면 이러한 부분에 점점 익숙해질 필요가 있을 것 같다. 

 

필요한 데이터 다운받기: Chronic Kidney Disease Dataset

https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

 

환경세팅

- MAC OS 에서 python 3.8 환경에 pyspark 를 설치하여 실습함 

# 데이터 다운로드 받기 
# https://www.kaggle.com/datasets/mansoordaku/ckdisease?resource=download
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)

 

0. 데이터 로드

pyspark 에서 csv 파일을 load 하는 것은 아래 문법으로 수행할 수 있다. 

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")

 

1. Schema: 데이터 스키마 확인하기

어떤 구조와 변수형으로 데이터가 들어가 있는지 먼저 파악한다. 

- 데이터를 직접 추출하여 만들었다면, 각 컬럼이 무엇을 의미하는지 알겠지만..
- 연습을 위한 토이 데이터이거나, 다른 사람한테 받은 데이터라면, 각 컬럼이 무엇을 의미하는지 이해하고 넘어갈 필요가 있다.

- kidney_disease 데이터의 변수명 의미 확인하기: https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

# 모든 필드가 string 타입으로 설정되어 있는 것을 확인할 수 있음  
data.printSchema()
root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- bp: string (nullable = true)
 |-- sg: string (nullable = true)
 |-- al: string (nullable = true)
 |-- su: string (nullable = true)
 |-- rbc: string (nullable = true)

 

2. Show data: 데이터 확인하기

- csv 파일에서 공백인 경우에는 null 값으로 채워졌다는 것을 알 수 있다.

- 각 변수들의 타입을 눈으로 한 번 확인한다.(연속형, 범주형) 

data.show() 
# data.head() 도 사용 가능함
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+----+-----+----+---+---+---+-----+---+---+--------------+
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod| pot|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+----+-----+----+---+---+---+-----+---+---+--------------+
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|

 

3. Size: 데이터의 규모 확인하기

# 400 X 26 size  
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 26

 

4. Univariate distribution (numeric): 단변수 분포 확인하기

#### string -> double 형변환

우선 연속형 변수를 double 형으로 변환해준다. 여기서도 withColumn 구문을 활용할 수 있다. 

# 연속형 변수들은 double 로 형변환 수행 
numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
for c in numeric_cols:
    data = data.withColumn(c, data[c].cast('double'))

#### describe 함수: 기본 통계치 추출 

data.describe(numeric_cols).show()
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|summary|              age|               bp|                  sg|                al|                 su|              bgr|                bu|                sc|               sod|              hemo|              pcv|                wc|                rc|
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|  count|              391|              388|                 353|               354|                351|              356|               381|               383|               313|               348|              329|               294|               269|
|   mean|51.48337595907928|76.46907216494846|  1.0174079320113256|1.0169491525423728|0.45014245014245013|148.0365168539326|57.425721784776904|3.0724543080939934|137.52875399361022|12.526436781609195|38.88449848024316| 8406.122448979591| 4.707434944237919|
| stddev|17.16971408926224|13.68363749352527|0.005716616974376756|1.3526789127628445|  1.099191251885407|79.28171423511773|50.503005849222504| 5.741126066859789|10.408752051798777| 2.912586608826765|8.990104814740933|2944.4741904103385|1.0253232655721791|
|    min|              2.0|             50.0|               1.005|               0.0|                0.0|             22.0|               1.5|               0.4|               4.5|               3.1|              9.0|            2200.0|               2.1|
|    max|             90.0|            180.0|               1.025|               5.0|                5.0|            490.0|             391.0|              76.0|             163.0|              17.8|             54.0|           26400.0|               8.0|
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+

위 결과를 pandas dataframe 으로 변환하고, transpose 를 시켜 보기 쉬운 데이터 형태로 변환한다. 

# 첫번째 로우를 header 로 변경 
numeric_dist = data.describe(numeric_cols).toPandas().T
new_header = numeric_dist.iloc[0] #grab the first row for the header
numeric_dist = numeric_dist[1:] #take the data less the header row
numeric_dist.columns = new_header #set the header row as the df header

# 인덱스컬럼의 값을 추출해서 새로운 컬럼으로 지정
numeric_dist['col'] = numeric_dist.index.values
numeric_dist.reset_index(drop=True, inplace=True)

 

#### agg 함수 사용: quantile 값 추출 

- 아래와 같이 agg 함수를 활용해 연속형 변수의 quantile 값을 추출할 수 있다. 

- pyspark.sql.functions 의 expr 함수를 활용해, agg 함수 내에 원하는 집계 방법을 string 형태로 넣어서 quantile 을 구할 수 있다. 

import pyspark.sql.functions as F

perc = pd.DataFrame({'col':[], '%25':[], '%50':[], '%75':[], '%90':[], '%99':[]})
for c in numeric_cols :
    result_new = data.agg(
             F.expr('percentile(' + c + ',array(0.25))')[0].alias('%25'),
             F.expr('percentile(' + c + ',array(0.50))')[0].alias('%50'),
             F.expr('percentile(' + c + ',array(0.75))')[0].alias('%75'),
             F.expr('percentile(' + c + ',array(0.90))')[0].alias('%90'),
             F.expr('percentile(' + c + ',array(0.99))')[0].alias('%99'),
    ).toPandas()
    result_new['col'] = c
    perc = pd.concat([perc, result_new], axis=0)

#### 두 결과 테이블 조인하기 

- 위 두 테이블을 조인하여 최종적으로 원하는 형태의 연속형 변수 분포 테이블을 만들어 볼 수 있다. 

numeric_dist_merge = pd.merge(numeric_dist, perc, how="inner", left_on="col", right_on="col")
numeric_dist_merge[['col', 'count', 'mean', '%25', '%50', '%75', '%90', '%99']]

5. Distribution: 단변수 분포 확인하기 - Categorical variable

#### 범주형 변수에서 unique 한 값 추출하기

- 아래 스크립트에서 rdd.map 이 pyspark 특이적인 문법이라고 볼 수 있다. 

ㄴ rdd 는 dataframe 을 rdd 형태로 다룰 것임을 의미하며, rdd 에 lambda 함수를 적용해 결과를 추출한다. 

ㄴ collect 함수는 결과를 list 형태로 만들어준다. 

ㄴ 이러한 문법은 pandas 나 R 에서는 잘 나오지 않기 때문에 익숙해질 필요가 있을 것 같다. 

from pyspark.sql.functions import when
from pyspark.sql import functions as F

# Label 변수 확인하기 
print(f"labels: {data.select('classification').distinct().rdd.map(lambda r: r[0]).collect()}")
# data_pd['classification'].unique() # pandas 의 경우는 이와 같이 한다.
labels: ['notckd', 'ckd', 'ckd\t']

#### 분포 확인하기 

- sql.function 내 agg 함수 내에 원하는 집계 (count) 함수를 넣어, 아래와 같이 그룹별 행수를 계산할 수 있다. 

result = data.select(['classification']).\
   groupBy('classification').\
   agg(F.count('classification').alias('user_count')).toPandas()
result

 

6. Drop useless columns: 컬럼 드랍하기 (Option)

- 필요 없는 컬럼이 있는 경우 drop 함수로 드랍한다.

data = data.drop('pot')
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 25

 

7. Subsetting: 원하는 행만 남기기 

- filter 함수를 통해 원하는 행만 남길 수 있다. 

data_ckd = data.filter(data['classification'] == 'ckd')
data_ckd.show()
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+-----+----+---+---+---+-----+---+---+--------------+
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+-----+----+---+---+---+-----+---+---+--------------+
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|

 

8. Check missing values: 컬럼별 결측값 개수/결측률 확인하기

- 이 데이터에서는 결측값이 null 로 들어가 있다. 

- 컬럼별로 결측값 개수를 구하는 것은 다양한 방식으로 구현할 수 있지만, 가장 간단하게 for 을 활용하는 방법은 아래와 같다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산
# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)

 

9. Replace: 값 치환하기 

- 데이터를 다루다보면 종종 값을 치환해야할 경우가 있다. (값이 잘못 들어가 있는 경우, 카테고리를 합치는 경우 등..)

- withColumn 구문과 when 구문을 합쳐여 값 치환을 구현해볼 수 있다. 

 

# label 에 'ckd\t' 로 데이터가 잘못 들어가 있는 것을 확인할 수 있다. 
# 이를 when 구문을 통해 원하는 값으로 바꾸어준다. 
data = data.withColumn("classification", \
              when(data["classification"] == 'ckd\t', 'ckd').otherwise(data["classification"]))

# 재출력하기 
print(f"처리후 labels: {data.select('classification').distinct().rdd.map(lambda r: r[0]).collect()}")
처리후 labels: ['notckd', 'ckd']

 

10. Imputation: 평균대치법 (for numeric variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 평균 대치법으로 결측값을 처리하자. 

- 먼저, 특정 컬럼의 평균을 구하는 함수를 작성한다. 

ㄴ pyspark 에서는 이와 관련한 readymade 함수가 없기 때문에 직접 작성한다. 

- 여기서도 df.select(avg(df[col]))) 이나 rdd.map 같은 pyspark 특이적 문법이 등장한다. 

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    col_with_mean=[]
    for col in numeric_cols:
        mean_value = df.select(avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row : row[avg_col]).collect()
        
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

각 컬럼들에 대해 평균값을 구한다. 

numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
col_with_mean = mean_of_pyspark_columns(data, numeric_cols)
col_with_mean
[['age', 51.48337595907928],
 ['bp', 76.46907216494846],
 ['sg', 1.0174079320113256],
 ['al', 1.0169491525423728],
 ['su', 0.45014245014245013],
 ['bgr', 148.0365168539326],
 ['bu', 57.425721784776904],
 ['sc', 3.0724543080939934],
 ['sod', 137.52875399361022],
 ['hemo', 12.526436781609195],
 ['pcv', 38.88449848024316],
 ['wc', 8406.122448979591],
 ['rc', 4.707434944237919]]

각 컬럼별로 반복문을 돌면서, null 인 경우 평균으로 replace 를 수행한다. 

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 
    lit(mean)).otherwise(data[col]))

 

11. Imputation: Mode 대치 (for categorical variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 최빈값 대치법으로 결측값을 처리해보자. 

- 마찬가지로 컬럼별로 최빈값을 구하는 함수를 작성한다. 

def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    col_with_mode=[]
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = df.select(col).distinct().rdd.map(lambda x: x[0]).collect()
        unique_values_with_count=[]
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

각 컬럼들에 대해 최빈값을 구한다. 

cat_cols = ['rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']
col_with_mode = mode_of_pyspark_columns(data, cat_cols)
col_with_mode
[['rbc', 'normal'],
 ['pc', 'normal'],
 ['pcc', 'notpresent'],
 ['ba', 'notpresent'],
 ['htn', 'no'],
 ['dm', 'no'],
 ['cad', 'no'],
 ['appet', 'good'],
 ['pe', 'no'],
 ['ane', 'no']]

각 컬럼별로 반복문을 돌면서, null 인 경우 최빈값으로 replace 를 수행한다. 

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 
    lit(mode)).otherwise(data[col]))

결측값 치환이 잘 되었는지를 확인한다. 결측값이 없는 데이터가 되었음을 확인할 수 있다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 

# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)

 

참고 자료

 

A Brief Introduction to PySpark

PySpark is a great language for performing exploratory data analysis at scale, building machine learning pipelines, and creating ETLs for…

towardsdatascience.com

 

 

 

 

 

 


PySpark DataFrame 을 사용하는 이유와 pandas 와의 차이점

/* DeepPlay 2022-08-26 */

 

왜 PySpark 인가?

 

한 마디로 빅데이터 환경에서 전통적인 데이터 처리 툴들 (R, pandas) 을 활용하기 어렵기 때문이다. 토이 데이터의 경우, 10GB 를 넘는 경우가 드물지만, 실제 회사의 빅데이터 환경에서는 하나의 데이터셋이 10GB 를 넘는 경우가 많으며, 크게는 10TB를 넘는 경우도 있다. 기본적으로 R과 python pandas 는 in-memory 처리 방식이다. 모든 데이터를 메모리에 적재한 후, 처리한다. 만약 램이 8GB 인 머신을 사용한다고 하면, 이러한 데이터들을 로드조차 하지 못하고, out-of-memory 에러로 커널이 죽는 모습을 확인할 수 있게 된다.

 

pyspark 환경에서는 메모리 사용량을 최소화하는 방식으로 용량이 크고, 포맷이 다양한 데이터들을 "특정 데이터 구조" 로 로드하고 처리하는 것이 가능하다. 즉, pyspark 는 시간 및 컴퓨팅 자원 측면에서 효율적으로 데이터 처리/분석을 할 수 있도록 도와준다.

 

만약, 데이터 분석 공부를 하거나, 큰 데이터를 접할 일이 없는 도메인에서 일을 하는 경우에는 R 또는 pandas 만 사용하여도 괜찮다. 하지만 빅데이터 환경에서 데이터를 처리하고 분석하는 것이 필요하다면, pyspark 를 활용하는 것이 더욱 효율적이며, 이것이 pyspark 를 배우면 좋은 이유이다. 

 

PySpark DataFrame

https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

 

pyspark 의 핵심 데이터 타입은 dataframe 이다. pyspark dataframe 은 쉽게 말해 여러 클러스터에 분산 되어 있는 테이블이라고 할 수 있다. 그 이름처럼 R이나 pandas의 dataframe 과 비슷한 함수들을 갖고 있다. 분산 실행을 위해서는 다른 데이터 타입이 아닌 pyspark 의 dataframe 객체를 이용하면 된다. pyspark dataframe을 활용하는 것은 R 과 pandas 와 거의 비슷하기 때문에, 둘 중 하나에 익숙한 경우 쉽게 활용할 수 있다. 

 

pyspark 와 pandas 의 큰 차이점 중 하나는 pyspark 는 lazy 하고, pandas 는 eager 하다는 것이다. pyspark 에서는 실제 결과가 필요할 때까지 실행을 유보한다 (lazy evaluation). 예를 들어, hive 환경에 있는 테이블을 읽어온 후, 특정 변수를 변환하는 코드를 짰다고 하자. 하지만 이 코드를 실행하는 즉시, 이 작업이 실행되지 않는다. 실제 데이터가 필요한 경우에만 이 작업이 실행횐다 예를 들어, 변환된 테이블을 다시 hive 환경에 파일로 저장하는 등의 경우를 들 수 있다. 이러한 방식이 좋은 점은 전체 데이터를 메모리에 저장하지 않아도 되기 때문에, 효율적으로 데이터를 처리할 수 있다. 반면, pandas 에서는 함수가 호출되는 즉시 실행되며 (eager evaluation), 모든 것은 메모리에 저장된다. pyspark 환경에서의 데이터 처리를 한다고 했을 때, eager operation 은 사용하는 것은 지양하는 것이, 리소스 절감 측면에서 바람직하다고 할 수 있다. 

 

 

참고 자료

여러 테이블을 Full outer join 하기

/* 2022-08-25 by DeepPlay */

 

결론: full outer join 은 3개 이상의 테이블에 대해서는 웬만해서는 쓰지 않는 것이 좋다.

만약 3개 이상의 테이블에 대해 full outer join 을 하려고 한다면, 다른 join 방법을 고려해보자. 

 

여러 테이블을 종합해서 하나의 테이블로 만들고 싶은데, 각 테이블들간 포함관계가 없어 full outer join을 해야하는 상황. 

예를 들면, 아래와 같은 4개의 테이블을 full outer join 을 해서 personid 와 row 개수가 1:1인 테이블을 만들고 싶은 상황이다. 

employee1
+---------------------+-----------------+--+
| employee1.personid  | employee1.name  |
+---------------------+-----------------+--+
| 111                 | aaa             |
| 222                 | bbb             |   
| 333                 | ccc             | 
+---------------------+-----------------+--+
employee2
+---------------------+----------------+--+
| employee2.personid  | employee2.sal  |
+---------------------+----------------+--+
| 111                 | 2              |
| 200                 | 3              |
+---------------------+----------------+--+
employee3
+---------------------+------------------+--+
| employee3.personid  | employee3.place  |
+---------------------+------------------+--+
| 111                 | bbsr             |
| 300                 | atl              |
| 200                 | ny               |
+---------------------+------------------+--+
employee4
+---------------------+---------------+--+
| employee4.personid  | employee4.dt  |
+---------------------+---------------+--+
| 111                 | 2019-02-21    |
| 300                 | 2019-03-18    |
| 400                 | 2019-03-18    |
+---------------------+---------------+--+

 

원하는 병합 테이블

+-----------+---------+--------+----------+-------------+--+
| personid  | f.name  | u.sal  | v.place  |   v_in.dt   |
+-----------+---------+--------+----------+-------------+--+
| 111       | aaa     | 2      | bbsr     | 2019-02-21  |
| 200       | NULL    | 3      | ny       | NULL        |
| 222       | bbb     | NULL   | NULL     | NULL        |
| 300       | NULL    | NULL   | atl      | 2019-03-18  |
| 333       | ccc     | NULL   | NULL     | NULL        |
| 400       | NULL    | NULL   | NULL     | 2019-03-18  |
+-----------+---------+--------+----------+-------------+--+

 

해결 방법

가장 깔끔한 방법은 유니크한 personid 를 갖는 병합 테이블을 하나 만들고, 이 테이블에 left outer join 을 하는 것이다. 

create table total_user as 
select ditinct account_id from (
    select distinct account_id from employee1
    union all 
    select distinct account_id from employee2
    union all 
    select distinct account_id from employee3
    union all 
    select distinct account_id from employee4
)

select t0.personid,
`name`, sal, place, dt
from total_user t0
left outer join employee1 t1 on t0.personid = t1.personid
left outer join employee1 t2 on t0.personid = t2.personid
left outer join employee1 t3 on t0.personid = t3.personid
left outer join employee1 t4 on t0.personid = t4.personid

 

참고) 잘못된 SQL

-> 이렇게 하게 되면, 테이블1과 계속해서 full outer join 을 하기 때문에, 중복된 personid 가 생기게 된다.

select coalesce(f.personid, u.personid, v.personid, v_in.personid) as personid,f.name,u.sal,v.place,v_in.dt
from employee1 f FULL OUTER JOIN employee2 u on f.personid=u.personid
FULL OUTER JOIN employee3 v on f.personid=v.personid
FULL OUTER JOIN employee4 v_in on f.personid=v_in.personid;

 

 

 


요약 & 의견 

 

1) 어떤 분야든 Self-report 는 100% 신뢰할 수 없다. (물론 분야별 신뢰도의 차이는 있겠지만..)

2) 세대별로 인터넷을 이용하는 방식은 다르지만, 거짓 정보에 대한 취약하다는 사실은 동일하다.


Lateral reading 이라고 불리는 읽기 습관은 사실 관계 확인 절차에 있어 핵심적인 부분이다.  (Lateral reading: 온라인 정보 파편의 사실 관계나 출처 등을 확인하기 위해, 수많은 브라우저 탭을 열고, 검색을 하는 등의 방법으로 다양한 정보를 습득하는 것을 의미) Poynter, YouGov, Google 의 최신 연구 결과는 Z세대 (Generation Z) 는 이러한 테크닉을 이전 어느 세대보다 잘 활용하고 있다는 것을 보여주었으며, 이는 매우 희망적인 결과라고 할 수 있다. 

 

2022년 8월 11일, 구글 검색 엔진팀은 다양한 연령대 및 국가별로 잘못된 정보를 처리하는 방법의 차이점에 대한 연구를 발표하였다. 이 연구에서는 8,000명 이상의 연구 대상자에게, 온라인 콘텐츠 조사 방법에 대한 설문 조사를 실시하였으며, Z 세대와 (18-25세) 부터 Silent Generation (>68세) 에 이르는 연령대별로, 7개 이상의 국가별로 수집한 설문 결과를 종합하였다. 

 

기본적으로, 이 연구의 결론은 젊은 사람일수록 의도치 않게 거짓 정보 또는 오해의 소지가 있는 정보가 공유 됐을 가능성을 생각하는 경향이 있다는 것이다. 그들은 또한 "고급 팩트 체킹 기술" 을 활용하는데에도 능숙하다고 한다.

3분의 1 이상의 Z 세대 응답자들중 사실 관계 확인을 위한 lateral reading 에 많은 시간을 투자한다고 응답한 비율은 베이비부머 세대의 응답자들보다 2배 이상 높았다. 또한 3분의 1이상의 젊은 사람들은 검색 결과 비교를 위해 다양한 검색 엔진을 활용하며, 검색 결과 2페이지 이상을 탐색한다고 답했다. 

 

62% 의 응답자들은 매주 잘못된 정보를 접하고 있다고 답했다. Z세대, 밀레니얼 세대, X세대의 응답자들은 잘못된 정보 판단에 있어 다른 세대 보다 상대적으로 더 자신감이 있었으며, 그들의 가족 또는 친구가 잘못된 정보를 믿지 않을까하는 걱정하는 경향을 보였다. 

 

그러나, 연구 결과는 설문 조사 결과에 의존하고 있고, 이는 대상자들이 믿는 자신의 신념과 습관에 불과하다. Z 세대의 "실제 습관" 과 관련된 수치는 이와 상반된다. 팩트 체킹을 연구하는 스탠포드 Sam Wineburg 교수에 따르면, 인터넷에서 사람들이 어떻게 행동하는지를 파악하기 위해 "자기 응답" (self-report) 을 활용하는 것은, 그의 표현을 빌리자면 "bullshit" 이라고 한다.  

 

"사람들이 실제로 하는 것과 한다고 말하는 것의 괴리는 사회 심리학 초기 연구 자료에서 쉽게 확인할 수 있다." 그는 말했다. 그의 연구 결과는 누군가의 개입이 없는 경우, 젊은 사람들은 자발적으로는 lateral reading 이나 사실 관계 확인을 거의 하지 않는다는 것을 보여주었다. 

 

Wineburg 와 그의 연구팀에 의해 수행된 최근 연구에서는 대학생들을 대상으로 사실 관계 확인 기술에 관한 온라인 수업이 사실 관계를 확인 습관에 도움을 줄 수 있을지를 확인하였다. 수업 전 사전 조사에서 87명중 불과 3명의 학생만이 lateral reading 을 능동적으로 하는 것으로 확인 되었다. 

 

 "만약 사람들이 자발적으로 lateral reading 을 한다면, 우리는 훨씬 더 나은 곳에 살 수 있을 겁니다." Wineburg 는 말했다. 

 

좀 더 큰 규모의 연구에서는 3,000명 이상의 고등학생들이 온라인내에 존재하는 일련의 주장들을 조사하도록 요청 받았다. 결과는 더욱 암울했다. 50% 이상의 학생들이 러시아에서 촬영된 익명의 페이스북 동영상을 보고, 이것이 "부정 투표" 에 관한 강력한 증거라고 믿었다. 

 

Z 세대는 다른 세대들과 인터넷을 다른 방식으로 이용하는 것은 확실하다. 그러나 Z 세대도 이전 세대들과 마찬가지로, 지난 몇 년간 온라인 생태계를 악화시킨, 정보의 함정, 전략적인 잘못된 정보의 영향에 취약하다. 

 

출처: https://www.technologyreview.com/2022/08/11/1057552/gen-z-misinformation/