반응형

Hadoop MapReduce 파이썬 구현 (단어 빈도수 세기)

 

Map-Reduce 란?

 

본 포스팅에서는 하둡이 분산처리를 하는 방법인 맵리듀스를 파이썬으로 구현하는 방법을 간단하게 다루겠습니다. 하둡은 자바로 코딩되어 있지만 파이썬, C++ 등의 언어를 이용해서 하둡을 이용할 수 있습니다. 가장 간단하게는 파이썬 코드를 Jython 을 이용해 자바의 jar 파일로 만드는 것인데, 이것은 매우 불편합니다. 특히, Jython 에서 제공하지 않는 파이썬의 기능, 패키지를 이용할 때 문제가 됩니다. 그래서 하둡에서는 standard input, standard output 을 인터페이스로해서 파이썬과 같은 다른 언어에서도 하둡을 이용하는 방법을 제공하고 있습니다. 이번 포스팅에서는 파이썬으로 맵리듀스 코드를 구현하고 테스트 해보는 것을 정리하였습니다. 맵리듀스의 프로세스는 아래와 같습니다. 

 

출처 -  http://www.admin-magazine.com/HPC/Articles/MapReduce-and-Hadoop

 

맵 리듀스는 기본적으로 Split -> Map -> Shuffle -> Reduce 의 절차를 갖습니다. 여기서 Split-Map을 합쳐서 맵 태스크, Shuffle-Reduce 를 합쳐서 리듀스 태스크라고도 부릅니다. Split 은 인풋데이터를 쪼개서 인풋을 키-쌍 값으로 만들어주는 작업이고, Map 은 키-쌍 값을 인풋으로 받아 list(키-값 쌍)을 내보냅니다. Shuffle 에서는 list(키-값 쌍) 을 인풋으로 받아 키-값 쌍을 내보내고 Reduce 에서는 Shuffle 의 결과인 키-값 쌍 입력으로 받아 마지막으로 list(키-값 쌍) 을 내보냅니다. 

 

Map-Reduce 의 인풋/아웃풋

 


 Input Output 
 Split  텍스트  <k1, v1>
 Map  <k1, v1>  list (<k2, v2>)
 Shuffle  list (<k2, v2>)  <k2, list(v2)>
 Reduce  <k2, list(v2)>  list (<k3, v3>)

 

단어 빈도수 세기 예제

 

맵 리듀스는 왜 필요할까요? 예를 들어, 100개의 문서가 있을 때, 이 문서들에서 단어의 빈도수를 세는 프로그램을 작성한다고 해봅시다. 그런데 문서의 크기가 각각 1TB 라서 500GB 램을 갖는 하나의 컴퓨터에서 실행할 수 없다고 해봅시다. 만약, 분산 컴퓨팅에 대한 학습이 안 되어있는 프로그래머라면, 우선 메모리가 충분하지 않으므로 문서의 일부만 불러와서 단어수를 세고, 결과를 어딘가에 저장하고, 메모리에서 지우는 것을 반복하는 것을 생각해볼 수 있을 것입니다. 물론 이것도 좋은 해법이겠지만, 단점은 시간이 오래걸린다는 것입니다. 만약 충분한 수의 컴퓨터가 있다면 문서의 총 크기는 100TB 이므로, 예를 들어, 각각 500GB 의 메모리를 갖는 200개의 컴퓨터를 활용해서 문제를 해결하고자 하는 것이 분산 컴퓨팅이고 분산컴퓨팅에 사용되는 유명한 방법이 바로 맵리듀스라고 할 수 있습니다.

 

단어 빈도수 세기 예제에서의 자료 구조 흐름

 

인풋데이터

We hold these truths to be self-evident. Governments long established should not. Such has been the patient. .....

 

Splitting 결과 (<k1, v1>)

(0, We hold these truths to be self-evident, ..)

(138, Governments long established should not ...)

(256, Such has been the patient ...) 

 

이렇게 나눠진 문서는 각 노드에 정해진 양만큼 할당이 됩니다. 맵 태스크가 하는 일은 이 키-값 쌍을 인풋으로 받아 다음과 같은 리스트를 만들어주는 것입니다. 총 100개의 문장이라면 다음과 같은 100개의 리스트가 생깁니다. 

 

Map 결과 (list (<k2, v2>))

('We' : 1, 'hold' : 1 ...) 

('Government' : 1, 'long' : 1 ...)

('Such' : 1, 'has', 1 ...)

 

