Tools (124)

반응형

 

어떤 컬럼의 값이 아래와 같은 문자열로 저장되어있을 때

["2021_12","2022_3","2022_1","2022_12","2023_4"....] 

 

해당 문자열 컬럼을 벡터컬럼으로 바꾸고 해당값을 unnest 하는 예시 

하나의 컬럼 값이 벡터형테인 경우 nested 라고 하고, 이를 row 로 변경하는 것을 unnest 라고 한다. 

 

# 문자열 parsing하여 year와 month로 분리하고 각 row로 만들기

df$dates <- lapply(df$month_ids, function(x) {
  unlist(fromJSON(x, simplifyVector = TRUE))
})
df<- df%>% 
  mutate(month_id = map(dates, str_split, pattern = ",")) %>%
  unnest(month_id)



반응형

'Tools > R' 카테고리의 다른 글

R - dictionary 만들기  (0) 2023.03.15
R - 변수 bucketing (카테고리화)  (0) 2023.03.10
R - lag 변수 만들기  (0) 2023.03.10
R - 반복문 대신 사용하는 lapply 패턴  (0) 2023.03.10
R - na to zero  (0) 2023.03.09

Tools/R

R - dictionary 만들기

2023. 3. 15. 18:59
반응형

List 를 이용한 방법

# dictionary 생성
dict <- list(name = "John", age = 30, city = "New York")

# dictionary 사용
dict$name
# [1] "John"

dict$age
# [1] 30

dict$city
# [1] "New York"

 

vector 를 활용한 방법

ㄴ setNames 함수를 활용

# dictionary 생성
dict <- setNames(c("John", 30, "New York"), c("name", "age", "city"))

# dictionary 사용
dict$name
# [1] "John"

dict$age
# [1] 30

dict$city
# [1] "New York"

 

hash 함수를 활용한 방법

library(hash)
h <- hash() 
h[['a']] <- 'a'
h[['b']] <- 'b'
h[['c']] <- 'c'
h[['d']] <- 'd'

h[['a']]

 

반응형

'Tools > R' 카테고리의 다른 글

R - 리스트 문자열을 벡터로 바꾸고 unnest 하기  (0) 2024.03.06
R - 변수 bucketing (카테고리화)  (0) 2023.03.10
R - lag 변수 만들기  (0) 2023.03.10
R - 반복문 대신 사용하는 lapply 패턴  (0) 2023.03.10
R - na to zero  (0) 2023.03.09
반응형

R 에서 특정 변수를 카테고리화 하고 싶을 때가 많다. 

 

다양한 방법이 있지만, 

아래 cut 함수를 사용하는 코드로 0~5, 6~10, 11~15, ... >100 으로 카테고리화가 가능하다. 

cat <- seq(0,100,5)
df$cat <- cut(df$x, breaks = c(cat, Inf), labels = cat)
df$cat <- factor(df$cat, levels=cat)

-> breaks 의 element 보다 labels 의 elements 의 갯수가 1개 적다. 

 

좀 더 일반적으로는 다음과 같다.

# 예시 데이터 생성
set.seed(123)
data <- data.frame(id = 1:10, value = rnorm(10, mean = 50, sd = 10))

# 카테고리화
data$cat <- cut(data$value, breaks = c(0, 25, 50, 75, 100), labels = c("low", "medium-low", "medium-high", "high"))

 

반응형

'Tools > R' 카테고리의 다른 글

R - 리스트 문자열을 벡터로 바꾸고 unnest 하기  (0) 2024.03.06
R - dictionary 만들기  (0) 2023.03.15
R - lag 변수 만들기  (0) 2023.03.10
R - 반복문 대신 사용하는 lapply 패턴  (0) 2023.03.10
R - na to zero  (0) 2023.03.09

Tools/R

R - lag 변수 만들기

2023. 3. 10. 04:03
반응형

Hmisc 의 Lag 변수를 통해 timeseries 데이터의 lag 변수를 만들 수 있다. 

만약, 그룹별 Lag 변수를 만들고 싶으면 dplyr group_by 를 통해 만들 수 있다. 

library(Hmisc)
data <- data %>% group_by(gender, age) %>% mutate(lag = Lag(variable, 1))
반응형

'Tools > R' 카테고리의 다른 글

