Tools (124)

반응형

R - 컬럼별 동일한 함수 적용을 위한 lapply 테크닉

/* DeepPlay 2022-09-05 */

이전 포스트: apply 계열의 R 함수 정리 (포스팅)

 

데이터 처리 중, 각 컬럼별로 동일한 함수를 적용시키고 싶을 때가 있다. 예를 들면, string 형태로 저장된 컬럼들을 일괄적으로 numeric으로 바꾸고 싶다고 하자. 이 때, lapply 를 유용하게 사용할 수 있다. 

 

정보) lapply 는 vector, list 를 인풋으로 받아 list 를 아웃풋으로 내보낸다.   

 

아래 함수는 vars 에 지정된 컬럼들을 lapply 함수를 활용해 일괄적으로  numeric 형으로 변환하는 코드이다.  

# vars 에 numeric 으로 변환하고 싶은 컬럼 
data[,vars] <- lapply(vars, function(x){
  as.numeric(unlist(data[,x])) # [,x] 방식의 컬럼 선택은 output 을 list 형태로 반환한다. 
})

위 코드를 설명하면 우선 각 컬럼 x 별로 as.numeric 함수를 적용시켜 이 값을 list of vectors 로 반환한다. 그리고 이 반환값이 dataframe 의 컬럼값을 지정하도록 수행된다. (이러한 코드가 가능한 이유는 dataframe 이 기본적으로 list 의 결합이기 때문이다.)

반응형
반응형

PySpark DataFrame 을 이용한 탐색적 데이터 분석 및 처리 

/* DeepPlay 2022-08-26 */

 

* 본 포스팅에서는 pyspark 가 설치되어있음을 가정합니다. 

 

최초로 데이터를 받은 이후에, 가장 먼저할 작업은 탐색적 데이터 분석이다. 탐색적 데이터 분석을 통해 각 변수별로 처리 방법을 계획하여, 분석하기 쉬운 형태의 데이터로 만들게 된다. 이 과정에서 변수의 변환, 제거, 생성, 결측값 처리 등의 절차를 수행하게 된다. 본 포스팅은 pyspark 를 통해 데이터를 탐색적으로 확인하는 방법과 간단한 데이터 처리 방법 몇 가지를 다룬다. 

 

pyspark 를 통한 탐색적 분석 문법은 pandas 와 유사한 부분도 있고, 그렇지 않은 부분도 있다. 자주 등장하는 pyspark 의 특이적인 문법이 존재하는데, R, pandas 만 활용해온 사람이라면 이러한 부분에 점점 익숙해질 필요가 있을 것 같다. 

 

필요한 데이터 다운받기: Chronic Kidney Disease Dataset

https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

 

환경세팅

- MAC OS 에서 python 3.8 환경에 pyspark 를 설치하여 실습함 

# 데이터 다운로드 받기 
# https://www.kaggle.com/datasets/mansoordaku/ckdisease?resource=download
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)

 

0. 데이터 로드

pyspark 에서 csv 파일을 load 하는 것은 아래 문법으로 수행할 수 있다. 

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")

 

1. Schema: 데이터 스키마 확인하기

어떤 구조와 변수형으로 데이터가 들어가 있는지 먼저 파악한다. 

- 데이터를 직접 추출하여 만들었다면, 각 컬럼이 무엇을 의미하는지 알겠지만..
- 연습을 위한 토이 데이터이거나, 다른 사람한테 받은 데이터라면, 각 컬럼이 무엇을 의미하는지 이해하고 넘어갈 필요가 있다.

- kidney_disease 데이터의 변수명 의미 확인하기: https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

# 모든 필드가 string 타입으로 설정되어 있는 것을 확인할 수 있음  
data.printSchema()
root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- bp: string (nullable = true)
 |-- sg: string (nullable = true)
 |-- al: string (nullable = true)
 |-- su: string (nullable = true)
 |-- rbc: string (nullable = true)

 

2. Show data: 데이터 확인하기

- csv 파일에서 공백인 경우에는 null 값으로 채워졌다는 것을 알 수 있다.

- 각 변수들의 타입을 눈으로 한 번 확인한다.(연속형, 범주형) 

data.show() 
# data.head() 도 사용 가능함
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+----+-----+----+---+---+---+-----+---+---+--------------+
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod| pot|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+----+-----+----+---+---+---+-----+---+---+--------------+
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|

 

