sbt を使って Scala で Hadoop MapReduce 実装
sbt を使って MapReduce を Scala で実装してみました。
使用した環境は以下の通り。(sbtの環境は id:fits:20100810 と同じ)
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20101010/
プロジェクト設定
はじめに環境変数 HADOOP_HOME に Hadoop ルートディレクトリへのパスを設定しておきます。
次に sbt でプロジェクトを作成、プロジェクトファイルを以下のように作成し、Hadoop 関連の JAR ファイルを参照してビルドできるようにします。
project/build/MoneyCounterProject.scala
import sbt._ class MoneyCounterProject(info: ProjectInfo) extends DefaultProject(info) { //外部JARファイルを unmanaged クラスパスに追加 override def unmanagedClasspath = super.unmanagedClasspath +++ hadoopClasspath +++ hadoopLibClasspath //メインクラスの設定 override def mainClass = Some("fits.sample.MoneyCounter") //環境変数 HADOOP_HOME から Hadoop のホームディレクトリ取得 lazy val hadoopHomePath = { val home = System.getenv("HADOOP_HOME") home match { case null => error("please set HADOOP_HOME") case _ => Path.fromFile(home) } } lazy val hadoopClasspath = hadoopHomePath * "*.jar" lazy val hadoopLibClasspath = hadoopHomePath / "lib" * "*.jar" }
MapReduce実装
MapReduce の処理内容は、Software Design (ソフトウェア デザイン) 2010年 05月号 [雑誌] P.103 のコイン数え上げサンプルと同等のものを Scala で実装してみました。
注意すべき点は、map や reduce メソッドの第3引数 context はジェネリックタイプを指定したクラス(Mapper や Reducer)の Context 型を明示的に指定しなければならない点です。
(Java では普通に Context とするだけで OK ですが、Context だけだと Scala ではコンパイルに失敗します、なお override 指定を消すとコンパイルには成功しますが実行時にエラーとなります)
なお、map や reduce メソッドの第3引数で "context: Mapper[LongWritable, Text, Text, IntWritable]#Context" のような記述をするのが面倒だったので、type を使った別名定義を利用しています。
src/main/scala/MoneyCounter.scala(MapReduce の実装)
package fits.sample import scala.collection.JavaConversions._ import org.apache.hadoop.conf._ import org.apache.hadoop.fs.Path import org.apache.hadoop.io._ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input._ import org.apache.hadoop.mapreduce.lib.output._ object MoneyCounter { def main(args: Array[String]) { if (args.length < 2) { println(">hadoop jar moneycounter_2.8.0-1.0.jar [input file] [output dir]") return } val job = Job.getInstance(new Cluster(new Configuration)) job.setJobName("myjob") //分散実行モードでこの JAR ファイルをリモート側に //自動配置するには以下が必要 job.setJarByClass(classOf[CountMapper]) //Mapper の設定 job.setMapperClass(classOf[CountMapper]) //Reducer の設定 job.setReducerClass(classOf[CountReducer]) //map 後のキーと値の型を設定 job.setMapOutputKeyClass(classOf[Text]) job.setMapOutputValueClass(classOf[IntWritable]) //今回のケースでは不要 //job.setOutputKeyClass(classOf[Text]) //job.setOutputValueClass(classOf[IntWritable]) //入力の設定 FileInputFormat.setInputPaths(job, new Path(args(0))) //出力の設定 FileOutputFormat.setOutputPath(job, new Path(args(1))) //ジョブの実行 val res = job.waitForCompletion(true) println("result = " + res) } //ジェネリックタイプを指定したMapperの別名定義 type M = Mapper[LongWritable, Text, Text, IntWritable] //Mapper のサブクラス class CountMapper extends M { val one = new IntWritable(1) //context は Mapper[LongWritable, Text, Text, IntWritable]#Context 型 //を指定しないとコンパイルに失敗 override def map(key: LongWritable, value: Text, context: M#Context) { context.write(value, one) } } //ジェネリックタイプを指定したReducerの別名定義 type R = Reducer[Text, IntWritable, Text, IntWritable] //Reducer のサブクラス class CountReducer extends R { //context は Reducer[Text, IntWritable, Text, IntWritable]#Context 型 //を指定しないとコンパイルに失敗 override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: R#Context) { val count = values.foldLeft(0)(_ + _.get()) context.write(key, new IntWritable(key.toString.toInt * count)) } } }
また、setOutputKeyClass や setOutputValueClass を使って reduce 後の型を指定してませんが、今回のケースでは不要でした。(setMapOutputKeyClass・ValueClass で指定した型と同一のためだと考えられる)
ビルド
> sbt package
hadoop コマンドによる実行(スタンドアロン)
ビルドで生成された JAR ファイルを hadoop コマンドに指定して実行します。(Windows 環境では cygwin を使う必要あり)
入力ファイル(例 work/testfile)は "hadoop fs" 等を使って事前に用意しておきます。
なお、実行時に Scala のライブラリを使用するため、環境変数 HADOOP_CLASSPATH に scala-library.jar のパスを設定しておきます。
cygwinでの実行例: hadoop_testrun.sh ファイル参照(Java と Hadoop は C ドライブ直下に配置されているものとする)
$ export JAVA_HOME=/cygdrive/c/jdk1.6.0_21 $ export HADOOP_HOME=/cygdrive/c/hadoop-0.21.0 $ export PATH=$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATH $ export HADOOP_CLASSPATH=project/boot/scala-2.8.0/lib/scala-library.jar $ hadoop jar target/scala_2.8.0/moneycounter_2.8.0-1.0.jar work/testfile work/testoutput ・・・ 10/10/10 18:53:45 INFO mapreduce.Job: Counters: 18 FileInputFormatCounters BYTES_READ=80 FileSystemCounters FILE_BYTES_READ=715 FILE_BYTES_WRITTEN=126007 Map-Reduce Framework Combine input records=0 Combine output records=0 Failed Shuffles=0 GC time elapsed (ms)=26 Map input records=17 Map output bytes=131 Map output records=17 Merged Map outputs=0 Reduce input groups=9 Reduce input records=17 Reduce output records=9 Reduce shuffle bytes=0 Shuffled Maps =0 Spilled Records=34 SPLIT_RAW_BYTES=138 result = true
[追記] sbt での実行(スタンドアロン実行)
Windows 環境で今回のサンプルを sbt から実行するだけなら、cygwin 上で以下のように sbt の run アクションを実行するだけで可能です。(入力ファイルと出力ディレクトリを引数で指定)
cygwin + sbt での実行例(Java, Hadoop, sbt は C ドライブ直下に配置されているものとする)
$ export JAVA_HOME=/cygdrive/c/jdk1.6.0_21 $ export HADOOP_HOME=c:/hadoop-0.21.0 $ export PATH=$JAVA_HOME/bin:$PATH $ java -jar c:/sbt/sbt-launch-0.7.4.jar [info] Building project MoneyCounter 1.0 against Scala 2.8.0 [info] using MoneyCounterProject with sbt 0.7.4 and Scala 2.7.7 > run work/testfile work/testoutput ・・・ result = true
ただし、sbt の unmanagedClasspath に HADOOP の JAR ファイルを適切に設定させるために、環境変数 HADOOP_HOME に "/cygdrive/xxx" のパス表記を使用してはいけない点に注意。("c:/xxx" のようなドライブ表記を使えばよい)
ちなみに、cygwin を使わずに実行すると java.io.IOException(Cannot run program "chmod")が発生します。