반응형

 

sparklyr 에서 bucketing 하기 (기본 R과의 비교)

 

sparkly 에서는 spark 에서 실행하기 위한 함수들이 존재한다. 물론 그룹별 집계 등의 기본적인 데이터 처리의 경우, dplyr wrapper 를 통해 추가적인 학습 없이도 바로 사용할 수 있지만, 이외의 데이터 처리의 경우 sparklyr 의 고유 함수에 대한 학습이 필요하다. 

 

본 포스팅에서는 10분위수를 기준으로 변수를 10개의 카테고리로 분류하는 bucketing 작업을 sparklyr 을 통해 해보고 기본 R 과 비교해보았다. 

 

sparklyr 에서 bucketing 하기

get_spark_connection 의 경우 spark connection 을 잡는 함수인데, 관련해서 이 포스팅을 참고하길 바란다. 이 데이터처리에서 중요함수는 sdf_quantile 과 ft_bucketizer 이다. sdf 는 spark dataframe 의 약자이고, ft는 feature transformation 의 약자이다.

 

10분위수 구하기: sdf_quantile 을 사용하면 병렬적으로 변수의 quantile 을 구해서 리턴해준다. 이 작업은 데이터를 메모리로 로드후에 R 의 기본 quantile 을 실행하는 것보다 빠르다.

 

bucket 만들기: ft_bucketizer 는 연속형 변수에 대해 bucketizing 을 해준다. 이 때, 만약 10개의 bucket 으로 나눈다고 하면, splits 에 원소가 11개인 벡터를 전달해주어야한다. 10개의 bucket 으로 나눈다고 하면, 9개의 split point 가 필요한데, 여기에 최소값과 최대값을 추가로 더해 11개라고 볼 수 있다. quantile 함수에서 9개의 decile 과 최대값을 구했으므로, 최소값인 0을 추가로 더해 split 함수에 넣어준다. 

 

bucket 이름 만들기: ft_bucketizer 함수에는 아쉽게도 bucket 의 label 을 지정하는 방법이 없다. 단지 index 만 만들어줄 뿐이다. 따라서 index 와 label 을 매핑하는 데이터프레임을 따로 만들어서 해결해볼 수 있다. quantile 의 개수만큼 seq_along 을 통해 인덱스를 만들고, names 함수를 통해 벡터의 이름을 구해서 bucket_df 라는 데이터프레임을 만든 후에, 이를 데이터와 조인하여 bucket 의 label 을 만들 수 있다. 

library(tidyverse)
library(sparklyr)

sc <- get_spark_connection()

cars <- copy_to(sc, cars, "cars", overwrite=TRUE)

val_quantile <- sdf_quantile(cars %>% filter(speed > 0), column = "speed", probabilities = seq(0.1, 1, 0.1))
val_quantile <- val_quantile[!duplicated(val_quantile, fromLast = TRUE)]

val_quantile
#    10%       20%       30%       40%       50%       60%       70%       80%       90%      100% 
#   15900     27000     35000     49900     63800     86200    113800    158310    246000 608623010 

names(val_quantile)

bucket_df <- data.frame(bucket_index = seq_along(val_quantile) - 1, bucket = names(val_quantile))
colnames(bucket_df) <- c("speed_index", "speed_cat")

# bucketing: 10분위수 총 9개와 함께, 최솟값, 최댓값을 벡터로 만들어 전달. (총 원소가 11개인 벡터)
cars <- ft_bucketizer(cars,
                        input_col = "speed",
                        output_col = "speed_index",
                        splits = c(0,val_quantile))

# spark dataframe 으로 변경
bucket_df <- sdf_copy_to(sc, bucket_df, "bucket_df", overwrite = TRUE)
cars <- left_join(cars, bucket_df, by = "speed_index")
cars %>% select(speed, speed_index, speed_cat) %>% sample_n(50)

 

기본 R 에서 10분위 bucketing 하기

기본 R dml

val_quantile <- quantile(cars %>% filter(speed > 0) %>% pull(speed), probs=seq(0.1, 1, 0.1))
val_quantile
# 10%  20%  30%  40%  50%  60%  70%  80%  90% 100% 
#   8.9 11.0 12.7 14.0 15.0 17.0 18.3 20.0 23.1 25.0 


cars$speed_cat <- if_else(cars$speed == 0, "X", 
                          as.character(cut(cars$speed, breaks = c(0,  val_quantile), labels = names(val_quantile))))
cars %>% sample_n(50)

# speed dist speed_cat
# 1     14   60       40%
# 2     13   26       40%
# 3     19   36       80%
# 4     13   34       40%
# 5     12   24       30%
# 6      4   10       10%
반응형