Hadoop MapReduce 파이썬 구현 (단어 빈도수 세기)
Map-Reduce 란?
본 포스팅에서는 하둡이 분산처리를 하는 방법인 맵리듀스를 파이썬으로 구현하는 방법을 간단하게 다루겠습니다. 하둡은 자바로 코딩되어 있지만 파이썬, C++ 등의 언어를 이용해서 하둡을 이용할 수 있습니다. 가장 간단하게는 파이썬 코드를 Jython 을 이용해 자바의 jar 파일로 만드는 것인데, 이것은 매우 불편합니다. 특히, Jython 에서 제공하지 않는 파이썬의 기능, 패키지를 이용할 때 문제가 됩니다. 그래서 하둡에서는 standard input, standard output 을 인터페이스로해서 파이썬과 같은 다른 언어에서도 하둡을 이용하는 방법을 제공하고 있습니다. 이번 포스팅에서는 파이썬으로 맵리듀스 코드를 구현하고 테스트 해보는 것을 정리하였습니다. 맵리듀스의 프로세스는 아래와 같습니다.
맵 리듀스는 기본적으로 Split -> Map -> Shuffle -> Reduce 의 절차를 갖습니다. 여기서 Split-Map을 합쳐서 맵 태스크, Shuffle-Reduce 를 합쳐서 리듀스 태스크라고도 부릅니다. Split 은 인풋데이터를 쪼개서 인풋을 키-쌍 값으로 만들어주는 작업이고, Map 은 키-쌍 값을 인풋으로 받아 list(키-값 쌍)을 내보냅니다. Shuffle 에서는 list(키-값 쌍) 을 인풋으로 받아 키-값 쌍을 내보내고 Reduce 에서는 Shuffle 의 결과인 키-값 쌍 입력으로 받아 마지막으로 list(키-값 쌍) 을 내보냅니다.
Map-Reduce 의 인풋/아웃풋
Input | Output | |
Split | 텍스트 | <k1, v1> |
Map | <k1, v1> | list (<k2, v2>) |
Shuffle | list (<k2, v2>) | <k2, list(v2)> |
Reduce | <k2, list(v2)> | list (<k3, v3>) |
단어 빈도수 세기 예제
맵 리듀스는 왜 필요할까요? 예를 들어, 100개의 문서가 있을 때, 이 문서들에서 단어의 빈도수를 세는 프로그램을 작성한다고 해봅시다. 그런데 문서의 크기가 각각 1TB 라서 500GB 램을 갖는 하나의 컴퓨터에서 실행할 수 없다고 해봅시다. 만약, 분산 컴퓨팅에 대한 학습이 안 되어있는 프로그래머라면, 우선 메모리가 충분하지 않으므로 문서의 일부만 불러와서 단어수를 세고, 결과를 어딘가에 저장하고, 메모리에서 지우는 것을 반복하는 것을 생각해볼 수 있을 것입니다. 물론 이것도 좋은 해법이겠지만, 단점은 시간이 오래걸린다는 것입니다. 만약 충분한 수의 컴퓨터가 있다면 문서의 총 크기는 100TB 이므로, 예를 들어, 각각 500GB 의 메모리를 갖는 200개의 컴퓨터를 활용해서 문제를 해결하고자 하는 것이 분산 컴퓨팅이고 분산컴퓨팅에 사용되는 유명한 방법이 바로 맵리듀스라고 할 수 있습니다.
단어 빈도수 세기 예제에서의 자료 구조 흐름
인풋데이터
We hold these truths to be self-evident. Governments long established should not. Such has been the patient. .....
Splitting 결과 (<k1, v1>)
(0, We hold these truths to be self-evident, ..)
(138, Governments long established should not ...)
(256, Such has been the patient ...)
이렇게 나눠진 문서는 각 노드에 정해진 양만큼 할당이 됩니다. 맵 태스크가 하는 일은 이 키-값 쌍을 인풋으로 받아 다음과 같은 리스트를 만들어주는 것입니다. 총 100개의 문장이라면 다음과 같은 100개의 리스트가 생깁니다.
Map 결과 (list (<k2, v2>))
('We' : 1, 'hold' : 1 ...)
('Government' : 1, 'long' : 1 ...)
('Such' : 1, 'has', 1 ...)
맵 태스크의 결과는 문장별 단어의 빈도수를 갖고 있는 리스트입니다 (여러번 단어가 나오더라도 각각 1을 같습니다.). 이를 키-값 쌍으로 변화하기 위해 셔플링 (Shuffling) 을 수행합니다. 셔플링은 각 단어별로 문장 내에서 찾은 빈도수의 리스트를 갖고 있습니다.
Shuffle 결과 (<k2, list(v2)>)
('We' : [1,1,1])
('Government : [1,1,1])
('Such' : [1,1,1])
리듀스 태스크는 셔플링의 결과 키-값 쌍을 입력으로 받아 최종 결과를 출력합니다.
Reduce 결과 (list (<k3, v3>))
('We' : 100)
('Government' : 10)
파이썬 Mapper
Map Task 는 인풋 데이터를 적절히 쪼갠 후, 여러 개의 키-값 쌍 (key-value pair) 으로 만드는 과정입니다. 단어 수를 세는 문제에서는 먼저 텍스트를 라인 단위로 나눈 후 (스플릿), 각 라인별로 단어를 쪼개서 출력합니다. Map task 를 실습해보기 위해 hadoop.txt 라는 샘플 텍스트 파일을 만든 후 아래와 같이 실행하였습니다.
#!/usr/bin/env python
"""mapper.py"""
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print ('%s\t%s' % (word, 1))
cat hadoop.txt | python mapper.py
MapReduce 1
framework. 1
The 1
idea 1
is 1
based 1
on 1
the 1
위와 같이 (단어, 1) 의 키-값 쌍이 아웃풋으로 나오게 됩니다. 이제 이 아웃풋을 리듀서에 전달하면 됩니다. 참고로 이 Mapper 가 실제 hadoop 에서 실행될 때, 아웃풋이 셔플 단계에 의해 sorting 되고 적절한 수의 노드에 나누어서 전달 됩니다. 나누어서 전달할 때도 랜덤하게 나누는 것이 아니라 sorting 된 채로 나누어지기 때문에 효율을 최대화할 수 있는 방법으로 데이터를 전달합니다. 하지만 본 포스팅에서 이 부분은 다루지 않습니다. 이 부분은 하둡에 의해 제어되며, 프로그래머는 코드를 통해 맵리듀스의 각 프로세스의 데이터 구조 프로토콜에 맞게 아웃풋을 내주기만 하면됩니다.
파이썬 Reducer 작성
Reduce task 는 map task 의 output 을 input 으로 받아 원하는 결과를 집계해주는 작업입니다. 실제로 hadoop 에서는 reduce task 는 shuffle 과 reduce 로 나뉩니다. reduce 작업에 사용되는 노드는 하나일 수도 있지만, 여러개의 노드를 사용하기도 합니다. 이러한 작업이 복잡해보이지만 분산해서 처리하는 일은 hadoop 에서 제어하는 부분에 속합니다. 즉, 코드를 작성하는 사람은 코드의 로직에만 집중하면됩니다.
#!/usr/bin/env python
"""reducer.py"""
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print ('%s\t%s' % (current_word, current_count))
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print ('%s\t%s' % (current_word, current_count))
우선 코드가 잘 작동하는지 알아보기 위해 mapper 의 아웃풋을 '키' 인 단어를 기준으로 아래와 같이 정렬합니다.
cat hadoop.txt | python mapper.py | sort -k 1
(Figure 1
(typically 1
1). 1
Adobe, 1
Amazon 1
Amazon, 1
An 1
An 1
Apache 1
Automatic 1
Because 1
Elastic 1
Google 1
Google 1
Hadoop 1
다음으로 위 명령어의 아웃풋을 piping 을 통해 reducer 에 전달한 후, '값' 인 빈도수를 기준으로 내림차순 정렬하면 최종 결과를 얻게 됩니다.
cat hadoop.txt | python mapper.py | sort -k1,1 | python reducer.py | sort -k 2 -r
to 8
MapReduce 8
is 7
that 6
in 6
and 6
a 6
on 5
reduce 4
can 4
be 4
are 4
map 3
by 3
The 3
which 2
본 포스팅에서는 파이썬을 통해 맵리듀스 코드를 작성하는 방법을 포스팅하였습니다. 하지만 이는 분산 컴퓨팅은 아닙니다. 다음 포스팅에서는 이러한 파이썬 맵리듀스 코드를 하둡으로 실행해서 분산 처리하는 방법 대해 다루어보겠습니다.
References
'Data Engineering > Hadoop ecosystem' 카테고리의 다른 글
하둡과 응용 프레임워크 3) 하둡 기반 응용 프로그램 (0) | 2020.03.08 |
---|---|
하둡과 응용 프레임워크 2) 하둡 실행 환경 (YARN, Tez, Spark) (0) | 2020.03.07 |
Docker 를 통한 Hive 실습환경 구축 및 간단 예제 (0) | 2020.03.02 |
하둡과 응용 프레임워크 1) Hadoop Basic module (0) | 2020.02.28 |