3. Size: 데이터의 규모 확인하기

# 400 X 26 size  
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 26

 

4. Univariate distribution (numeric): 단변수 분포 확인하기

#### string -> double 형변환

우선 연속형 변수를 double 형으로 변환해준다. 여기서도 withColumn 구문을 활용할 수 있다. 

# 연속형 변수들은 double 로 형변환 수행 
numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
for c in numeric_cols:
    data = data.withColumn(c, data[c].cast('double'))

#### describe 함수: 기본 통계치 추출 

data.describe(numeric_cols).show()
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|summary|              age|               bp|                  sg|                al|                 su|              bgr|                bu|                sc|               sod|              hemo|              pcv|                wc|                rc|
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|  count|              391|              388|                 353|               354|                351|              356|               381|               383|               313|               348|              329|               294|               269|
|   mean|51.48337595907928|76.46907216494846|  1.0174079320113256|1.0169491525423728|0.45014245014245013|148.0365168539326|57.425721784776904|3.0724543080939934|137.52875399361022|12.526436781609195|38.88449848024316| 8406.122448979591| 4.707434944237919|
| stddev|17.16971408926224|13.68363749352527|0.005716616974376756|1.3526789127628445|  1.099191251885407|79.28171423511773|50.503005849222504| 5.741126066859789|10.408752051798777| 2.912586608826765|8.990104814740933|2944.4741904103385|1.0253232655721791|
|    min|              2.0|             50.0|               1.005|               0.0|                0.0|             22.0|               1.5|               0.4|               4.5|               3.1|              9.0|            2200.0|               2.1|
|    max|             90.0|            180.0|               1.025|               5.0|                5.0|            490.0|             391.0|              76.0|             163.0|              17.8|             54.0|           26400.0|               8.0|
+-------+-----------------+-----------------+--------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+

위 결과를 pandas dataframe 으로 변환하고, transpose 를 시켜 보기 쉬운 데이터 형태로 변환한다. 

# 첫번째 로우를 header 로 변경 
numeric_dist = data.describe(numeric_cols).toPandas().T
new_header = numeric_dist.iloc[0] #grab the first row for the header
numeric_dist = numeric_dist[1:] #take the data less the header row
numeric_dist.columns = new_header #set the header row as the df header

# 인덱스컬럼의 값을 추출해서 새로운 컬럼으로 지정
numeric_dist['col'] = numeric_dist.index.values
numeric_dist.reset_index(drop=True, inplace=True)

 

#### agg 함수 사용: quantile 값 추출 

- 아래와 같이 agg 함수를 활용해 연속형 변수의 quantile 값을 추출할 수 있다. 

- pyspark.sql.functions 의 expr 함수를 활용해, agg 함수 내에 원하는 집계 방법을 string 형태로 넣어서 quantile 을 구할 수 있다. 

import pyspark.sql.functions as F

perc = pd.DataFrame({'col':[], '%25':[], '%50':[], '%75':[], '%90':[], '%99':[]})
for c in numeric_cols :
    result_new = data.agg(
             F.expr('percentile(' + c + ',array(0.25))')[0].alias('%25'),
             F.expr('percentile(' + c + ',array(0.50))')[0].alias('%50'),
             F.expr('percentile(' + c + ',array(0.75))')[0].alias('%75'),
             F.expr('percentile(' + c + ',array(0.90))')[0].alias('%90'),
             F.expr('percentile(' + c + ',array(0.99))')[0].alias('%99'),
    ).toPandas()
    result_new['col'] = c
    perc = pd.concat([perc, result_new], axis=0)

#### 두 결과 테이블 조인하기 

- 위 두 테이블을 조인하여 최종적으로 원하는 형태의 연속형 변수 분포 테이블을 만들어 볼 수 있다. 

numeric_dist_merge = pd.merge(numeric_dist, perc, how="inner", left_on="col", right_on="col")
numeric_dist_merge[['col', 'count', 'mean', '%25', '%50', '%75', '%90', '%99']]

5. Distribution: 단변수 분포 확인하기 - Categorical variable

#### 범주형 변수에서 unique 한 값 추출하기

