Groovy で Apache Flink を使用
「Groovy で Apache Spark を使用」と同様の処理を Apache Flink で試してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170311/
サンプルスクリプト
今回はローカルで実行するだけなので ExecutionEnvironment.createLocalEnvironment()
で取得した LocalEnvironment
を使用します。
map メソッドの引数へ Groovy のクロージャを使ったところ、org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. ・・・
となってしまい駄目でしたので、MapFunction
の実装クラスを定義しました。
その場合、MapFunction の型引数をきちんと指定する必要があります。(そうしないと InvalidTypesException
が発生)
なお、flink-clients_2.10 を使用する場合、scala-library の @Grab 定義は不要でした。(flink-clients_2.11 の場合のみ scala-library が必要)
money_count.groovy
@Grapes([ @Grab('org.apache.flink:flink-java:1.2.0'), @GrabExclude('io.netty#netty;3.7.0.Final') ]) @Grab('org.apache.flink:flink-clients_2.11:1.2.0') @Grab('org.scala-lang:scala-library:2.11.8') @Grab('org.jboss.netty:netty:3.2.10.Final') import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.tuple.Tuple2 import groovy.transform.CompileStatic // @CompileStatic は必須ではない(無くても動作する) @CompileStatic class ToTuple implements MapFunction<String, Tuple2<String, Integer>> { Tuple2 map(String v) { new Tuple2(v, 1) } } def env = ExecutionEnvironment.createLocalEnvironment() env.readTextFile(args[0]).map(new ToTuple()).groupBy(0).sum(1).print()
groupBy
メソッドではグルーピング対象とする項目を、sum
メソッドでは合計する項目を数値で指定します。
実行
Groovy のデフォルト設定では java.lang.IllegalArgumentException: Size of total memory must be positive.
が発生しましたので、JAVA_OPTS
環境変数で最大メモリサイズ (-Xmx) を変更して実行します。
実行結果
> set JAVA_OPTS=-Xmx512m > groovy money_count.groovy input_sample.txt Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1033779003] 03/08/2017 00:56:11 Job execution switched to status RUNNING. ・・・ (10000,2) (10,2) (100,2) (50,1) (500,1) (1,2) (1000,3) (2000,1) (5,3)
input_sample.txt の内容は以下の通りです。
input_sample.txt
100 1 5 50 500 1000 10000 1000 1 10 5 5 10 100 1000 10000 2000