R - dictionary 만들기  (0) 2023.03.15
R - 변수 bucketing (카테고리화)  (0) 2023.03.10
R - 반복문 대신 사용하는 lapply 패턴  (0) 2023.03.10
R - na to zero  (0) 2023.03.09
R - 컬럼별 동일한 함수 적용을 위한 lapply 테크닉  (0) 2022.09.05
반응형

반복문을 돌면서 여러개의 dataframe 을 만들고, 

이것들을 합쳐서 최종적인 결과 dataframe 을 만드는 경우 아래와 같이 함

 

brand_names <- c("BBQ", "BHC")
tmp_dfs <- lapply(brand_names, function(x){
  p <- paste0(x, "_payment")
  b <- paste0(x, "_buzz")
  tmp_df[1, 'brand'] <- x
  tmp_df[1, 'cor'] <- cor.test(merged[,p], merged[,b])$estimate
  tmp_df
})

df_result <- bind_rows(tmp_dfs)
df_result

출력
> df_result
        cor brand
1 0.6971354   BBQ
2 0.4675438   BHC

반응형

'Tools > R' 카테고리의 다른 글

R - 변수 bucketing (카테고리화)  (0) 2023.03.10
R - lag 변수 만들기  (0) 2023.03.10
R - na to zero  (0) 2023.03.09
R - 컬럼별 동일한 함수 적용을 위한 lapply 테크닉  (0) 2022.09.05
R - aggregate / separate_rows  (0) 2022.06.21

Tools/R

R - na to zero

2023. 3. 9. 22:39
반응형

 

모든 na값을 0로 바꾸기

df[is.na(df)] <- 0

 

특정 컬럼의 na 값을 0으로 바꾸기

df[is.na(df$col_name), 'col_name'] <- 0

 

여러 컬럼의 na 값을 0으로 바꾸기

impute_var <- c('a','b','c')
df[impute_var][is.na(df[impute_var])] <- 0

 

반응형
반응형

 

Pyspark export to delimited file

def myConcat(*cols):
    concat_columns = []
    for c in cols[:-1]:
        concat_columns.append(F.coalesce(c, F.lit("*")))
        concat_columns.append(F.lit("\t"))  
    concat_columns.append(F.coalesce(cols[-1], F.lit("*")))
    return F.concat(*concat_columns)

# combined column 에 모든 변수를 \t 로 concat 한 값 저장 
data_text = data.withColumn("combined", myConcat(*data.columns)).select("combined")
data_text.coalesce(1).write.format("text").option("header", "false").mode("overwrite").save(path)

출처 : https://stackoverflow.com/questions/17837871/how-to-copy-file-from-hdfs-to-the-local-file-system

반응형
반응형

가끔 .gitignore 파일에 올리고 싶지 않은 파일이나 폴더를 지정후, remote repo 에 push 를 해도 적용되지 않을 때가 있다. 

이 때는 캐시를 지우고 시도한다. 

git rm --cached -r .
git add .

 

반응형
반응형

 

Jupyter notebook에서 python 파일을 자동으로 갱신하는 방법

 

예를 들어 jupyter notebook 파일에서 model.py 라는 파일을 import 해서 사용하고 있다고 하자. 

만약 model.py 파일을 수정하면, 커널을 재시작한 후, 다시 import 를 해야한다. 

아래 구문을 통해 model.py 파일이 수정되는 즉시 적용되도록 만들 수 있다. 

%load_ext autoreload
%autoreload 2
반응형
반응형

Pyspark MLlib 를 활용한 모델링 기초 (w/ Random Forest)

/* DeepPlay 2022-09-20 */

 

개요

1) 배경: scalable machine learning 이란?

근래 빅데이터쪽에서 scalability 라는 개념이 buzzword 중 하나이다. scalability는 스케일 아웃에 관한 것이며, 스케일 아웃이란 "처리 능력을 향상시키기 위해 트래픽을 분산시킬 서버의 수를 늘리는 방법" 을 의미한다. 즉, scalable 머신러닝이란, 머신러닝을 빠르게 처리하기 위해 여러 컴퓨터에 분산시켜 일을 시키는 것으로 정의해볼 수 있다. 빅데이터를 어떻게 효율적으로 처리할 것인가? 와 관련한 논의 및 발전이 머신러닝 쪽에서도 이루어지고 있다고 볼 수 있겠다. 

 

2) 데이터

필요한 데이터 다운받기: Chronic Kidney Disease Dataset

https://archive.ics.uci.edu/ml/datasets/Chronic_Kidney_Disease

 

3) 요점 

Pyspark MLlib 를 사용할 때 아래 사항을 주의해야함