- 아래 스크립트에서 rdd.map 이 pyspark 특이적인 문법이라고 볼 수 있다. 

ㄴ rdd 는 dataframe 을 rdd 형태로 다룰 것임을 의미하며, rdd 에 lambda 함수를 적용해 결과를 추출한다. 

ㄴ collect 함수는 결과를 list 형태로 만들어준다. 

ㄴ 이러한 문법은 pandas 나 R 에서는 잘 나오지 않기 때문에 익숙해질 필요가 있을 것 같다. 

from pyspark.sql.functions import when
from pyspark.sql import functions as F

# Label 변수 확인하기 
print(f"labels: {data.select('classification').distinct().rdd.map(lambda r: r[0]).collect()}")
# data_pd['classification'].unique() # pandas 의 경우는 이와 같이 한다.
labels: ['notckd', 'ckd', 'ckd\t']

#### 분포 확인하기 

- sql.function 내 agg 함수 내에 원하는 집계 (count) 함수를 넣어, 아래와 같이 그룹별 행수를 계산할 수 있다. 

result = data.select(['classification']).\
   groupBy('classification').\
   agg(F.count('classification').alias('user_count')).toPandas()
result

 

6. Drop useless columns: 컬럼 드랍하기 (Option)

- 필요 없는 컬럼이 있는 경우 drop 함수로 드랍한다.

data = data.drop('pot')
print(f"Row count: {data.count()}")
print(f"Column count: {len(data.columns)}")
Row count: 400
Column count: 25

 

7. Subsetting: 원하는 행만 남기기 

- filter 함수를 통해 원하는 행만 남길 수 있다. 

data_ckd = data.filter(data['classification'] == 'ckd')
data_ckd.show()
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+-----+----+---+---+---+-----+---+---+--------------+
| id| age|   bp|   sg|  al|  su|     rbc|      pc|       pcc|        ba|  bgr|   bu|  sc|  sod|hemo| pcv|   wc|  rc|htn| dm|cad|appet| pe|ane|classification|
+---+----+-----+-----+----+----+--------+--------+----------+----------+-----+-----+----+-----+----+----+-----+----+---+---+---+-----+---+---+--------------+
|  0|48.0| 80.0| 1.02| 1.0| 0.0|    null|  normal|notpresent|notpresent|121.0| 36.0| 1.2| null|15.4|  44| 7800| 5.2|yes|yes| no| good| no| no|           ckd|
|  1| 7.0| 50.0| 1.02| 4.0| 0.0|    null|  normal|notpresent|notpresent| null| 18.0| 0.8| null|11.3|  38| 6000|null| no| no| no| good| no| no|           ckd|

 

8. Check missing values: 컬럼별 결측값 개수/결측률 확인하기

- 이 데이터에서는 결측값이 null 로 들어가 있다. 

- 컬럼별로 결측값 개수를 구하는 것은 다양한 방식으로 구현할 수 있지만, 가장 간단하게 for 을 활용하는 방법은 아래와 같다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산
# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)

 

9. Replace: 값 치환하기 

- 데이터를 다루다보면 종종 값을 치환해야할 경우가 있다. (값이 잘못 들어가 있는 경우, 카테고리를 합치는 경우 등..)

- withColumn 구문과 when 구문을 합쳐여 값 치환을 구현해볼 수 있다. 

 

# label 에 'ckd\t' 로 데이터가 잘못 들어가 있는 것을 확인할 수 있다. 
# 이를 when 구문을 통해 원하는 값으로 바꾸어준다. 
data = data.withColumn("classification", \
              when(data["classification"] == 'ckd\t', 'ckd').otherwise(data["classification"]))

# 재출력하기 
print(f"처리후 labels: {data.select('classification').distinct().rdd.map(lambda r: r[0]).collect()}")
처리후 labels: ['notckd', 'ckd']

 

10. Imputation: 평균대치법 (for numeric variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 평균 대치법으로 결측값을 처리하자. 

- 먼저, 특정 컬럼의 평균을 구하는 함수를 작성한다. 

ㄴ pyspark 에서는 이와 관련한 readymade 함수가 없기 때문에 직접 작성한다. 

- 여기서도 df.select(avg(df[col]))) 이나 rdd.map 같은 pyspark 특이적 문법이 등장한다. 

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    col_with_mean=[]
    for col in numeric_cols:
        mean_value = df.select(avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row : row[avg_col]).collect()
        
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

