Java で Apache Beam を使用
前回 と同等の処理を Apache Beam を使って実装してみます。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170327/
サンプルアプリケーション
Beam では Pipeline
の apply
メソッドで処理を繋げるようですので、今回は以下のように実装してみました。
- (1)
Count.perElement
メソッドを使って要素毎にカウントしたKV<String, Long>
を取得 - (2)
ToString.kvs
メソッドを使ってKV
の Key と Value の値を連結して文字列化 - (3)
DoFn
で@ProcessElement
を付与したメソッドを実装し (2) で取得した文字列を標準出力
apply
メソッドの引数に使用する PTransform
は org.apache.beam.sdk.transforms
パッケージ下に主要なものが用意されているようです。
標準出力を行うための基本作法が分からなかったので、今回は DoFn
を使っています。 (他に MapElements.via(SimpleFunction)
を使う方法等も考えられます)
DoFn
ではアノテーションを使って処理メソッドを指定するようになっており、入力要素を 1件ずつ処理するための @ProcessElement
アノテーションの他にもいくつか用意されているようです。(例えば @Setup
や @StartBundle
等)
また、アノテーションを付与したメソッドは引数の型等をチェックするようになっています。 (org.apache.beam.sdk.transforms.reflect.DoFnSignatures
等のソース参照)
src/main/java/MoneyCount.java
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ToString; public class MoneyCount { public static void main(String... args) throws Exception { PipelineOptions opt = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(opt); p.apply(TextIO.Read.from(args[0])) .apply(Count.perElement()) // (1) .apply(ToString.kvs()) // (2) .apply(ParDo.of(new DoFn<String, String>() { // (3) @ProcessElement public void process(ProcessContext ctx) { System.out.println(ctx.element()); } })); p.run().waitUntilFinish(); } }
実行
以下のビルド定義ファイルを使って実行します。
今回は DirectRunner
(beam-runners-direct-java
) で実行していますが、Apache Spark や Flink 等で実行する方法も用意されています。
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compile 'org.apache.beam:beam-sdks-java-core:0.6.0' runtime 'org.apache.beam:beam-runners-direct-java:0.6.0' runtime 'org.slf4j:slf4j-nop:1.7.25' } run { if (project.hasProperty('args')) { args project.args } }
実行結果は以下の通りです。 なお、出力順は実行の度に変化します。
実行結果
> gradle run -q -Pargs=input_sample.txt 10000,2 5,3 2000,1 1000,3 10,2 500,1 100,2 1,2 50,1
input_sample.txt の内容は以下の通りです。
input_sample.txt
100 1 5 50 500 1000 10000 1000 1 10 5 5 10 100 1000 10000 2000