맵 태스크의 결과는 문장별 단어의 빈도수를 갖고 있는 리스트입니다 (여러번 단어가 나오더라도 각각 1을 같습니다.). 이를 키-값 쌍으로 변화하기 위해 셔플링 (Shuffling) 을 수행합니다. 셔플링은 각 단어별로 문장 내에서 찾은 빈도수의 리스트를 갖고 있습니다. 

 

Shuffle 결과 (<k2, list(v2)>)

('We' : [1,1,1])

('Government : [1,1,1])

('Such' : [1,1,1])

 

리듀스 태스크는 셔플링의 결과 키-값 쌍을 입력으로 받아 최종 결과를 출력합니다. 

 

Reduce 결과 (list (<k3, v3>))

('We' : 100)

('Government' : 10)

 

파이썬 Mapper 

 

Map Task 는 인풋 데이터를 적절히 쪼갠 후, 여러 개의 키-값 쌍 (key-value pair) 으로 만드는 과정입니다. 단어 수를 세는 문제에서는 먼저 텍스트를 라인 단위로 나눈 후 (스플릿), 각 라인별로 단어를 쪼개서 출력합니다. Map task 를 실습해보기 위해 hadoop.txt 라는 샘플 텍스트 파일을 만든 후 아래와 같이 실행하였습니다. 

#!/usr/bin/env python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print ('%s\t%s' % (word, 1))

hadoop.txt
0.00MB

cat hadoop.txt | python mapper.py

MapReduce       1
framework.      1
The     1
idea    1
is      1
based   1
on      1
the     1

 

위와 같이 (단어, 1) 의 키-값 쌍이 아웃풋으로 나오게 됩니다. 이제 이 아웃풋을 리듀서에 전달하면 됩니다. 참고로 이 Mapper 가 실제 hadoop 에서 실행될 때, 아웃풋이 셔플 단계에 의해 sorting 되고 적절한 수의 노드에 나누어서 전달 됩니다. 나누어서 전달할 때도 랜덤하게 나누는 것이 아니라 sorting 된 채로 나누어지기 때문에 효율을 최대화할 수 있는 방법으로 데이터를 전달합니다. 하지만 본 포스팅에서 이 부분은 다루지 않습니다. 이 부분은 하둡에 의해 제어되며, 프로그래머는 코드를 통해 맵리듀스의 각 프로세스의 데이터 구조 프로토콜에 맞게 아웃풋을 내주기만 하면됩니다.  

 

파이썬 Reducer 작성

 

Reduce task 는 map task 의 output 을 input 으로 받아 원하는 결과를 집계해주는 작업입니다. 실제로 hadoop 에서는 reduce task 는 shuffle 과 reduce 로 나뉩니다. reduce 작업에 사용되는 노드는 하나일 수도 있지만, 여러개의 노드를 사용하기도 합니다. 이러한 작업이 복잡해보이지만 분산해서 처리하는 일은 hadoop 에서 제어하는 부분에 속합니다. 즉, 코드를 작성하는 사람은 코드의 로직에만 집중하면됩니다. 

#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print ('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print ('%s\t%s' % (current_word, current_count))

 

우선 코드가 잘 작동하는지 알아보기 위해 mapper 의 아웃풋을 '키' 인 단어를 기준으로 아래와 같이 정렬합니다.  

cat hadoop.txt | python mapper.py | sort -k 1

(Figure 1
(typically      1
1).     1
Adobe,  1
Amazon  1
Amazon, 1
An      1
An      1
Apache  1
Automatic       1
Because 1
Elastic 1
Google  1
Google  1
Hadoop  1

 

다음으로 위 명령어의 아웃풋을 piping 을 통해 reducer 에 전달한 후, '값' 인 빈도수를 기준으로 내림차순 정렬하면 최종 결과를 얻게 됩니다. 

cat hadoop.txt | python mapper.py | sort -k1,1 | python reducer.py | sort -k 2 -r

to      8
MapReduce       8
is      7
that    6
in      6
and     6
a       6
on      5
reduce  4
can     4
be      4
are     4
map     3
by      3
The     3
which   2

 

본 포스팅에서는 파이썬을 통해 맵리듀스 코드를 작성하는 방법을 포스팅하였습니다. 하지만 이는 분산 컴퓨팅은 아닙니다. 다음 포스팅에서는 이러한 파이썬 맵리듀스 코드를 하둡으로 실행해서 분산 처리하는 방법 대해 다루어보겠습니다. 

 

References

반응형