각 컬럼들에 대해 평균값을 구한다. 

numeric_cols = ['age', 'bp', 'sg', 'al', 'su', 'bgr', 'bu', 'sc', 'sod', 'hemo', 'pcv', 'wc', 'rc']
col_with_mean = mean_of_pyspark_columns(data, numeric_cols)
col_with_mean
[['age', 51.48337595907928],
 ['bp', 76.46907216494846],
 ['sg', 1.0174079320113256],
 ['al', 1.0169491525423728],
 ['su', 0.45014245014245013],
 ['bgr', 148.0365168539326],
 ['bu', 57.425721784776904],
 ['sc', 3.0724543080939934],
 ['sod', 137.52875399361022],
 ['hemo', 12.526436781609195],
 ['pcv', 38.88449848024316],
 ['wc', 8406.122448979591],
 ['rc', 4.707434944237919]]

각 컬럼별로 반복문을 돌면서, null 인 경우 평균으로 replace 를 수행한다. 

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 
    lit(mean)).otherwise(data[col]))

 

11. Imputation: Mode 대치 (for categorical variable)

- 본 데이터셋은 결측값이 null 로 채워져 있다. 최빈값 대치법으로 결측값을 처리해보자. 

- 마찬가지로 컬럼별로 최빈값을 구하는 함수를 작성한다. 

def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    col_with_mode=[]
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = df.select(col).distinct().rdd.map(lambda x: x[0]).collect()
        unique_values_with_count=[]
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

각 컬럼들에 대해 최빈값을 구한다. 

cat_cols = ['rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']
col_with_mode = mode_of_pyspark_columns(data, cat_cols)
col_with_mode
[['rbc', 'normal'],
 ['pc', 'normal'],
 ['pcc', 'notpresent'],
 ['ba', 'notpresent'],
 ['htn', 'no'],
 ['dm', 'no'],
 ['cad', 'no'],
 ['appet', 'good'],
 ['pe', 'no'],
 ['ane', 'no']]

각 컬럼별로 반복문을 돌면서, null 인 경우 최빈값으로 replace 를 수행한다. 

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 
    lit(mode)).otherwise(data[col]))

결측값 치환이 잘 되었는지를 확인한다. 결측값이 없는 데이터가 되었음을 확인할 수 있다. 

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 

# 결측률이 높은 순서대로 출력한다. 
# rbc 컬럼의 결측율이 38% 로 가장 높은 것을 확인할 수 있다. 
result.sort_values(by=['missing_rate'], ascending=False)

 

참고 자료

 

A Brief Introduction to PySpark

PySpark is a great language for performing exploratory data analysis at scale, building machine learning pipelines, and creating ETLs for…

towardsdatascience.com

 

 

 

 

 

 

반응형
반응형


PySpark DataFrame 을 사용하는 이유와 pandas 와의 차이점

/* DeepPlay 2022-08-26 */

 

왜 PySpark 인가?

 

한 마디로 빅데이터 환경에서 전통적인 데이터 처리 툴들 (R, pandas) 을 활용하기 어렵기 때문이다. 토이 데이터의 경우, 10GB 를 넘는 경우가 드물지만, 실제 회사의 빅데이터 환경에서는 하나의 데이터셋이 10GB 를 넘는 경우가 많으며, 크게는 10TB를 넘는 경우도 있다. 기본적으로 R과 python pandas 는 in-memory 처리 방식이다. 모든 데이터를 메모리에 적재한 후, 처리한다. 만약 램이 8GB 인 머신을 사용한다고 하면, 이러한 데이터들을 로드조차 하지 못하고, out-of-memory 에러로 커널이 죽는 모습을 확인할 수 있게 된다.

 

pyspark 환경에서는 메모리 사용량을 최소화하는 방식으로 용량이 크고, 포맷이 다양한 데이터들을 "특정 데이터 구조" 로 로드하고 처리하는 것이 가능하다. 즉, pyspark 는 시간 및 컴퓨팅 자원 측면에서 효율적으로 데이터 처리/분석을 할 수 있도록 도와준다.

 

