このページでは、Python を使用して Hadoop ストリーミングジョブを作成する方法について説明します。

Python を使用して Hadoop ストリーミングジョブを作成

マッパーコードは以下のとおりです。
#! /usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)
レデューサーのコードは以下のとおりです。
#! /usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
マッパーコードが /home/hadoop/mapper.py、レデューサーコードが /home/hadoop/reducer.py で保存され、入力と出力のパスが HDFS ファイルシステム内でそれぞれ /tmp/input および /tmp/output であるとします。 E-MapReduce クラスター内で以下の Hadoop コマンドを送信します。
hadoop jar /usr/lib/hadoop-current/share/hadoop/tools/lib/hadoop-streaming-*.jar -file /home/hadoop/mapper.py -mapper mapper.py -file /home/hadoop/reducer.py -reducer reducer.py -input /tmp/hosts -output /tmp/output