sbt を使って Scala で Hadoop MapReduce 実装

sbt を使って MapReduce を Scala で実装してみました。
使用した環境は以下の通り。(sbtの環境は id:fits:20100810 と同じ)

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20101010/

プロジェクト設定

はじめに環境変数 HADOOP_HOME に Hadoop ルートディレクトリへのパスを設定しておきます。
次に sbt でプロジェクトを作成、プロジェクトファイルを以下のように作成し、Hadoop 関連の JAR ファイルを参照してビルドできるようにします。

project/build/MoneyCounterProject.scala
import sbt._

class MoneyCounterProject(info: ProjectInfo) extends DefaultProject(info) {
    //外部JARファイルを unmanaged クラスパスに追加
    override def unmanagedClasspath = super.unmanagedClasspath +++ hadoopClasspath +++ hadoopLibClasspath
    //メインクラスの設定
    override def mainClass = Some("fits.sample.MoneyCounter")

    //環境変数 HADOOP_HOME から Hadoop のホームディレクトリ取得
    lazy val hadoopHomePath = {
        val home = System.getenv("HADOOP_HOME")
        home match {
            case null => error("please set HADOOP_HOME")
            case _ => Path.fromFile(home)
        }
    }

    lazy val hadoopClasspath = hadoopHomePath * "*.jar"
    lazy val hadoopLibClasspath = hadoopHomePath / "lib" * "*.jar"
}

MapReduce実装

MapReduce の処理内容は、Software Design (ソフトウェア デザイン) 2010年 05月号 [雑誌] P.103 のコイン数え上げサンプルと同等のものを Scala で実装してみました。

注意すべき点は、map や reduce メソッドの第3引数 context はジェネリックタイプを指定したクラス(Mapper や Reducer)の Context 型を明示的に指定しなければならない点です。
Java では普通に Context とするだけで OK ですが、Context だけだと Scala ではコンパイルに失敗します、なお override 指定を消すとコンパイルには成功しますが実行時にエラーとなります)

なお、map や reduce メソッドの第3引数で "context: Mapper[LongWritable, Text, Text, IntWritable]#Context" のような記述をするのが面倒だったので、type を使った別名定義を利用しています。

src/main/scala/MoneyCounter.scala(MapReduce の実装)
package fits.sample

import scala.collection.JavaConversions._
import org.apache.hadoop.conf._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input._
import org.apache.hadoop.mapreduce.lib.output._

object MoneyCounter {
    def main(args: Array[String]) {
        if (args.length < 2) {
            println(">hadoop jar moneycounter_2.8.0-1.0.jar [input file] [output dir]")
            return
        }

        val job = Job.getInstance(new Cluster(new Configuration))
        job.setJobName("myjob")

        //分散実行モードでこの JAR ファイルをリモート側に
        //自動配置するには以下が必要
        job.setJarByClass(classOf[CountMapper])

        //Mapper の設定
        job.setMapperClass(classOf[CountMapper])
        //Reducer の設定
        job.setReducerClass(classOf[CountReducer])

        //map 後のキーと値の型を設定
        job.setMapOutputKeyClass(classOf[Text])
        job.setMapOutputValueClass(classOf[IntWritable])
        //今回のケースでは不要
        //job.setOutputKeyClass(classOf[Text])
        //job.setOutputValueClass(classOf[IntWritable])

        //入力の設定
        FileInputFormat.setInputPaths(job, new Path(args(0)))
        //出力の設定
        FileOutputFormat.setOutputPath(job, new Path(args(1)))

        //ジョブの実行
        val res = job.waitForCompletion(true)
        println("result = " + res)
    }

    //ジェネリックタイプを指定したMapperの別名定義
    type M = Mapper[LongWritable, Text, Text, IntWritable]
    //Mapper のサブクラス
    class CountMapper extends M {
        val one = new IntWritable(1)

