Java で Apache Spark を使用

以前、sbt を使って Scala で Hadoop MapReduce 実装Groovy で Storm を使う で実施したお金の数え上げ処理を Spark 0.8 を使って Java で実装してみました。

Spark は以前、Spark を使って単純なレコメンドを実施 で 0.4 を試しましたが、0.8 でも API に大きな変化はないようです。(パッケージ名は org.apache.spark へ変わってますが)

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131116/

はじめに

実装する処理内容は、下記のようなファイルを読み込んで数値毎にカウントするだけの単純なものです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000

Java で実装

実装した処理内容は下記の通りです。

今回は単一のワーカースレッドでローカル実行させるだけなので JavaSparkContext の第1引数に local と指定しています。

複数のワーカースレッドでローカル実行するには local[スレッド数] のように指定します。(例えば 4スレッドの場合は local[4]

src/main/java/fits/sample/MoneyCount.java
package fits.sample;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

class MoneyCount {
    public static void main(String... args) {
        JavaSparkContext spark = new JavaSparkContext("local", "MoneyCount");

        JavaRDD<String> file = spark.textFile(args[0]);

        JavaPairRDD<String, Integer> res = file.map(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer t1, Integer t2) {
                return t1 + t2;
            }
        });

        res.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> t) {
                System.out.println(t._1 + " = " + t._2);
            }
        });
    }
}

gradle でビルド・実行してみます。

Spark 0.8 は古めの Akka ライブラリに依存しているため http://repo.typesafe.com/typesafe/releases/ リポジトリを追加する必要があるようです。

build.gradle
apply plugin: 'java'
apply plugin: 'application'

repositories {
    mavenCentral()

    maven {
        url 'http://repo.typesafe.com/typesafe/releases/'
    }
}

dependencies {
    compile 'org.apache.spark:spark-core_2.9.3:0.8.0-incubating'
    compile 'org.slf4j:slf4j-nop:1.7.5'
}

mainClassName = "fits.sample.MoneyCount"

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は下記のようになります。

実行結果

> gradle run -Pargs=input_sample.txt

:compileJava
:processResources UP-TO-DATE
:classes
:run
13/11/16 02:12:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/16 02:12:37 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/16 02:12:37 INFO mapred.FileInputFormat: Total input paths to process : 1
1 = 2
10 = 2
10000 = 2
5 = 3
100 = 2
1000 = 3
50 = 1
500 = 1
2000 = 1

BUILD SUCCESSFUL

なお、Java 8 や Groovy で同様の処理を試そうとしてみましたが、実行時に ASM 関係でエラーが発生し、正常に実行させる事はできませんでした。

Scala で実装

同じ処理を Scala で実装すると下記のようにもっとシンプルになります。

src/main/scala/money_count.scala
package fits.sample

import org.apache.spark._
import SparkContext._

object MoneyCount {
    def main(args: Array[String]) {
        val spark = new SparkContext("local", "MoneyCount")

        val file = spark.textFile(args(0))

        val res = file.map {
            (_, 1)
        }.reduceByKey(_ + _)

        res.foreach { t => println(t._1 + " = " + t._2) }
    }
}

Maven セントラルリポジトリには、Scala 2.10 用の Spark モジュールが見当たらないため、基本的に Scala 2.9 で実行する事になります。

ちなみに、Spark(バージョンは 0.9)をソースからビルドすれば Scala 2.10 で実行できました。(git://github.com/apache/incubator-spark.git の scala-2.10 ブランチ使用)

build.gradle
apply plugin: 'scala'
apply plugin: 'application'

def scalaVersion = '2.9.3'

repositories {
    mavenCentral()

    maven {
        url 'http://repo.typesafe.com/typesafe/releases/'
    }
}

dependencies {
    compile "org.scala-lang:scala-library:${scalaVersion}"
    compile "org.apache.spark:spark-core_${scalaVersion}:0.8.0-incubating"
    compile 'org.slf4j:slf4j-nop:1.7.5'
}

mainClassName = "fits.sample.MoneyCount"

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果

> gradle run -Pargs=input_sample.txt

:compileJava UP-TO-DATE
:compileScala UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:run
13/11/16 01:38:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/16 01:38:39 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/16 01:38:39 INFO mapred.FileInputFormat: Total input paths to process : 1
1 = 2
10 = 2
10000 = 2
5 = 3
100 = 2
1000 = 3
50 = 1
500 = 1
2000 = 1

BUILD SUCCESSFUL