2009/03/16

Amazon EC2 + Hadoop Streaming で データ処理を行う

Hadoop by Jinho.Jung.
Hadoop on Flickr - Photo Sharing!

お題は “Always on the side of the egg” をワードカウントしてよく使われている単語を見つけるというもの。ちなみにこの”Always on the side of the egg”というのは日本語訳すると「常に卵の側に」で、村上春樹氏がエルサレム賞の受賞スピーチで読み上げたものです。

今回は【英語全文】村上春樹さん「エルサレム賞」授賞式講演 - 47トピックスの英文を使いましたが、オリジナルの発表原稿も公開されていました。
Always on the side of the egg - Haaretz - Israel News

 

Hadoop Streaming というのは通常はHadoop上の処理をする場合はJavaで記述する必要があるのですが、好きな言語でコードを記述しそれを標準入出力を介しHadoopで実行させる事(?)です。ですので、Hadoopの分散処理のパワーをPython/Ruby/PerlといったLL言語でも扱うことが出来るようになります。

ちなみにPython以外知らないので、今回はPythonで記述。
Hadoop Streamingで調べるとどういう訳かRubyで記述されてものはあるんですがなかなかPythonは見あたらない…

ワードカウントのコードはRubyでHadoop Streaming[2/2] - 森薫の日記にてRubyで公開されているものを元にPythonに置き換えてみました。

 

map.py

#!/usr/bin/env/ python
#coding:utf-8
#Hadoop Map

import sys

for line in sys.stdin:
    word = line.split()
    for i in word:
        if i != None:
            print "%s\t1" %(i)

reduce.py
(後半の処理は無駄かも…)

#!/usr/bin/env/ python
#coding:utf-8
#Hadoop Reduce

import sys

h = {}

for line in sys.stdin:
    list = line.split('\t')
    key = list[0]
    if h.has_key(key):
        h[key] += 1
    else:
        h[key] = 1

hk = h.items()
hk.sort(key=lambda a:a[1])
hk.reverse()
for i in hk:
    print '%s\t%s' % (i[0] , i[1])

それと先の“Always on the side of the egg”をegg.txtにコピペしておきます。

 

HadoopからAmazon EC2のインスタンスを起動して、作成したPythonコードやegg.txtをローカルからアップロードします。手順はインスタンスの起動・終了と同じ手順です。

./hadoop-0.19.1/src/contrib/ec2/bin/hadoop-ec2 push hdfs-test-cluster eggs/

これで、eggsというフォルダの中身が起動したhdfs-test-clusterのマスターに転送(アップロード)されます。
(pushというコマンドがあるから、データをダウンロードするコマンドもあるかと調べてみましたがありませんでした:-)

 

この状態でHadoopを動かす前に、コードがきちんと動くかを確認します。

cat egg.txt | python map.py | python reduce.py

これで、英語の単語と数字がズラッと出力されれば成功。
Hadoop Streaming はこの処理を複数のコンピュータに分散して行わせるだけなので、同じ結果になるはずです。

 

現在の状態だと出力結果が圧縮されているので普通にcatすると文字化けしてみられないです。戻し方を知らないので根本的に出力結果を圧縮させないように設定します。

/usr/local/hadoop-0.19.0/conf/hadoop-site.xml

を開いて

<property>
  <name>mapred.output.compress</name>
  <value>true</value>
</property>

このtrueとなっている部分をfalseに書き換えてやります。

ついでにAmazon S3をHDFSとして利用したい場合はこのファイルに数行を追加するだけです。
Amazon S3 を Hadoopの HDFSとして利用する

 

処理をさせていきます。
まずは前処理でHDFS上にeggsというフォルダを作って、その中にegg.txtを入れます。

# hadoop dfs -mkdir eggs

# hadoop dfs -put egg.txt eggs

# hadoop dfs –ls
Found 1 items
drwxrwxrwx   -          0 1969-12-31 19:00 /user/root/eggs

# hadoop dfs -ls eggs
Found 1 items
-rwxrwxrwx   1       7450 1969-12-31 19:00 /user/root/eggs/egg.txt

 

実際にHadoop Streaming させるのはこの部分!

# hadoop jar /usr/local/hadoop-0.19.0/contrib/streaming/hadoop-0.19.0-streaming.jar -input eggs -output eggs_out -mapper "python map.py" -reducer "python reduce.py" -file map.py -file reduce.py

  • -input xxxx
    ここに処理させたいデータが入っているHDFSのフォルダ名
    ファイル名まで入れるとそれだけを処理してくれる(この場合はフォルダ内全体)
  • -output xxxx
    この部分に出力したいHDFSのフォルダ名を入れる
    注意点としてすでに存在しているフォルダ名を指定するとエラーになる
  • -mapper xxxx
    MapReduce処理のMap部分を担当させるコードのパスをここに入れる
    単純にコマンドラインで実行させるときの感じ
  • -reducer xxxx
    MapReduce処理のReduce部分を担当させるコードのパスをここに入れる
    同上
  • -file xxxx
    複数台のHadoopクラスタで処理を行う場合は必要(Amazon EC2上で行う場合は必須)
    各スレーブに-mapper -reducerで使用したファイルを転送することでそれぞれのノードでHadoop Streaming が行えるようになる
    指定するのは-mapper -reducerで指定したのと同じファイルのパス

Amazon EC2でHadoopを動かすと、基本複数ノードになるので-file指定は忘れずに。
うまく行くとJob completeが出ます:p
(何度Killされたことか…)

 

きちんと出力されているか確認。

# hadoop dfs -ls
Found 2 items
drwxrwxrwx   -          0 1969-12-31 19:00 /user/root/eggs
drwxrwxrwx   -          0 1969-12-31 19:00 /user/root/eggs_out

# hadoop dfs -ls eggs_out
Found 2 items
drwxrwxrwx   -          0 1969-12-31 19:00 /user/root/eggs_out/_logs
-rwxrwxrwx   1       5334 1969-12-31 19:00 /user/root/eggs_out/part-00000

# hadoop dfs -cat eggs_out/part-00000

圧縮されていないのでズラッと、英語の単語と数字が表示されると思います。

今回は処理対象が数KBのデータだったのでコマンドライン上だと数秒もかからなかった処理ですが、Hadoopを使うと結構時間がかかります。でも、このデータが10GBとか1TBとかになったときにHadoopを使う真価を発揮すると思います。

 

実際に処理した結果はこうなりました。

to      64
the     59
of      45
and     44
a       39
I       38
is      28
in      22
that    19
it      17
have    14
are     13
as      10
he      10
we      10
with    10
The     10
not     10
was     9
do      9
one     9
my      9
on      8
by      8
for     8
people  8

当たり前ですがtoとかandとかが多くなってしまいました。
さすが小説家…なのかは分かりませんが同じ意味の単語は微妙に言い換えて使っています。そのなかでも上位に来ている単語は“people”でした。

今回は非常に単純なワードカウント処理をHadoop Streaming で行ってみましたが、応用してちょっとは意味のあるコード動かしてみたいな!

 

自分の環境ではこの通りの手順で出来ましたが、「いや…出来ないんですけど…」とか「もっと簡単に設定する方法があるよ」とかありましたら教えて下さい。