1. Assembling : 모든 피쳐를 하나의 컬럼에 리스트 형태로 몰아 넣는 것 

2. Type casting: Assembling 한 피쳐는 동일한 타입이어야함. 

3. Missing value handling: Assebling 한 피쳐는 결측이 없어야 함 (NA 가 아닌 0 또는 imputation 된 값이어야 함)

 

0. Load data 

# 데이터 다운로드 받기 
# https://www.kaggle.com/datasets/mansoordaku/ckdisease?resource=download
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import sys

# 환경변수 PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 의 python path 를 동일하게 맞춰준다. 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 환경 세팅
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1") # local 환경임을 명시적으로 지정해준다. 
sc = SparkContext(conf=conf_spark)

# data = pd.read_csv("data/kidney_disease.csv") # pandas 에서 csv 파일 읽기 
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("data/kidney_disease.csv") # header가 있는 경우, option 구문 추가 
data_pd = pd.read_csv("data/kidney_disease.csv")

1. Preprocessing

#### Imputation : Categorical variable

각 변수별 결측치 개수 확인하기

# 범주형 변수 리스트 
cat_cols = ['al', 'su', 'rbc', 'pc', 'pcc', 'ba', 'htn', 'dm', 'cad', 'appet', 'pe', 'ane']

# 각 컬럼별로 결측값 개수를 확인함 
# 만약 결측값 코딩이 0으로 되어 있는 경우, filter 조건을 변경하면 된다. 
result = pd.DataFrame({'variable':[], 'missing_count':[]})
for col in data.columns:
    df_new = pd.DataFrame({'variable':[], 'missing_count':[]})
    df_new.at[0, 'variable'] = col
    df_new.at[0, 'missing_count'] = data.filter(data[col].isNull()).count()
    result = pd.concat([result, df_new], axis=0)
    
result.astype({'missing_count': 'int32'}) # missing_count 을 int 형으로 변환함 
result['missing_rate'] = result['missing_count'] / data.count() # 결측률 계산 
result

각 변수별 최빈값 Imputation

from pyspark.sql.functions import when, lit
def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    col_with_mode=[]
    for col in cat_col_list:
        #Filter null
        df = df.filter(df[col].isNull()==False)
        #Find unique_values_with_count
        unique_classes = df.select(col).distinct().rdd.map(lambda x: x[0]).collect()
        unique_values_with_count=[]
        for uc in unique_classes:
             unique_values_with_count.append([uc, df.filter(df[col]==uc).count()])
        #sort unique values w.r.t their count values
        sorted_unique_values_with_count= sorted(unique_values_with_count, key = lambda x: x[1], reverse =True)
        
        if (verbose==True): print(col, sorted_unique_values_with_count, " and mode is ", sorted_unique_values_with_count[0][0])
        col_with_mode.append([col, sorted_unique_values_with_count[0][0]])
    return col_with_mode

col_with_mode = mode_of_pyspark_columns(data, cat_cols)

for col, mode in col_with_mode:
    data = data.withColumn(col, when(data[col].isNull()==True, 
    lit(mode)).otherwise(data[col]))

#### Imputation : Numeric variable

각 변수별 평균값 치환 코드

numeric_cols=['age', 'bp', 'sg','bgr', 'bu', 'sc', 'sod', 'pot', 'hemo', 'pcv', 'rc', 'wc']

from pyspark.sql.functions import avg
from pyspark.sql.functions import when, lit

# 컬럼별 평균값 계산하기 
def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    col_with_mean=[]
    for col in numeric_cols:
        mean_value = df.select(avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row : row[avg_col]).collect()
        
        if (verbose==True): print(mean_value.columns[0], "\t", res[0])
        col_with_mean.append([col, res[0]])    
    return col_with_mean

col_with_mean = mean_of_pyspark_columns(data, numeric_cols)

# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    data = data.withColumn(col, when(data[col].isNull() == True, 
    lit(mean)).otherwise(data[col]))

 

2. Categorical variable encoding

범주형 피쳐 인코딩

데이터에 관한 지식을 활용해 모든 피쳐들이 binary 또는 ordinal 변수라는 것을 확인한다. 이 경우, Pyspark 의 StringIndexer 를 활용하여 encoding 해볼 수 있다. 예를 들어, normal/abnormal 의 경우, binary 변수이며, good/poor 등도 각 값들이 독립적이지 않고, 등급이 존재하는 ordinal 변수라는 것을 알 수 있다.

