Java で Apache Beam を使用

前回 と同等の処理を Apache Beam を使って実装してみます。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170327/

サンプルアプリケーション

Beam では Pipelineapply メソッドで処理を繋げるようですので、今回は以下のように実装してみました。

  • (1) Count.perElement メソッドを使って要素毎にカウントした KV<String, Long> を取得
  • (2) ToString.kvs メソッドを使って KV の Key と Value の値を連結して文字列化
  • (3) DoFn@ProcessElement を付与したメソッドを実装し (2) で取得した文字列を標準出力

apply メソッドの引数に使用する PTransformorg.apache.beam.sdk.transforms パッケージ下に主要なものが用意されているようです。

標準出力を行うための基本作法が分からなかったので、今回は DoFn を使っています。 (他に MapElements.via(SimpleFunction) を使う方法等も考えられます)

DoFn ではアノテーションを使って処理メソッドを指定するようになっており、入力要素を 1件ずつ処理するための @ProcessElement アノテーションの他にもいくつか用意されているようです。(例えば @Setup@StartBundle 等)

また、アノテーションを付与したメソッドは引数の型等をチェックするようになっています。 (org.apache.beam.sdk.transforms.reflect.DoFnSignatures 等のソース参照)

src/main/java/MoneyCount.java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ToString;

public class MoneyCount {
    public static void main(String... args) throws Exception {
        PipelineOptions opt = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(opt);

        p.apply(TextIO.Read.from(args[0]))
                .apply(Count.perElement()) // (1)
                .apply(ToString.kvs())     // (2)
                .apply(ParDo.of(new DoFn<String, String>() { // (3)
                    @ProcessElement
                    public void process(ProcessContext ctx) {
                        System.out.println(ctx.element());
                    }
                }));

        p.run().waitUntilFinish();
    }
}

実行

以下のビルド定義ファイルを使って実行します。

今回は DirectRunnerbeam-runners-direct-java) で実行していますが、Apache Spark や Flink 等で実行する方法も用意されています。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'org.apache.beam:beam-sdks-java-core:0.6.0'

    runtime 'org.apache.beam:beam-runners-direct-java:0.6.0'
    runtime 'org.slf4j:slf4j-nop:1.7.25'
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は以下の通りです。 なお、出力順は実行の度に変化します。

実行結果
> gradle run -q -Pargs=input_sample.txt

10000,2
5,3
2000,1
1000,3
10,2
500,1
100,2
1,2
50,1

input_sample.txt の内容は以下の通りです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000