Groovy で Apache Flink を使用

Groovy で Apache Spark を使用」と同様の処理を Apache Flink で試してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170311/

サンプルスクリプト

今回はローカルで実行するだけなので ExecutionEnvironment.createLocalEnvironment() で取得した LocalEnvironment を使用します。

map メソッドの引数へ Groovy のクロージャを使ったところ、org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. ・・・ となってしまい駄目でしたので、MapFunction の実装クラスを定義しました。

その場合、MapFunction の型引数をきちんと指定する必要があります。(そうしないと InvalidTypesException が発生)

なお、flink-clients_2.10 を使用する場合、scala-library の @Grab 定義は不要でした。(flink-clients_2.11 の場合のみ scala-library が必要)

money_count.groovy
@Grapes([
    @Grab('org.apache.flink:flink-java:1.2.0'),
    @GrabExclude('io.netty#netty;3.7.0.Final')
])
@Grab('org.apache.flink:flink-clients_2.11:1.2.0')
@Grab('org.scala-lang:scala-library:2.11.8')
@Grab('org.jboss.netty:netty:3.2.10.Final')
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.Tuple2
import groovy.transform.CompileStatic

// @CompileStatic は必須ではない(無くても動作する)
@CompileStatic
class ToTuple implements MapFunction<String, Tuple2<String, Integer>> {
    Tuple2 map(String v) {
        new Tuple2(v, 1)
    }
}

def env = ExecutionEnvironment.createLocalEnvironment()

env.readTextFile(args[0]).map(new ToTuple()).groupBy(0).sum(1).print()

groupBy メソッドではグルーピング対象とする項目を、sum メソッドでは合計する項目を数値で指定します。

実行

Groovy のデフォルト設定では java.lang.IllegalArgumentException: Size of total memory must be positive. が発生しましたので、JAVA_OPTS 環境変数で最大メモリサイズ (-Xmx) を変更して実行します。

実行結果
> set JAVA_OPTS=-Xmx512m
> groovy money_count.groovy input_sample.txt

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1033779003]
03/08/2017 00:56:11     Job execution switched to status RUNNING.
・・・
(10000,2)
(10,2)
(100,2)
(50,1)
(500,1)
(1,2)
(1000,3)
(2000,1)
(5,3)

input_sample.txt の内容は以下の通りです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000