만약, 데이터 분석 공부를 하거나, 큰 데이터를 접할 일이 없는 도메인에서 일을 하는 경우에는 R 또는 pandas 만 사용하여도 괜찮다. 하지만 빅데이터 환경에서 데이터를 처리하고 분석하는 것이 필요하다면, pyspark 를 활용하는 것이 더욱 효율적이며, 이것이 pyspark 를 배우면 좋은 이유이다. 

 

PySpark DataFrame

https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

 

pyspark 의 핵심 데이터 타입은 dataframe 이다. pyspark dataframe 은 쉽게 말해 여러 클러스터에 분산 되어 있는 테이블이라고 할 수 있다. 그 이름처럼 R이나 pandas의 dataframe 과 비슷한 함수들을 갖고 있다. 분산 실행을 위해서는 다른 데이터 타입이 아닌 pyspark 의 dataframe 객체를 이용하면 된다. pyspark dataframe을 활용하는 것은 R 과 pandas 와 거의 비슷하기 때문에, 둘 중 하나에 익숙한 경우 쉽게 활용할 수 있다. 

 

pyspark 와 pandas 의 큰 차이점 중 하나는 pyspark 는 lazy 하고, pandas 는 eager 하다는 것이다. pyspark 에서는 실제 결과가 필요할 때까지 실행을 유보한다 (lazy evaluation). 예를 들어, hive 환경에 있는 테이블을 읽어온 후, 특정 변수를 변환하는 코드를 짰다고 하자. 하지만 이 코드를 실행하는 즉시, 이 작업이 실행되지 않는다. 실제 데이터가 필요한 경우에만 이 작업이 실행횐다 예를 들어, 변환된 테이블을 다시 hive 환경에 파일로 저장하는 등의 경우를 들 수 있다. 이러한 방식이 좋은 점은 전체 데이터를 메모리에 저장하지 않아도 되기 때문에, 효율적으로 데이터를 처리할 수 있다. 반면, pandas 에서는 함수가 호출되는 즉시 실행되며 (eager evaluation), 모든 것은 메모리에 저장된다. pyspark 환경에서의 데이터 처리를 한다고 했을 때, eager operation 은 사용하는 것은 지양하는 것이, 리소스 절감 측면에서 바람직하다고 할 수 있다. 

 

 

참고 자료

반응형
반응형

여러 테이블을 Full outer join 하기

/* 2022-08-25 by DeepPlay */

 

결론: full outer join 은 3개 이상의 테이블에 대해서는 웬만해서는 쓰지 않는 것이 좋다.

만약 3개 이상의 테이블에 대해 full outer join 을 하려고 한다면, 다른 join 방법을 고려해보자. 

 

여러 테이블을 종합해서 하나의 테이블로 만들고 싶은데, 각 테이블들간 포함관계가 없어 full outer join을 해야하는 상황. 

예를 들면, 아래와 같은 4개의 테이블을 full outer join 을 해서 personid 와 row 개수가 1:1인 테이블을 만들고 싶은 상황이다. 

employee1
+---------------------+-----------------+--+
| employee1.personid  | employee1.name  |
+---------------------+-----------------+--+
| 111                 | aaa             |
| 222                 | bbb             |   
| 333                 | ccc             | 
+---------------------+-----------------+--+
employee2
+---------------------+----------------+--+
| employee2.personid  | employee2.sal  |
+---------------------+----------------+--+
| 111                 | 2              |
| 200                 | 3              |
+---------------------+----------------+--+
employee3
+---------------------+------------------+--+
| employee3.personid  | employee3.place  |
+---------------------+------------------+--+
| 111                 | bbsr             |
| 300                 | atl              |
| 200                 | ny               |
+---------------------+------------------+--+
employee4
+---------------------+---------------+--+
| employee4.personid  | employee4.dt  |
+---------------------+---------------+--+
| 111                 | 2019-02-21    |
| 300                 | 2019-03-18    |
| 400                 | 2019-03-18    |
+---------------------+---------------+--+

 

원하는 병합 테이블

+-----------+---------+--------+----------+-------------+--+
| personid  | f.name  | u.sal  | v.place  |   v_in.dt   |
+-----------+---------+--------+----------+-------------+--+
| 111       | aaa     | 2      | bbsr     | 2019-02-21  |
| 200       | NULL    | 3      | ny       | NULL        |
| 222       | bbb     | NULL   | NULL     | NULL        |
| 300       | NULL    | NULL   | atl      | 2019-03-18  |
| 333       | ccc     | NULL   | NULL     | NULL        |
| 400       | NULL    | NULL   | NULL     | 2019-03-18  |
+-----------+---------+--------+----------+-------------+--+

 

