Java 8 で Apache Flink を使用

前回 と同様の処理を Java8 のラムダ式を使って実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170313/

サンプル

前回 の処理をラムダ式を使って Java で実装すると以下のようになりました。

MoneyCount.java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class MoneyCount {
    public static void main(String... args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        env.readTextFile(args[0])
            .map( w -> new Tuple2<>(w, 1) )
            .groupBy(0)
            .sum(1)
            .print();
    }
}

実行

Flink 1.2.0 では上記のように map 等の引数へラムダ式を使った場合、通常の JDKコンパイルすると実行時にエラーが発生してしまいます。

(a) JDKコンパイルして実行

  • javac 1.8.0_121

以下の Gradle ビルド定義を使って実行してみます。

build.gradle
apply plugin: 'application'

mainClassName = 'MoneyCount'

repositories {
    jcenter()
}

dependencies {
    compile 'org.apache.flink:flink-java:1.2.0'
    runtime 'org.apache.flink:flink-clients_2.11:1.2.0'
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行すると以下のようにエラーとなりました。

Tuple2 の型引数が失われている点が問題となっており、Eclipse JDT compiler でコンパイルしなければならないようです。

実行結果 ※
> gradle run -q -Pargs=input_sample.txt

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
  The return type of function 'main(MoneyCount.java:10)' could not be determined automatically, due to type erasure. 
  You can give type information hints by using the returns(...) method on the result of the transformation call, 
  or by letting your function implement the 'ResultTypeQueryable' interface.
        at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
        at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:692)
        at MoneyCount.main(MoneyCount.java:11)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing.
It seems that your compiler has not stored them into the .class file.
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely.
See the documentation for more information about how to compile jobs containing lambda expressions.
        at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1528)
        ・・・

※ 出力結果には改行を適当に加えています

(b) Eclipse JDT Compiler でコンパイル(-genericsignature オプション付)して実行

  • ecj 4.6.1

次に Eclipse JDT Compiler でコンパイルし実行してみます。

Eclipse JDT Compiler は java -jar ecj.jar ・・・ で実行できるので、Gradle で実施する場合は compileJavaforkOptions を使って設定します。

今回のケースでは、Eclipse JDT Compiler を -genericsignature オプション付きで実行する点が重要です。(付けない場合は JDK と同じ結果になります)

build-ecj.gradle
apply plugin: 'application'

mainClassName = 'MoneyCount'

configurations {
    ecj
}

repositories {
    jcenter()
}

dependencies {
    compile 'org.apache.flink:flink-java:1.2.0'
    runtime 'org.apache.flink:flink-clients_2.11:1.2.0'

    ecj 'org.eclipse.jdt.core.compiler:ecj:4.6.1'
    // 以下でも可
    //ecj 'org.eclipse.scout.sdk.deps:ecj:4.6.2'
}

// Eclipse JDT Compiler の設定
compileJava {
    options.fork = true
    // -genericsignature オプションの指定
    options.compilerArgs << '-genericsignature'

    // java -jar ecj.jar を実行するための設定
    options.forkOptions.with {
        executable = 'java'
        jvmArgs = ['-jar', configurations.ecj.asPath]
    }
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は以下の通り、正常に実行できるようになりました。

実行結果
> gradle run -q -b build-ecj.gradle -Pargs=input_sample.txt

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1055289406]
03/13/2017 20:40:46     Job execution switched to status RUNNING.
・・・
03/13/2017 20:40:47     Job execution switched to status FINISHED.
(10000,2)
(10,2)
(100,2)
(50,1)
(500,1)
(1,2)
(1000,3)
(2000,1)
(5,3)

input_sample.txt の内容は以下の通りです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000

コンパイル結果の比較

最後に、コンパイル結果(.class ファイル) を CFR で処理(以下のコマンドを適用)して違いを確認しておきます。 (--decodelambdas false オプションでラムダ式の部分をメソッドとして残すようにしています)

java -jar cfr_0_120.jar MoneyCount.class --decodelambdas false

まずは JDK(javac 1.8.0_121)のコンパイル結果を確認してみます。

(a) JDKコンパイルした場合(CFR の処理結果)
・・・
public class MoneyCount {
    public static /* varargs */ void main(String ... args) throws Exception {
        ・・・
    }

    private static /* synthetic */ Tuple2 lambda$main$95f17bfa$1(String w) throws Exception {
        return new Tuple2((Object)w, (Object)1);
    }
}

lambda$main$95f17bfa$1 の戻り値の型が Tuple2 となっており、型引数が失われています。(これが実行時のエラー原因)

次に Eclipse JDT Compiler のコンパイル結果を確認してみます。

(b) Eclipse JDT Compiler でコンパイル(-genericsignature オプション付)した場合(CFR の処理結果)
・・・
public class MoneyCount {
    public static /* varargs */ void main(String ... args) throws Exception {
        ・・・
    }

    private static /* synthetic */ Tuple2<String, Integer> lambda$0(String w) throws Exception {
        return new Tuple2((Object)w, (Object)1);
    }
}

lambda$0 の戻り値の型が Tuple2<String, Integer> となっており、型引数が失われずに残っています。(これが実行に成功した理由)

なお、-genericsignature オプションを付けずにコンパイルすると JDK と同様に型引数が失われます。