Scala 2.10 で Apache Spark を使用

Apache Spark をソースからビルドして Scala 2.10 上で実行してみました。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131121/

Spark のソースをビルド

まずは、Spark ソースの scala-2.10 ブランチを git clone します。

ソースの取得
$ git clone -b scala-2.10 git://github.com/apache/incubator-spark.git

次に、sbt ディレクトリにある sbt コマンドで assembly を実行します。

なお、現時点のデフォルトでは Hadoop 1.0.4 用のモジュールが生成されるようですが、SPARK_HADOOP_VERSIONSPARK_YARN 環境変数Hadoop のバージョンを変更できるようです。

ビルド
$ cd incubator-spark
$ sbt/sbt assembly

ビルドが正常に完了すると、必要なモジュールを一通り組み込んだ assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar が生成されます。 (examples の方にも JAR が作成されます)

Scala 2.10 で実行

それでは、前回とほぼ同じ内容の下記ソースを実行してみます。

src/main/scala/money_count.scala
package fits.sample

import org.apache.spark._
import SparkContext._

object MoneyCount {
    def main(args: Array[String]) {

        val spark = new SparkContext("local", "MoneyCount")

        val file = spark.textFile(args(0))

        val res = file.map {
            (_, 1)
        }.reduceByKey(_ + _)

        res.foreach { t => println(s"${t._1} = ${t._2}") }
    }
}

実行は今回も Gradle で行います。

Spark のソースをビルドしてできた spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar を lib ディレクトリへ配置しました。

build.gradle
apply plugin: 'scala'
apply plugin: 'application'

def scalaVersion = '2.10.3'

repositories {
    mavenCentral()
}

dependencies {
    compile "org.scala-lang:scala-library:${scalaVersion}"
    compile files('lib/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar')
}

mainClassName = "fits.sample.MoneyCount"

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

ディレクトリ構成は以下のようにしました。

ディレクトリ構成
- lib
   |_ spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar

- src
   |_ main
       |_ scala
       |   |_ money_count.scala
       |
       |_ resources
           |_ log4j.properties

- build.gradle
- input_sample.txt

なお、ログ出力には slf4j-log4j12 が使われるので、とりあえず log4j の設定ファイルを用意しておきました。

src/main/resources/log4j.properties
log4j.rootLogger=WARN, CONSOLE

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%5p %m%n

gradle run で実行してみると下記のような結果となり、Scala 2.10 上での動作が確認できました。

実行結果

> gradle run -Pargs=input_sample.txt

:compileJava UP-TO-DATE
:compileScala UP-TO-DATE
:processResources
:classes
:run
 WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 WARN Snappy native library not loaded
1 = 2
10 = 2
10000 = 2
5 = 3
100 = 2
1000 = 3
50 = 1
500 = 1
2000 = 1

BUILD SUCCESSFUL