해결 방법

가장 깔끔한 방법은 유니크한 personid 를 갖는 병합 테이블을 하나 만들고, 이 테이블에 left outer join 을 하는 것이다. 

create table total_user as 
select ditinct account_id from (
    select distinct account_id from employee1
    union all 
    select distinct account_id from employee2
    union all 
    select distinct account_id from employee3
    union all 
    select distinct account_id from employee4
)

select t0.personid,
`name`, sal, place, dt
from total_user t0
left outer join employee1 t1 on t0.personid = t1.personid
left outer join employee1 t2 on t0.personid = t2.personid
left outer join employee1 t3 on t0.personid = t3.personid
left outer join employee1 t4 on t0.personid = t4.personid

 

참고) 잘못된 SQL

-> 이렇게 하게 되면, 테이블1과 계속해서 full outer join 을 하기 때문에, 중복된 personid 가 생기게 된다.

select coalesce(f.personid, u.personid, v.personid, v_in.personid) as personid,f.name,u.sal,v.place,v_in.dt
from employee1 f FULL OUTER JOIN employee2 u on f.personid=u.personid
FULL OUTER JOIN employee3 v on f.personid=v.personid
FULL OUTER JOIN employee4 v_in on f.personid=v_in.personid;

 

반응형
반응형

 

R - aggregate / separate_rows

 

데이터 처리 도중, 특정 key 를 기준으로, 문자를 리스트형태로 바꾸고 싶을 때가 있다. 

이 때, R 에서 사용할 수 있는 함수가 aggregate 와 separate_rows 이다. 

(hive 에서는 collect_set 과 explode 함수가 비슷한 기능을 함)

 

왼쪽 테이블 -> 오른쪽 테이블로: separate_rows

오른쪽 테이블 -> 왼쪽 테이블로: aggregate

library(tidyverse)
df <- data.frame(Family_ID = 1:2,
  name = c("Smith, John", "Walker, Mike"),
  stringsAsFactors = FALSE)
  
df2 <- df %>% separate_rows(name)
df2
#   Family_ID name  
#      1 Smith 
#      1 John  
#      2 Walker
#      2 Mike  

df3 <- aggregate(name ~ Family_ID, df2, toString)
df3
#  Family_ID name
#      1  Smith, John
#      2 Walker, Mike
반응형
반응형

R - 난수 생성을 일별로 변경하기

 

R 에서 난수를 생성 또는 랜덤 샘플링 작업의 결과가 일별로 바뀌도록 하고 싶을 때가 있다. 

방법은 간단하게 일별로 random seed 를 동일하게 맞춰주면 된다. 

특정 날짜 '01/06/2022' 를 integer 형으로 변환하면 일별로 동일한 숫자가 나오도록 구현할 수 있다. 

 

library(tidyverse)
dayYear <- as.Date(Sys.Date(),format='%d/%m/%Y') %>% lubridate::yday() %>% as.integer()
set.seed(dayYear)

sample(nrow(10)) # 같은 날에는 동일한 순서의 숫자 10개가 나온다.

 

주의할점은 값이 정수를 갖도록 as.integer 함수를 통해 변환해주어야한다.

(만약 double 인 경우, 실제 시드는 매번 달라진다. 이는 컴퓨터가 double 형을 메모리에 저장하는 방식 때문일듯하다.)

반응형
반응형

 

아나콘다 설치 이후 프로세스

ㄴ 파이썬 3.8 버전 설치 가정

 

1. python3.8 가상환경 생성

conda create -n py38 python=3.8

 

2. 가상환경 활성화

source activate py38

 

3. ipykernel 설치

pip install ipykernel

 

4. 가상환경을 ipykernel 에 등록

python -m ipykernel install --user --name py38 --display-name py38

 

5. 주피터 노트북 or 주피터랩 실행

conda install jupyter
jupyter lab

반응형
반응형

theme_bw(base_size = 12, base_family = "Kakao Regular") 또는