        //context は Mapper[LongWritable, Text, Text, IntWritable]#Context 型
        //を指定しないとコンパイルに失敗
        override def map(key: LongWritable, value: Text, context: M#Context) {
            context.write(value, one)
        }
    }

    //ジェネリックタイプを指定したReducerの別名定義
    type R = Reducer[Text, IntWritable, Text, IntWritable]
    //Reducer のサブクラス
    class CountReducer extends R {
        //context は Reducer[Text, IntWritable, Text, IntWritable]#Context 型
        //を指定しないとコンパイルに失敗
        override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: R#Context) {

            val count = values.foldLeft(0)(_ + _.get())
            context.write(key, new IntWritable(key.toString.toInt * count))
        }
    }
}

また、setOutputKeyClass や setOutputValueClass を使って reduce 後の型を指定してませんが、今回のケースでは不要でした。(setMapOutputKeyClass・ValueClass で指定した型と同一のためだと考えられる)

ビルド

> sbt package

hadoop コマンドによる実行(スタンドアロン

ビルドで生成された JAR ファイルを hadoop コマンドに指定して実行します。(Windows 環境では cygwin を使う必要あり)
入力ファイル(例 work/testfile)は "hadoop fs" 等を使って事前に用意しておきます。

なお、実行時に Scala のライブラリを使用するため、環境変数 HADOOP_CLASSPATHscala-library.jar のパスを設定しておきます。

cygwinでの実行例: hadoop_testrun.sh ファイル参照(Java と Hadoop は C ドライブ直下に配置されているものとする)
$ export JAVA_HOME=/cygdrive/c/jdk1.6.0_21
$ export HADOOP_HOME=/cygdrive/c/hadoop-0.21.0
$ export PATH=$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATH
$ export HADOOP_CLASSPATH=project/boot/scala-2.8.0/lib/scala-library.jar

$ hadoop jar target/scala_2.8.0/moneycounter_2.8.0-1.0.jar work/testfile work/testoutput
・・・
10/10/10 18:53:45 INFO mapreduce.Job: Counters: 18
        FileInputFormatCounters
                BYTES_READ=80
        FileSystemCounters
                FILE_BYTES_READ=715
                FILE_BYTES_WRITTEN=126007
        Map-Reduce Framework
                Combine input records=0
                Combine output records=0
                Failed Shuffles=0
                GC time elapsed (ms)=26
                Map input records=17
                Map output bytes=131
                Map output records=17
                Merged Map outputs=0
                Reduce input groups=9
                Reduce input records=17
                Reduce output records=9
                Reduce shuffle bytes=0
                Shuffled Maps =0
                Spilled Records=34
                SPLIT_RAW_BYTES=138
result = true

[追記] sbt での実行(スタンドアロン実行)

Windows 環境で今回のサンプルを sbt から実行するだけなら、cygwin 上で以下のように sbt の run アクションを実行するだけで可能です。(入力ファイルと出力ディレクトリを引数で指定)

cygwin + sbt での実行例(Java, Hadoop, sbt は C ドライブ直下に配置されているものとする)
$ export JAVA_HOME=/cygdrive/c/jdk1.6.0_21
$ export HADOOP_HOME=c:/hadoop-0.21.0
$ export PATH=$JAVA_HOME/bin:$PATH

$ java -jar c:/sbt/sbt-launch-0.7.4.jar
[info] Building project MoneyCounter 1.0 against Scala 2.8.0
[info]    using MoneyCounterProject with sbt 0.7.4 and Scala 2.7.7
> run work/testfile work/testoutput
・・・
result = true

ただし、sbt の unmanagedClasspath に HADOOP の JAR ファイルを適切に設定させるために、環境変数 HADOOP_HOME に "/cygdrive/xxx" のパス表記を使用してはいけない点に注意。("c:/xxx" のようなドライブ表記を使えばよい)

ちなみに、cygwin を使わずに実行すると java.io.IOException(Cannot run program "chmod")が発生します。