data.select(cat_cols).show()
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
| al| su|     rbc|      pc|       pcc|        ba|htn| dm|cad|appet| pe|ane|
+---+---+--------+--------+----------+----------+---+---+---+-----+---+---+
|1.0|0.0|  normal|  normal|notpresent|notpresent|yes|yes| no| good| no| no|
|4.0|0.0|  normal|  normal|notpresent|notpresent| no| no| no| good| no| no|
|2.0|3.0|  normal|  normal|notpresent|notpresent| no|yes| no| poor| no|yes|
|4.0|0.0|  normal|abnormal|   present|notpresent|yes| no| no| poor|yes|yes|

 

각 변수별로 suffix로 Index를 붙힌 새로운 변수를 만든다.

# Categorical 변수들을 StringIndexer 을 이용해 Numeric 변수로 변환함 
from pyspark.ml.feature import StringIndexer 

indexed = data
for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col + "Index")
    indexed = indexer.fit(indexed).transform(indexed)

예를 들어, 아래와 같이 새로운 컬럼이 생성된다. 

# 각 범주형 변수별로 Index 변수를 아래와 같이 만든다.
indexed.select(['id', 'pc', 'pcIndex']).show()
| id|      pc|pcIndex|
+---+--------+-------+
|  0|  normal|    0.0|
|  1|  normal|    0.0|
|  2|  normal|    0.0|
|  3|abnormal|    1.0|

 

레이블 변수 인코딩

분류 문제의 경우 레이블은 범주형 변수로 마찬가지로 인코딩이 필요하다. 실제 유니크 레이블들을 확인해보니 잘못 들어간 값이 존재하여 이를 제대로된 값으로 치환해준다. 

indexed.select('classification').distinct().show()
+--------------+
|classification|
+--------------+
|        notckd|
|           ckd|
|         ckd\t|
+--------------+

값 치환 

# 잘못된 값이 있기 때문에 이를 replace 해준다. 
indexed = indexed.withColumn("classification", \
              when(indexed["classification"] == 'ckd\t', 'ckd').otherwise(indexed["classification"]))

결과를 보면, notckd 를 1, ckd를 0으로 인코딩 해주었는데, 반대로 notckd를 0, ckd를 1로 바꾸어 인코딩해보자. 

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="classification", outputCol="label")
indexed = indexer.fit(indexed).transform(indexed)
indexed.select('label').distinct().show()
+--------------+-----+
|classification|label|
+--------------+-----+
|        notckd|  1.0|
|           ckd|  0.0|
+--------------+-----+
indexed = indexed.withColumn("label", when(indexed["label"] == 1, 0).otherwise(1))

 

2. Typecasting of Features

- Numeric 변수들을 Double 형으로 변환한다.
- 이 때 주의할점은, 만약 원래 numeric 변수인데, 오류로 인해 문자가 들어있는 경우, Double 형변환을 하면 값이 null 이 된다.
- 따라서 확인하고, imputation 을 한 번 더 수행해야할 수도 있다. 

 

numeric 변수들을 double 형으로 변환하기

from pyspark.sql.types import DoubleType
for col in numeric_cols:
    indexed = indexed.withColumn(col, indexed[col].cast(DoubleType()))
# with column when 구문으로 null 일 때, 평균으로 치환한다. 
for col, mean in col_with_mean:
    indexed = indexed.withColumn(col, when(indexed[col].isNull() == True, 
    lit(mean)).otherwise(indexed[col]))

 

3. Assembling of Input Features

- feature 를 assemble 하여 하나의 필드로 변환 후, 이후 피쳐 변환이나 모델링 절차 등이 수행한다. 

 

피쳐들의 Type 재확인

-> 모두 double type 이라는 것을 알 수 있음

for col in indexed[feature_list].dtypes:
    print(col[0]+" , "+col[1])

 

VectorAssemble 활용

피쳐들을 assemble 한 features 라고 하는 컬럼을 생성한다. inputCols 파라미터에 피쳐명을 리스트 형태로 지정하고, outputCols 에 assemble 한 값을 넣을 컬럼명을 넣는다. 

# 주의 사항: Assemble 을 할 때, 데이터에 결측이 없어야함 
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=feature_list,
                                  outputCol="features")
                                  