par("Kakao Regular") 를 적용해도 한글이 나오지 않는 이슈가 발생

library(showtext)
showtext_auto()

위 블록을 통해 해결할 수 있었음

반응형
반응형

Ubuntu 에 Python 새로운 버전 설치하기


sudo apt update
sudo apt install build-essential zlib1g-dev libncurses5-dev libgdbm-dev libnss3-dev libssl-dev libreadline-dev libffi-dev wget

필요한 패키지를 받습니다. 


cd /tmp wget https://www.python.org/ftp/python/3.7.2/Python-3.7.2.tar.xz


설치파일을 다운로드 받습니다. 


tar -xf Python-3.7.2.tar.xz cd Python-3.7.2 ./configure --enable-optimizations


설치 파일의 압축을 풀고, 인스톨 준비를 합니다. 


make -j 1
sudo make altinstall

-j 1 은 1 개의 CPU 를 이용해서 build 하겠다는 것입니다.포인트는 sudo make altinstall 을 통해 버전을 따로 관리하는 것입니다. sudo make install 을 하면, 기존 파이썬을 덮어써버리게 되므로 주의해야합니다. 이후, 커맨드 창에 python3.7 을 입력해 잘 설치되었는지 확인합니다. 


특정 버전에 pip 를 통해 패키지 설치하는 법 

예를 들어, beatifulsoup4 패키지를 설치하려면 아래와 같이 합니다. pip 자체가 파이썬 코드이기 때문에 이런식으로 원하는 파이썬 버전을 통해 pip 를 실행시켜주면 됩니다. 

python3.7 -m pip install beautifulsoup4

References
https://websiteforstudents.com/installing-the-latest-python-3-7-on-ubuntu-16-04-18-04/


반응형
반응형


R 에서 한글 파일 쉽게 읽어오는 팁


운영체제 별로 다른 파일 인코딩으로 저장되는 문제로 인해, R 에서 한글이 인코딩 된 파일을 읽어올 때 문제가 자주 생깁니다. 아예 읽어오지 못하는 경우도 있고, 읽어와도 프린트 했을 때, 한글이 깨져 있는 경우가 많은데요. 특히 EUC-KR 로 인코딩 된 파일의 경우, data.table의 fread 나 readr의 read_csv 를 이용하기가 힘듭니다. 


이런 상황에서 readAny 패키지의 read.any 함수를 이용하면 delimeter 로 구분된 text 파일이나, csv 파일 등을 쉽게 읽어올 수 있습니다 (또는 패키지 설치를 하지 않고 함수를 변수로 저장한 다음 사용하셔도 됩니다). 출처는 이곳입니다. readr 패키지의 guess_encoding 함수를 이용해 파일 인코딩을 알아낸 후, 이 정보를 이용해 read.table 로 읽어오는 방식입니다. 그리고 확장자에 맞게 delimter 를 지정하는 로직까지 있습니다. 

library(devtools)

install_github("plgrmr/readAny", force = TRUE)
library(readAny)

read.any("http://philogrammer.com/melon10_euc.csv", header = TRUE)

library(readr) read.any <- function(text, sep = "", ...) { encoding <- as.character(guess_encoding(text)[1,1]) setting <- as.character(tools::file_ext(text)) if(sep != "" | !(setting %in% c("csv", "txt")) ) setting <- "custom" separate <- list(csv = ",", txt = "\n", custom = sep) result <- read.table(text, sep = separate[[setting]], fileEncoding = encoding, ...) return(result) }

philogrammer 님의 방법에 추가적으로, 한 함수를 통해 엑셀 파일까지 읽어오기 위해 아래와 같이 변형해서 사용하였습니다. 

read_any <- function(text, sep = "", ...) {
  encoding <- as.character(guess_encoding(text)[1,1])
  setting <- as.character(tools::file_ext(text))
  
  if(setting == 'xlsx'){
      result <- read_excel(text)
  }
  else {
      if(sep != "" | !(setting  %in% c("csv", "txt")) ) setting <- "custom"
      separate <- list(csv = ",", txt = "\n", custom = sep)
      result <- read.table(text, sep = separate[[setting]], fileEncoding = encoding, ...)
  }
  return(result)
}

참고자료

http://philogrammer.com/2017-03-15/encoding/

반응형
반응형