Hadoop 完全分散モードで Scala による MapReduce のジョブ実行 - hdfs・mapred コマンドによる手動実行

前回 id:fits:20101010、Scala で実装した MapReduce のコードを完全分散モードで実行してみました。

今回は、クラスタ上の全デーモンを起動する start-dfs.sh スクリプトは使わずに、hdfs や mapred を使って NameNode・DataNode、JobTracker・TaskTracker を手動で起動しています。
多少不便ですが、こうする事で一般的に紹介されているような SSH の設定等は不要となります。


また、完全分散モードで実行する場合、Job オブジェクトに setJarByClass() しておきます。(スタンドアロン実行だけなら不要)
ちなみに、setJarByClass を行っていないとリモート側に JAR ファイルを手動で配置する羽目になります。

・・・
object MoneyCounter {
    def main(args: Array[String]) {
        ・・・
        val job = Job.getInstance(new Cluster(new Configuration))
        job.setJobName("myjob")

        //分散実行モードでこの JAR ファイルをリモート側に
        //自動配置するには以下が必要
        job.setJarByClass(classOf[CountMapper])
        ・・・

Hadoop の環境は以下の通りです。

クラスタ構成

クラスタの構成は以下のようにします。

  • fits1 (Windows7 + cygwin
    • NameNode
    • JobTracker
    • DataNode
    • TaskTracker
  • fits2 (Ubuntu 10.10)
    • DataNode
    • TaskTracker

fits1 側の設定(Windows)

まず、Hadoop の lib ディレクトリに scala-library.jar を配置しておきます。(環境変数 HADOOP_CLASSPATH に設定しても可だと思う)

今回は Hadoop の conf ディレクトリは編集せずに、自前の設定用ディレクトリ(fits1-conf)とファイルを以下のように用意しました。

  • fits1-conf ディレクトリ
    • core-site.xml
    • mapred-site.xml

ファイルの内容は以下の通り。

core-site.xml
<configuration>
  <property>
    <name>fs.default.name</name>
     <value>hdfs://fits1:9000</value>
  </property>
</configuration>
mapred-site.xml
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>fits1:9001</value>
  </property>
</configuration>

fits2 側の設定(Ubuntu

まず、Hadoop の lib ディレクトリに scala-library.jar を配置しておきます。(環境変数 HADOOP_CLASSPATH に設定しても可だと思う)

fits2 側でも自前の設定用ディレクトリ(fits2-conf)とファイルを用意します。

  • fits2-conf ディレクトリ
    • core-site.xml
    • mapred-site.xml

core-site.xml と mapred-site.xml ファイルの内容は fits1 と同じです。

実行

まず、fits1 で NameNode のフォーマット処理を実行しておきます。
なお、設定ディレクトリを指定するには "--config <設定ディレクトリ>" を使います。

fits1(Windows)の処理は全て cygwin 上で実施する点に注意。

NameNode フォーマット処理
$ hdfs --config fits1-conf namenode -format
起動処理(fits1)

次に、fits1 で NameNode・JobTracker・DataNode・TaskTracker をそれぞれ起動します。

NameNode 起動

$ hdfs --config fits1-conf namenode

JobTracker 起動

$ mapred --config fits1-conf jobtracker

DataNode 起動

$ hdfs --config fits1-conf datanode

TaskTracker 起動

$ mapred --config fits1-conf tasktracker
起動処理(fits2)

fits2 で DataNode・TaskTracker をそれぞれ起動します。

DataNode 起動

$ hdfs --config fits2-conf datanode

TaskTracker 起動

$ mapred --config fits2-conf tasktracker
起動確認

fits1 の NameNode 管理画面(http://localhost:50070/)で Live Nodes が 2、JobTracker 管理画面(http://localhost:50030/)で Nodes が 2 になっていると成功です。

fits1 側で Job 実行

入力ファイル(testinput.txt)を用意し、HDFS に登録(/work/testinput)しておきます。

$ hadoop --config fits1-conf fs -copyFromLocal testinput.txt /work/testinput

ジョブ(moneycounter_2.8.0-1.0.jar)を実行します。

$ hadoop --config fits1-conf jar moneycounter_2.8.0-1.0.jar /work/testinput /work/testoutput

これで処理が分散実行されると思います。

入力ファイルの内容によって処理が分散されないようであれば、Job オブジェクトに setNumReduceTasks(2) するようにソースコードを修正すればいいと思います。(Reduce 処理が 2台に分散されるはず)
また、moneycounter_2.8.0-1.0.jar を 2つ同時に実行しても良いかもしれません。