features_vectorized = vectorAssembler.transform(indexed)
features_vectorized.select(['id', 'features']).show(5)
+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[1.0,0.0,0.0,0.0,...|
|  1|(24,[0,12,13,14,1...|
|  2|[2.0,2.0,0.0,0.0,...|
|  3|[4.0,0.0,0.0,1.0,...|
|  4|(24,[0,12,13,14,1...|
+---+--------------------+

 

 

4. Normalization of Input Features 

- 피쳐들은 모두 같은 스케일이 아니다. 
- 피쳐들을 같은 스케일로 변환하면, 모델링 단계에서 더 좋은 결과를 보여준다. 

- Scaling 을 할 수도 있고, Normalization 을 할 수도 있다. 이 둘을 interchangable 하게 말하기도 하지만, 정확히는 분포의 모양을 바꾸지 않는 경우 scaling, 분포를 바꾸는 경우 normalization 이라고 한다. (https://www.kaggle.com/code/alexisbcook/scaling-and-normalization/tutorial)


scaling 이나 normalization 은 왜 필요할까?

변수의 scale 이 모델 성능에 미치는 영향을 줄여줌. 피쳐들의 스케일까지 학습해야 하는 부담을 줄일 수 있다. 

ㄴ 만약 normalization 이 적절히 수행된다면, 단점은 없고 장점만 있기 때문에 일반적으로 많이 수행한다. 

 

Normalizer 를 활용한 standard scaling 

inputCols 로 assemble 한 컬럼을 넣어주고, outputCols 로 normalization 된 피쳐를 넣을 컬럼명을 지정해준다. p는 normalization 을 할 때, L1 norm 을 이용하는 것을 의미한다. (default 는 L2 norm 이다.) 본 포스팅에서 normalization 수식이나 변환 결과는 생략한다. 

from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
l1NormData = normalizer.transform(features_vectorized)
l1NormData.select(['features','features_norm']).show()
+--------------------+--------------------+
|            features|       features_norm|
+--------------------+--------------------+
|[1.0,0.0,0.0,0.0,...|[1.20525839810946...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
|[2.0,2.0,0.0,0.0,...|[2.40521254800404...|
|[4.0,0.0,0.0,1.0,...|[5.58159914210821...|
|(24,[0,12,13,14,1...|(24,[0,12,13,14,1...|
preprocessed_data = l1NormData

 

 

5. Spiliting into train and test set

#Split the dataset into training and testing dataset
splits = preprocessed_data.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

 

6. Train & Predict

pyspark 에서 학습하고 예측하는 일에 대해 fit, transform  이라는 용어를 사용한다. fit 은 학습 데이터를 입력 받아 모델을 만드는 작업업을 의미하며, transform 은 예측하고 싶은 데이터를 입력 받아, 학습된 모델로 그 데이터의 label을 예측한다. 

from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features_norm", numTrees=10)
rf_model = rf.fit(df_train)
prediction = rf_model.transform(df_train)

transform 을 수행하면, rawPrediction, probability, prediction 이라는 컬럼을 데이터에 추가해준다. 

- rawPrediction (list): random forest 의 각 tree 들이 각 클래스에 속한다고 예측한 값들의 합이다. 

- probabilty (list): rawPrediction 을 0~1 사이로 scaling 한 값 

- prediction (element): probabilty 를 토대로 가장 가능성이 높은 label 예측한 값 

prediction.select(['id', 'rawPrediction', 'probability', 'prediction']).show()
+---+--------------------+--------------------+----------+
| id|       rawPrediction|         probability|prediction|
+---+--------------------+--------------------+----------+
|  0|[0.94594594594594...|[0.09459459459459...|       1.0|
|  1|[2.06800093632958...|[0.20680009363295...|       1.0|
| 10|[0.01515151515151...|[0.00151515151515...|       1.0|
|100|[0.20461020461020...|[0.02046102046102...|       1.0|
|102|[4.91567118722564...|[0.49156711872256...|       1.0|

 

7. Evaluation

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'training accuracy: {binEval.evaluate(prediction)}')

prediction_test = rf_model.transform(df_test)
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
print(f'test accuracy: {binEval.evaluate(prediction_test)}')
training accuracy: 0.9900662251655629
test accuracy: 0.9489795918367347
 

참고문서

https://medium.com/@aieeshashafique/machine-learning-with-pysparkmlib-and-random-forest-solving-a-chronic-kidney-disease-problem-752287d5ffd0
https://www.silect.is/blog/random-forest-models-in-spark-ml/  

- https://dzone.com/articles/what-scalable-machine-learning

반응형
반응형