Java で Apache Beam を使用

前回 と同等の処理を Apache Beam を使って実装してみます。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170327/

サンプルアプリケーション

Beam では Pipelineapply メソッドで処理を繋げるようですので、今回は以下のように実装してみました。

  • (1) Count.perElement メソッドを使って要素毎にカウントした KV<String, Long> を取得
  • (2) ToString.kvs メソッドを使って KV の Key と Value の値を連結して文字列化
  • (3) DoFn@ProcessElement を付与したメソッドを実装し (2) で取得した文字列を標準出力

apply メソッドの引数に使用する PTransformorg.apache.beam.sdk.transforms パッケージ下に主要なものが用意されているようです。

標準出力を行うための基本作法が分からなかったので、今回は DoFn を使っています。 (他に MapElements.via(SimpleFunction) を使う方法等も考えられます)

DoFn ではアノテーションを使って処理メソッドを指定するようになっており、入力要素を 1件ずつ処理するための @ProcessElement アノテーションの他にもいくつか用意されているようです。(例えば @Setup@StartBundle 等)

また、アノテーションを付与したメソッドは引数の型等をチェックするようになっています。 (org.apache.beam.sdk.transforms.reflect.DoFnSignatures 等のソース参照)

src/main/java/MoneyCount.java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ToString;

public class MoneyCount {
    public static void main(String... args) throws Exception {
        PipelineOptions opt = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(opt);

        p.apply(TextIO.Read.from(args[0]))
                .apply(Count.perElement()) // (1)
                .apply(ToString.kvs())     // (2)
                .apply(ParDo.of(new DoFn<String, String>() { // (3)
                    @ProcessElement
                    public void process(ProcessContext ctx) {
                        System.out.println(ctx.element());
                    }
                }));

        p.run().waitUntilFinish();
    }
}

実行

以下のビルド定義ファイルを使って実行します。

今回は DirectRunnerbeam-runners-direct-java) で実行していますが、Apache Spark や Flink 等で実行する方法も用意されています。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'org.apache.beam:beam-sdks-java-core:0.6.0'

    runtime 'org.apache.beam:beam-runners-direct-java:0.6.0'
    runtime 'org.slf4j:slf4j-nop:1.7.25'
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は以下の通りです。 なお、出力順は実行の度に変化します。

実行結果
> gradle run -q -Pargs=input_sample.txt

10000,2
5,3
2000,1
1000,3
10,2
500,1
100,2
1,2
50,1

input_sample.txt の内容は以下の通りです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000

Java 8 で Apache Flink を使用

前回 と同様の処理を Java8 のラムダ式を使って実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170313/

サンプル

前回 の処理をラムダ式を使って Java で実装すると以下のようになりました。

MoneyCount.java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class MoneyCount {
    public static void main(String... args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        env.readTextFile(args[0])
            .map( w -> new Tuple2<>(w, 1) )
            .groupBy(0)
            .sum(1)
            .print();
    }
}

実行

Flink 1.2.0 では上記のように map 等の引数へラムダ式を使った場合、通常の JDKコンパイルすると実行時にエラーが発生してしまいます。

(a) JDKコンパイルして実行

  • javac 1.8.0_121

以下の Gradle ビルド定義を使って実行してみます。

build.gradle
apply plugin: 'application'

mainClassName = 'MoneyCount'

repositories {
    jcenter()
}

dependencies {
    compile 'org.apache.flink:flink-java:1.2.0'
    runtime 'org.apache.flink:flink-clients_2.11:1.2.0'
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行すると以下のようにエラーとなりました。

Tuple2 の型引数が失われている点が問題となっており、Eclipse JDT compiler でコンパイルしなければならないようです。

実行結果 ※
> gradle run -q -Pargs=input_sample.txt

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
  The return type of function 'main(MoneyCount.java:10)' could not be determined automatically, due to type erasure. 
  You can give type information hints by using the returns(...) method on the result of the transformation call, 
  or by letting your function implement the 'ResultTypeQueryable' interface.
        at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
        at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:692)
        at MoneyCount.main(MoneyCount.java:11)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing.
It seems that your compiler has not stored them into the .class file.
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely.
See the documentation for more information about how to compile jobs containing lambda expressions.
        at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1528)
        ・・・

※ 出力結果には改行を適当に加えています

(b) Eclipse JDT Compiler でコンパイル(-genericsignature オプション付)して実行

  • ecj 4.6.1

次に Eclipse JDT Compiler でコンパイルし実行してみます。

Eclipse JDT Compiler は java -jar ecj.jar ・・・ で実行できるので、Gradle で実施する場合は compileJavaforkOptions を使って設定します。

今回のケースでは、Eclipse JDT Compiler を -genericsignature オプション付きで実行する点が重要です。(付けない場合は JDK と同じ結果になります)

build-ecj.gradle
apply plugin: 'application'

mainClassName = 'MoneyCount'

configurations {
    ecj
}

repositories {
    jcenter()
}

dependencies {
    compile 'org.apache.flink:flink-java:1.2.0'
    runtime 'org.apache.flink:flink-clients_2.11:1.2.0'

    ecj 'org.eclipse.jdt.core.compiler:ecj:4.6.1'
    // 以下でも可
    //ecj 'org.eclipse.scout.sdk.deps:ecj:4.6.2'
}

// Eclipse JDT Compiler の設定
compileJava {
    options.fork = true
    // -genericsignature オプションの指定
    options.compilerArgs << '-genericsignature'

    // java -jar ecj.jar を実行するための設定
    options.forkOptions.with {
        executable = 'java'
        jvmArgs = ['-jar', configurations.ecj.asPath]
    }
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は以下の通り、正常に実行できるようになりました。

実行結果
> gradle run -q -b build-ecj.gradle -Pargs=input_sample.txt

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1055289406]
03/13/2017 20:40:46     Job execution switched to status RUNNING.
・・・
03/13/2017 20:40:47     Job execution switched to status FINISHED.
(10000,2)
(10,2)
(100,2)
(50,1)
(500,1)
(1,2)
(1000,3)
(2000,1)
(5,3)

input_sample.txt の内容は以下の通りです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000

コンパイル結果の比較

最後に、コンパイル結果(.class ファイル) を CFR で処理(以下のコマンドを適用)して違いを確認しておきます。 (--decodelambdas false オプションでラムダ式の部分をメソッドとして残すようにしています)

java -jar cfr_0_120.jar MoneyCount.class --decodelambdas false

まずは JDK(javac 1.8.0_121)のコンパイル結果を確認してみます。

(a) JDKコンパイルした場合(CFR の処理結果)
・・・
public class MoneyCount {
    public static /* varargs */ void main(String ... args) throws Exception {
        ・・・
    }

    private static /* synthetic */ Tuple2 lambda$main$95f17bfa$1(String w) throws Exception {
        return new Tuple2((Object)w, (Object)1);
    }
}

lambda$main$95f17bfa$1 の戻り値の型が Tuple2 となっており、型引数が失われています。(これが実行時のエラー原因)

次に Eclipse JDT Compiler のコンパイル結果を確認してみます。

(b) Eclipse JDT Compiler でコンパイル(-genericsignature オプション付)した場合(CFR の処理結果)
・・・
public class MoneyCount {
    public static /* varargs */ void main(String ... args) throws Exception {
        ・・・
    }

    private static /* synthetic */ Tuple2<String, Integer> lambda$0(String w) throws Exception {
        return new Tuple2((Object)w, (Object)1);
    }
}

lambda$0 の戻り値の型が Tuple2<String, Integer> となっており、型引数が失われずに残っています。(これが実行に成功した理由)

なお、-genericsignature オプションを付けずにコンパイルすると JDK と同様に型引数が失われます。

reveno でイベントソーシング

sourcerer でイベントソーシング」 等と同様の処理を reveno で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170306/

はじめに

使用する Gradle ビルド定義ファイルは以下の通りです。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly "org.projectlombok:lombok:1.16.12"
    compile 'org.reveno:reveno-core:1.23'
}

lombok は必須ではありません。

(a) transaction 版

reveno では transaction メソッドを使う方法と transactionAction メソッドを使う方法が用意されているようなので、まずは transaction メソッドを使ってみます。

イベントクラスの作成

各種イベント用のクラスを作成します。

reveno はこれまでに試したフレームワークとは異なり、イベントからエンティティの状態を復元したりはしないのでイベントクラスは必須ではありません。(EventBus へ publishEvent しないのであれば不要)

そのため、reveno の場合はイベントソーシングではなくコマンドソーシングと呼べるのかもしれません。

在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import lombok.Value;

@Value
public class InventoryItemCreated {
    private long id;
}
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import lombok.Value;

@Value
public class InventoryItemRenamed {
    private long id;
    private String newName;
}
在庫数の変更イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import lombok.Value;

@Value
public class ItemsCheckedInToInventory {
    private long id;
    private int count;
}

モデルクラスの作成

エンティティクラスとビュークラスを作成します。

エンティティクラスは状態を保存するため、ビュークラスはクエリーの結果としてエンティティクラスから変換して返す事になります。

エンティティクラス src/main/java/sample/model/InventoryItem.java
package sample.model;

import lombok.Value;

@Value
public class InventoryItem {
    private String name;
    private int count;
}
ビュークラス src/main/java/sample/model/InventoryItemView.java
package sample.model;

import lombok.Value;

@Value
public class InventoryItemView {
    private long id;
    private String name;
    private int count;
}

実行クラスの作成

今回はトランザクションやイベントのハンドリング処理等をこのクラスへ実装する事にしました。

transaction メソッドを使用する場合、文字列でトランザクションのアクションを定義し、アクションの実行時に Map でパラメータを渡せばよさそうです。

永続化したデータは Engine のコンストラクタ引数で指定したディレクトリ内のファイルへ保存されるようになっており、storeremap した内容は tx-xxx ファイルへ、publishEvent した内容は evn-xxx ファイルへ保存されるようです。※

 ※ publishEvent を実行しなかった場合、
    evn-xxx ファイルの内容は空になりました

QueryManager を使ってデータを取得する場合、エンティティを直接取得する事はできないので、viewMapper メソッドを使ってエンティティクラスからビュークラスへのマッピングを設定します。

イベントのハンドリングは events メソッドで取得した EventsManager に対して実施します。

今回は、executeSync のような同期用メソッドのみを使っていますが、非同期用のメソッドも用意されています。

実行クラス src/main/java/SampleApp.java
import lombok.val;

import org.reveno.atp.core.Engine;
import org.reveno.atp.utils.MapUtils;

import sample.model.InventoryItem;
import sample.model.InventoryItemView;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

public class SampleApp {
    public static void main(String... args) {
        val reveno = new Engine("db");

        // 各種ハンドラやマッピング等の設定を実施
        setUp(reveno);

        reveno.startup();

        // 在庫の作成
        long id = reveno.executeSync("createInventoryItem",
                MapUtils.map("name", "sample1"));

        System.out.println("id: " + id);

        // 在庫数の更新
        reveno.executeSync("checkInItemsToInventory", MapUtils.map("id", id, "count", 5));
        // 在庫数の更新
        reveno.executeSync("checkInItemsToInventory", MapUtils.map("id", id, "count", 3));

        // 検索
        val res = reveno.query().find(InventoryItemView.class, id);

        System.out.println("result: " + res);

        reveno.shutdown();
    }

    private static void setUp(Engine reveno) {
        // エンティティクラスをビュークラスへ変換する設定
        reveno.domain().viewMapper(
            InventoryItem.class,
            InventoryItemView.class,
            (id, e, r) -> new InventoryItemView(id, e.getName(), e.getCount())
        );

        // 在庫の作成処理
        reveno.domain().transaction("createInventoryItem", (t, ctx) -> {
            long id = t.id();
            String name = t.arg("name");
            // エンティティ(状態)の保存
            ctx.repo().store(id, new InventoryItem(name, 0));

            // イベントの発行
            ctx.eventBus().publishEvent(new InventoryItemCreated(id));
            ctx.eventBus().publishEvent(new InventoryItemRenamed(id, name));

        }).uniqueIdFor(InventoryItem.class).command();

        // 在庫数の更新処理
        reveno.domain().transaction("checkInItemsToInventory", (t, ctx) -> {
            long id = t.longArg("id");
            int count = t.intArg("count");
            // エンティティ(状態)の更新
            ctx.repo().remap(id, InventoryItem.class, (rid, state) ->
                    new InventoryItem(state.getName(), state.getCount() + count));
            // イベントの発行
            ctx.eventBus().publishEvent(new ItemsCheckedInToInventory(id, count));
        }).command();

        // InventoryItemCreated イベントのハンドリング設定
        reveno.events().eventHandler(InventoryItemCreated.class, (event, meta) ->
                System.out.println("*** create event: " + event +
                        ", transactionTime: " + meta.getTransactionTime() +
                        ", isRestore: " + meta.isRestore()));
    }
}

実行

gradle run で実行した結果です。

実行結果
> gradle run

・・・
id: 1
result: InventoryItemView(id=1, name=sample1, count=8)
・・・
*** create event: InventoryItemCreated(id=1), transactionTime: 1488718421288, isRestore: false
・・・

(b) transactionAction 版

イベントクラス等は同じものを使用して transactionAction を使った処理を作成してみます。

コマンドクラスの作成

transactionAction ではコマンドクラスを使う事になるので作成します。

id の値はインスタンス化の時点では決定せず、コマンドハンドラ内で設定することになるので、@Wither を使って id の値のみ変更したコピーを返すメソッド (withId) を用意するようにしています。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import lombok.Value;
import lombok.experimental.NonFinal;
import lombok.experimental.Wither;

@Value
public class CreateInventoryItem {
    @Wither @NonFinal private long id;
    private String name;
}
在庫数の更新コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import lombok.Value;

@Value
public class CheckInItemsToInventory {
    private long id;
    private int count;
}

実行クラスの作成

command メソッドでコマンドハンドラを設定し、コマンド毎のトランザクションアクションを transactionAction で設定します。

コマンドハンドラ内で executeTxAction へコマンドを渡せば該当するトランザクションアクションが実行されます。

実行クラス src/main/java/SampleApp.java
import lombok.val;

import org.reveno.atp.core.Engine;

import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.model.InventoryItem;
import sample.model.InventoryItemView;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

public class SampleApp {
    public static void main(String... args) {
        val reveno = new Engine("db");

        setUp(reveno);

        reveno.startup();

        // 在庫の作成
        long id = reveno.executeSync(new CreateInventoryItem(0, "sample1"));

        System.out.println("id: " + id);

        // 在庫数の更新
        reveno.executeSync(new CheckInItemsToInventory(id, 5));
        // 在庫数の更新
        reveno.executeSync(new CheckInItemsToInventory(id, 3));

        // 検索
        val res = reveno.query().find(InventoryItemView.class, id);

        System.out.println("result: " + res);

        reveno.shutdown();
    }

    private static void setUp(Engine reveno) {
        ・・・
        // 在庫作成コマンドのハンドリング設定
        reveno.domain().command(CreateInventoryItem.class, Long.class, (cmd, ctx) -> {
            long id = ctx.id(InventoryItem.class);
            // id を更新してトランザクションアクションを実行
            ctx.executeTxAction(cmd.withId(id));
            return id;
        });

        // 在庫数の更新コマンドのハンドリング設定
        reveno.domain().command(CheckInItemsToInventory.class, (cmd, ctx) -> ctx.executeTxAction(cmd));

        reveno.domain().transactionAction(CreateInventoryItem.class, (act, ctx) -> {
            // エンティティ(状態)の保存
            ctx.repo().store(act.getId(), new InventoryItem(act.getName(), 0));

            // イベントの発行
            ctx.eventBus().publishEvent(new InventoryItemCreated(act.getId()));
            ctx.eventBus().publishEvent(new InventoryItemRenamed(act.getId(), act.getName()));
        });

        reveno.domain().transactionAction(CheckInItemsToInventory.class, (act, ctx) -> {
            // エンティティ(状態)の更新
            ctx.repo().remap(act.getId(), InventoryItem.class, (id, state) ->
                    new InventoryItem(state.getName(), state.getCount() + act.getCount()));

            // イベントの発行
            ctx.eventBus().publishEvent(new ItemsCheckedInToInventory(act.getId(), act.getCount()));
        });

        ・・・
    }
}

実行

gradle run で実行した結果です。

実行結果
> gradle run

・・・
id: 1
*** create event: InventoryItemCreated(id=1), transactionTime: 1488721568341, isRestore: false
result: InventoryItemView(id=1, name=sample1, count=8)
・・・

Akka の FileIO でファイルを読み書き

Akka (akka-stream) の FileIO を使ってファイルの読み書きを行ってみます。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170131/

はじめに

akka-stream では Java 用の APIakka.stream.javadsl パッケージに、Scala 用の APIakka.stream.scaladsl パッケージに定義されています。

主要なクラスやメソッドは javadsl と scaladsl で概ね共通化されているようですが、Sourcerecover メソッドは deprecated の有無が違っていました。

Source の recover/recoverWith メソッドの deprecated 状況
パッケージ deprecated 有り deprecated 無し 備考
akka.stream.scaladsl recoverWith recover ソースは akka/stream/scaladsl/Flow.scala
akka.stream.javadsl recover recoverWith ソースは akka/stream/javadsl/Source.scala

ここで、deprecated の理由が @deprecated("Use recoverWithRetries instead.", "2.4.4") のようなので、scaladsl の方が正しく、javadsl の方は deprecated するメソッドが違っているのだと思います ※。

 ※ recoverWith の代わりに recoverWithRetries を使えるが
    (引数が 1つ増えただけなので)
    recover は引数の型がそもそも違う

Scala の場合

まずは Scala で実装してみます。

ファイル用の Sink は FileIO.toPath で、Source は FileIO.fromPath で取得できます。

ファイルの入出力の型は akka.util.ByteString となっており、以下では map を使って String との変換を行っています。

Source の recover メソッドを使えば、例外が発生していた場合に代用の値へ差し替えて処理を繋げられます。

今回は、ファイルが存在しない等で IOException が発生した際に、"invalid file, ・・・" という文字列へ差し替えるために recover を使っています。

src/main/scala/SampleApp.scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

import java.nio.file.Paths
import java.io.IOException

object SampleApp extends App {
  implicit val system = ActorSystem()
  import system.dispatcher  // ExecutionContext を implicit
  implicit val materializer = ActorMaterializer()

  // ファイルへの書き込み(sample1.txt へ "sample data" を出力)
  val res1 = Source.single("sample data")
                   .map(ByteString.fromString)
                   .runWith(FileIO.toPath(Paths.get("sample1.txt")))

  // ファイルの読み込み(sample2.txt の内容を println)
  val res2 = FileIO.fromPath(Paths.get("sample2.txt"))
                   .map(_.utf8String)
                   .recover {
                     case e: IOException => s"invalid file, ${e}"
                   }
                   .runForeach(println)

  res1.flatMap(_ => res2).onComplete(_ => system.terminate)
}

ビルドと実行

Gradle のビルド定義ファイルは以下の通りです。

build.gradle
apply plugin: 'scala'
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'org.scala-lang:scala-library:2.12.1'
    compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1'
}

sample2.txt ファイルの無い状態で実行します。

実行結果1
> gradle -q run

invalid file, java.nio.file.NoSuchFileException: sample2.txt

sample2.txt を作成して実行します。

実行結果2
> echo %time% > sample2.txt

> gradle -q run

22:11:48.03

Java の場合

次に Java で実装してみます。

Scala と概ね同じですが、akka.stream.javadsl.Source でも recover の引数が scala.PartialFunction となっており、多少の工夫が必要です。

recover の引数に合う scala.PartialFunction は Akka の akka.japi.pf.PFBuilder で作れるので、以下では Match.match から PFBuilder を取得して使っています。

なお、Akka では scala.PartialFunction を組み立てるための Java 用の APIakka.japi.pf パッケージにいくつか用意されています。

src/main/java/SampleApp.java
import akka.actor.ActorSystem;

import akka.japi.pf.Match;
import akka.japi.pf.PFBuilder;

import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Source;

import akka.util.ByteString;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;

public class SampleApp {
    public static void main(String... args) {
        final ActorSystem system = ActorSystem.create();
        final Materializer materializer = ActorMaterializer.create(system);

        // ファイルへの書き込み(sample1.txt へ "sample data" を出力)
        CompletableFuture<?> res1 = Source.single("sample data")
                .map(ByteString::fromString)
                .runWith(FileIO.toPath(Paths.get("sample1.txt")), materializer)
                .toCompletableFuture();

        // scala.PartialFunction のビルダー定義
        PFBuilder<Throwable, String> pfunc = 
            Match.match(IOException.class, e -> "invalid file, " + e);

        // ファイルの読み込み(sample2.txt の内容を println)
        CompletableFuture<?> res2 = FileIO.fromPath(Paths.get("sample2.txt"))
                .map(ByteString::utf8String)
                .recover(pfunc.build())
                .runForeach(System.out::println, materializer)
                .toCompletableFuture();

        CompletableFuture.allOf(res1, res2)
                .handle((v, e) -> system.terminate())
                .join();
    }
}

ビルドと実行

Gradle のビルド定義ファイルは以下の通りです。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1'
}

sample2.txt ファイルの無い状態で実行します。

実行結果1
> gradle -q run

・・・\src\main\java\SampleApp.javaは非推奨のAPIを使用またはオーバーライドしています。
・・・
invalid file, java.nio.file.NoSuchFileException: sample2.txt

recover が deprecated されている件で警告メッセージが出力されますが、上述したように recover を deprecated しているのが誤りだと思われるので無視します。

sample2.txt を作成して実行します。

実行結果2
> echo %time% > sample2.txt

> gradle -q run

・・・
22:14:16.00

Sourcerer でイベントソーシング

Axon Framework でイベントソーシング」 や 「es4j でイベントソーシング」 と同様の処理を Sourcerer で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170110/

はじめに

以下のような Gradle ビルド定義ファイルを使います。

Sourcerer はイベントの保存先として Event Store を使用するため、今回は Event Store クライアントライブラリに esjc を使う sourcerer-esjc モジュールを使用します。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly 'org.projectlombok:lombok:1.16.12'

    compile 'org.elder.sourcerer:sourcerer-esjc:v4.0.1'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.8.5'

    compile 'io.javaslang:javaslang:2.1.0-alpha'
}

lombokjavaslang は必須ではありません。(javaslang はパターンマッチ処理に使いました)

Sourcerer によるイベントソーシング

コマンドの作成

コマンドは特に注意するところはなく、普通の Java クラスとして実装するだけです。

ここで定義したコマンドは、CommandFactory で作成した Command の引数 (Arguments) として使う事になります。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import lombok.Value;

// 在庫作成コマンド
@Value
public class CreateInventoryItem {
    private String id;
    private String name;
}
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import lombok.Value;

// 在庫数追加コマンド
@Value
public class CheckInItemsToInventory {
    private int count;
}

イベントの作成

イベントの型へ @EventType を付与します。

AggregateProjection 等で指定するイベントの型は 1つなので、まずはベースとなるインターフェースを定義します。

イベントの内容は JSON 化して Event Store へ保存する事になりますが、デフォルトでは JSON へ型情報が付与されず、複数のイベント型を使うと JSON からイベントオブジェクトを復元する際に不都合が生じます。

これを回避するために @JsonTypeInfo@JsonSubTypes を使って JSON へ型情報を残すように設定しています ※。

 ※ @JsonTypeInfo の use へ NAME を指定した場合は
    "@type":"クラス名" の情報を JSON へ出力します
イベントのベースインターフェース src/main/java/sample/events/InventoryEvent.java
package sample.events;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.elder.sourcerer.EventType;

@EventType
// JSON へ型情報を残すための設定
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonSubTypes({
    @Type(InventoryItemCreated.class),
    @Type(InventoryItemRenamed.class),
    @Type(ItemsCheckedInToInventory.class)
})
public interface InventoryEvent {
}
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import lombok.Value;

// 在庫作成イベント
@Value
public class InventoryItemCreated implements InventoryEvent {
    private String id;
}
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import lombok.Value;

// 在庫名の変更イベント
@Value
public class InventoryItemRenamed implements InventoryEvent {
    private String name;
}
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import lombok.Value;

// 在庫数の追加イベント
@Value
public class ItemsCheckedInToInventory implements InventoryEvent {
    private int count;
}

コマンドハンドラの作成

コマンドを処理してイベントを生成するメソッドを定義します。

引数や戻り値の型は CommandFactoryfromOperation メソッドへ渡す値 (Operations の各種メソッドで生成) に応じて調整します。

単一のコマンドから複数のイベントを生成する場合は戻り値を List<イベント型> とするだけのようです。

src/main/java/sample/InventoryOperations.java
package sample;

import com.google.common.collect.ImmutableList;
import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.events.InventoryEvent;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

import java.util.List;

public class InventoryOperations {
    // 作成コマンドの処理
    public static List<InventoryEvent> create(CreateInventoryItem cmd) {
        return ImmutableList.of(
            new InventoryItemCreated(cmd.getId()),
            new InventoryItemRenamed(cmd.getName())
        );
    }
    // 在庫数追加コマンドの処理
    public static ItemsCheckedInToInventory checkIn(InventoryItem state, CheckInItemsToInventory cmd) {
        return new ItemsCheckedInToInventory(cmd.getCount());
    }
}

AggregateProjection の作成

AggregateProjection<状態の型, イベントの型> を実装して、イベントから状態を復元する処理を実装します。

今回は javaslang モジュールの MatchCase を使ってパターンマッチで処理しています。

src/main/java/sample/InventoryProjection.java
package sample;

import org.elder.sourcerer.AggregateProjection;
import org.jetbrains.annotations.NotNull;
import sample.events.InventoryItemCreated;
import sample.events.InventoryEvent;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

import static javaslang.API.*;
import static javaslang.Predicates.*;

public class InventoryProjection implements AggregateProjection<InventoryItem, InventoryEvent>{

    @NotNull
    @Override
    public InventoryItem empty() {
        return new InventoryItem("", "", 0);
    }

    @NotNull
    @Override
    public InventoryItem apply(@NotNull String id,
                               @NotNull InventoryItem state, 
                               @NotNull InventoryEvent event) {

        return Match(event).of(
            // InventoryItemCreated イベントの場合
            Case(instanceOf(InventoryItemCreated.class), ev -> 
                    state.withId(ev.getId())),
            // InventoryItemRenamed イベントの場合
            Case(instanceOf(InventoryItemRenamed.class), ev -> 
                    state.withName(ev.getName())),
            // ItemsCheckedInToInventory イベントの場合
            Case(instanceOf(ItemsCheckedInToInventory.class), ev ->
                    state.withCount(state.getCount() + ev.getCount())),
            // その他
            Case($(), state)
        );
    }
}

在庫の状態 InventoryItem は以下のように実装しました。

src/main/java/sample/InventoryItem.java
package sample;

import lombok.Value;

@Value
public class InventoryItem {
    private String id;
    private String name;
    private int count;

    public InventoryItem withId(String newId) {
        return new InventoryItem(newId, name, count);
    }

    public InventoryItem withName(String newName) {
        return new InventoryItem(id, newName, count);
    }

    public InventoryItem withCount(int newCount) {
        return new InventoryItem(id, name, newCount);
    }
}

実行クラスの作成

sourcerer-esjc-spring モジュールを使う方が楽だと思われますが、今回は AggregateRepositoryCommandFactory の組み立て処理を自前で実装してみました。

  • (1) EventStore クライアント作成
  • (2) EventRepositoryFactory 作成
  • (3) EventRepository 作成
  • (4) AggregateRepository 作成
  • (5) CommandFactory 作成

CommandFactory へコマンドハンドラのメソッドをそれぞれ指定して Command を作成します。(コマンドハンドラメソッドのシグネチャに対応する Operations のメソッドを使います)

Command へ aggregateId を設定し arguments へコマンドの内容を設定した後、run を実行するとコマンドが適用されます。

src/main/java/SampleApp.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.msemys.esjc.EventStoreBuilder;

import lombok.val;

import org.elder.sourcerer.*;
import org.elder.sourcerer.esjc.EventStoreEsjcEventRepositoryFactory;

import sample.*;
import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.events.InventoryEvent;

import java.util.UUID;

public class SampleApp {
    public static void main(String... args) throws JsonProcessingException {
        // (1) EventStore クライアント作成
        val eventStore = EventStoreBuilder.newBuilder()
                // ポート番号に 1113 を使う点に注意 (2113 ではない)
                .singleNodeAddress("localhost", 1113) 
                .build();

        val mapper = new ObjectMapper();
        // (2) EventRepositoryFactory 作成
        val eventRepositoryFactory = new EventStoreEsjcEventRepositoryFactory(
                eventStore, 
                mapper, 
                "sample"
        );
        // (3) EventRepository 作成
        val eventRepository = eventRepositoryFactory.getEventRepository(InventoryEvent.class);
        // (4) AggregateRepository 作成
        val aggregateRepository = new DefaultAggregateRepository<>(
                eventRepository, 
                new InventoryProjection()
        );
        // (5) CommandFactory 作成
        val commandFactory = new DefaultCommandFactory<>(aggregateRepository);

        // 在庫作成処理
        val createCommand = commandFactory.fromOperation(
                Operations.constructorOf(InventoryOperations::create));
        // 在庫数の追加処理
        val checkInCommand = commandFactory.fromOperation(
                Operations.updateOf(InventoryOperations::checkIn));

        val id = UUID.randomUUID().toString();
        // 在庫作成の実行
        val r1 = createCommand.setAggregateId(id)
                .setArguments(new CreateInventoryItem(id, "sample"))
                .run();

        printCommandResult(r1);
        // 在庫数の追加の実行(1回目)
        val r2 = checkInCommand.setAggregateId(id)
                .setArguments(new CheckInItemsToInventory(5))
                .run();

        printCommandResult(r2);

        // 在庫数の追加の実行(2回目)
        val r3 = checkInCommand.setAggregateId(id)
                .setArguments(new CheckInItemsToInventory(3))
                .run();

        printCommandResult(r3);

        // 集約の内容を取得
        val aggregate = aggregateRepository.load(id);
        // 状態を出力
        System.out.println("*** result state: " + aggregate.state());

        eventStore.disconnect();
    }

    private static void printCommandResult(CommandResult<? extends InventoryEvent> result) {
        System.out.printf(
            "*** command result: events=%s, newVersion=%s\n", 
            result.getEvents(), 
            result.getNewVersion()
        );
    }
}

実行

https://geteventstore.com/ から Event Store の実行環境をダウンロードして、起動しておきます。

Event Store 起動
> EventStore.ClusterNode.exe --db ./db --log ./logs

・・・
[08912,14,15:31:00.488] Starting Normal TCP listening on TCP endpoint: 127.0.0.1:1113.
・・・
[08912,09,15:31:07.100] Created stats stream '$stats-127.0.0.1:2113', code = Success
サンプルアプリケーションの実行
> gradle -q run

・・・
*** command result: events=[InventoryItemCreated(id=4184fd14-7725-4d09-8e24-59e9a94859a1), InventoryItemRenamed(name=sample)], newVersion=1
・・・
*** command result: events=[ItemsCheckedInToInventory(count=5)], newVersion=2
・・・
*** command result: events=[ItemsCheckedInToInventory(count=3)], newVersion=3
・・・
*** result state: InventoryItem(id=4184fd14-7725-4d09-8e24-59e9a94859a1, name=sample, count=8)
・・・

Event Store への登録内容を確認するため、http://127.0.0.1:2113/ へアクセスして 「Stream Browser」 で sample:・・・・ の内容を見てみました。

f:id:fits:20170110235412p:plain

備考

今回のサンプルも処理が終わった後(disconnect 実行後)にプロセスが終了しません。

原因を詳しく調べていませんが、Event Store クライアントライブラリの esjc 1.6.0 を単体で試した際に、以下が原因でプロセスが終了しないようだったので、esjc が原因かもしれません。

  • executor 未指定の際に Settings 内で作る ThreadPoolExecutor の shutdown を実行しない
  • EventStoreTcp 内の NioEventLoopGroup の shutdownGracefully を実行しない

es4j でイベントソーシング

Axon Framework でイベントソーシング」 と同様の処理を es4j (Eventsourcing for Java) で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170107/

はじめに

今回はイベントをインメモリで保持する eventsourcing-inmem を使うため、以下のような Gradle 用ビルド定義を用意しました。 (lombok は便利なので使います)

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly "org.projectlombok:lombok:1.16.12"

    compile 'com.eventsourcing:eventsourcing-inmem:0.4.5'
}

es4j によるイベントソーシング

コマンドの作成

コマンドは StandardCommand<S, R> を継承して作成し、S に状態の型を R に結果の型を指定します。

コマンドを適用することで生じるイベントは events メソッドの戻り値で返し、コマンド適用結果は result メソッドの戻り値で返します。

状態は events メソッドで用意し、result メソッドの引数として渡ってきます。

まずは、在庫作成のコマンドを実装してみます。

状態の型に InventoryItemCreated を、結果の型に InventoryItem を指定しています。

状態を伴う EventStream を返すために EventStream.ofWithState を使用します。

コンストラクタの引数名からプロパティを認識するようなので、-parameters オプションを使ってビルドするか、引数へ @PropertyName を付与します。(今回は @PropertyName を使用)

なお、状態の型は InventoryItemCreated としなければならないわけでもなく、UUID を代わりに使っても動作に支障はなさそうでした。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import com.eventsourcing.EventStream;
import com.eventsourcing.Repository;
import com.eventsourcing.StandardCommand;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.val;

import sample.domain.InventoryItem;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;

// 在庫作成コマンド
@Value
@EqualsAndHashCode(callSuper = false)
public class CreateInventoryItem extends StandardCommand<InventoryItemCreated, InventoryItem> {
    private String name;

    public CreateInventoryItem(@PropertyName("name") String name) {
        this.name = name;
    }
    // イベントの生成
    @Override
    public EventStream<InventoryItemCreated> events() throws Exception {
        // 在庫作成イベント
        val created = new InventoryItemCreated();
        // 名前変更イベント
        val renamed = new InventoryItemRenamed(created.uuid(), name);

        return EventStream.ofWithState(created, created, renamed);
    }
    // 結果の生成
    @Override
    public InventoryItem result(InventoryItemCreated state, Repository repository) {
        return InventoryItem.lookup(repository, state.uuid()).get();
    }
}

次に、在庫数を加えるためのコマンドです。

ドメインモデルの内部状態を更新するだけなので、状態と結果の型を Void としました。

状態を伴わないので EventStream.of で EventStream を返します。 コマンドの適用結果が Void なので result メソッドをオーバーライドする必要もありません。

在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import com.eventsourcing.EventStream;
import com.eventsourcing.StandardCommand;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

import sample.events.ItemsCheckedInToInventory;

// 在庫数追加コマンド
@Value
@EqualsAndHashCode(callSuper = false)
public class CheckInItemsToInventory extends StandardCommand<Void, Void> {
    private UUID reference;
    private int count;

    public CheckInItemsToInventory(
            @PropertyName("reference") UUID reference,
            @PropertyName("count") int count) {

        this.reference = reference;
        this.count = count;
    }

    @Override
    public EventStream<Void> events() throws Exception {
        return EventStream.of(new ItemsCheckedInToInventory(reference, count));
    }
}

イベントの作成

イベントは StandardEvent を継承して作成します。

イベントを検索するための条件を SimpleIndex を使って定義します。

以下では、uuid の値で InventoryItemCreated を取得できるように、SimpleIndex<InventoryItemCreated, UUID> を定義しています。

在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.index.SimpleIndex;

import java.util.UUID;

// 在庫作成イベント
public class InventoryItemCreated extends StandardEvent {
    public static SimpleIndex<InventoryItemCreated, UUID> ID = SimpleIndex.as(InventoryItemCreated::uuid);
}

在庫名の更新イベントの場合、最新のものを取得すれば最終的な名称が決定するので、timestamp で比較する SimpleIndex<InventoryItemRenamed, HybridTimestamp> を定義しています。

なお、名前変更のためのイベントは eventsourcing-cep モジュールに予め用意されていますが(NameChanged)、今回は自前で実装しています。

在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.hlc.HybridTimestamp;
import com.eventsourcing.index.Index;
import com.eventsourcing.index.SimpleIndex;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

import static com.eventsourcing.index.IndexEngine.IndexFeature.*;

// 在庫名の変更イベント
@Value
@EqualsAndHashCode(callSuper = false)
public class InventoryItemRenamed extends StandardEvent {
    private UUID reference;
    private String newName;

    public InventoryItemRenamed(@PropertyName("reference") UUID reference,
                                @PropertyName("newName") String newName) {

        this.reference = reference;
        this.newName = newName;
    }

    public static SimpleIndex<InventoryItemRenamed, UUID> ID =
            SimpleIndex.as(InventoryItemRenamed::uuid);

    public static SimpleIndex<InventoryItemRenamed, UUID> REFERENCE_ID =
            SimpleIndex.as(InventoryItemRenamed::getReference);

    @Index({EQ, LT, GT})
    public static SimpleIndex<InventoryItemRenamed, HybridTimestamp> TIMESTAMP =
            SimpleIndex.as(InventoryItemRenamed::timestamp);
}
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.index.SimpleIndex;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

// 在庫数の追加イベント
@Value
@EqualsAndHashCode(callSuper = false)
public class ItemsCheckedInToInventory extends StandardEvent {
    private UUID reference;
    private int count;

    public ItemsCheckedInToInventory(
            @PropertyName("reference") UUID reference,
            @PropertyName("count") int count) {

        this.reference = reference;
        this.count = count;
    }

    public static SimpleIndex<ItemsCheckedInToInventory, UUID> ID =
            SimpleIndex.as(ItemsCheckedInToInventory::uuid);

    public static SimpleIndex<ItemsCheckedInToInventory, UUID> REFERENCE_ID =
            SimpleIndex.as(ItemsCheckedInToInventory::getReference);
}

ドメインプロトコルの作成

ドメインプロトコルという機能を使えば、共通的なプロパティ(例えば name など)をドメインモデルから独立して定義できます。(ドメインモデルへ implements して使う)

Protocol を extends したインターフェースとして定義し、プロパティの値を(イベントの内容から)確定する処理をデフォルトメソッドとして定義します。

まずは、在庫名のドメインプロトコルを実装してみます。

在庫名は一番最後の InventoryItemRenamed の newName で確定するため、latestAssociatedEntity を使って最後の InventoryItemRenamed イベントを取得します。

なお、名前に関するドメインプロトコルも eventsourcing-cep モジュールに予め用意されています(NameProtocol)。

在庫名 src/main/java/sample/protocols/InventoryItemNameProtocol.java
package sample.protocols;

import com.eventsourcing.Protocol;
import com.eventsourcing.queries.ModelQueries;

import sample.events.InventoryItemRenamed;

// 在庫名
public interface InventoryItemNameProtocol extends Protocol, ModelQueries {

    default String name() {
        // reference が id の値に等しい最後の InventoryItemRenamed を取得し
        // newName の値を返す
        return latestAssociatedEntity(
            InventoryItemRenamed.class,
            InventoryItemRenamed.REFERENCE_ID,
            InventoryItemRenamed.TIMESTAMP
        ).map(InventoryItemRenamed::getNewName).orElse("");
    }
}

次に、在庫数の場合は、ItemsCheckedInToInventory を集計するように Repositoryquery メソッドで該当する全ての ItemsCheckedInToInventory を取得し、count の合計値を算出しています。

在庫数 src/main/java/sample/protocols/InventoryItemCountProtocol.java
package sample.protocols;

import com.eventsourcing.EntityHandle;
import com.eventsourcing.Protocol;

import lombok.val;

import java.util.stream.StreamSupport;

import sample.events.ItemsCheckedInToInventory;

import static com.eventsourcing.index.EntityQueryFactory.equal;

// 在庫数
public interface InventoryItemCountProtocol extends Protocol {

    default int count() {
        // reference が id の値に等しい ItemsCheckedInToInventory を全て取得
        val res = getRepository().query(
            ItemsCheckedInToInventory.class,
            equal(ItemsCheckedInToInventory.REFERENCE_ID, getId())
        );

        return StreamSupport.stream(res.spliterator(), false)
                .map(EntityHandle::get)
                .mapToInt(ItemsCheckedInToInventory::getCount)
                .sum();
    }
}

ドメインモデルの作成

ドメインモデルは Model と必要なドメインプロトコルを implements して作成します。

同一インスタンスの判定に id の値だけを使うように equals と hashCode をオーバーライドします。(今回は @EqualsAndHashCode(of = "id") で実施)

CreateInventoryItem コマンドの result メソッドで使った lookup メソッドの実装も行います。

src/main/java/sample/domain/InventoryItem.java
package sample.domain;

import com.eventsourcing.Model;
import com.eventsourcing.Repository;
import com.eventsourcing.queries.ModelQueries;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.Optional;
import java.util.UUID;

import sample.events.InventoryItemCreated;
import sample.protocols.InventoryItemCountProtocol;
import sample.protocols.InventoryItemNameProtocol;

@Value
@EqualsAndHashCode(of = "id")
public class InventoryItem implements Model, InventoryItemNameProtocol, InventoryItemCountProtocol {

    private final Repository repository;
    private final UUID id;

    protected InventoryItem(Repository repository, UUID id) {
        this.repository = repository;
        this.id = id;
    }

    public static Optional<InventoryItem> lookup(Repository repository, UUID id) {
        // InventoryItemCreated.ID インデックス(SimpleIndex)を使って
        // id に合致する InventoryItemCreated を取得
        Optional<InventoryItemCreated> res = ModelQueries.lookup(
            repository,
            InventoryItemCreated.class,
            InventoryItemCreated.ID,
            id
        );

        return res.map(ev -> new InventoryItem(repository, id));
    }
}

実行クラスの作成

es4j では Repository を使ってイベントソーシングの処理を行います。

Repository は StandardRepository へ以下のような設定を行う事で構築できます。

  • (1) Journal と IndexEngine の設定
  • (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)

StandardRepository のデフォルト設定では localhost の NTP サーバーを使うようになっているようですが (NTP サーバーが無いとエラーになる)、今回はデフォルト設定の代わりに PhysicalTimeProvider 実装クラスを用意しました。

(2) で PackageXXXSetProvider を使うと指定のパッケージへ属するコマンドやイベントを一括で設定できます。

コマンドを適用する際は publish メソッドを使います。(publish の戻り値は CompletableFuture

src/main/java/SampleApp.java
import com.eventsourcing.*;
import com.eventsourcing.hlc.PhysicalTimeProvider;
import com.eventsourcing.index.MemoryIndexEngine;
import com.eventsourcing.inmem.MemoryJournal;
import com.eventsourcing.repository.StandardRepository;
import com.google.common.util.concurrent.AbstractService;

import java.util.concurrent.*;

import lombok.val;

import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.domain.InventoryItem;
import sample.events.InventoryItemCreated;

public class SampleApp {
    public static void main(String... args) {
        // (1) Journal と IndexEngine の設定
        val repository = StandardRepository.builder()
                .journal(new MemoryJournal())
                .indexEngine(new MemoryIndexEngine())
                .physicalTimeProvider(new SampleTimeProvider())
                .build();

        // (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)
        repository.addCommandSetProvider(
            new PackageCommandSetProvider(new Package[] {CreateInventoryItem.class.getPackage()}));

        repository.addEventSetProvider(
            new PackageEventSetProvider(new Package[] {InventoryItemCreated.class.getPackage()}));

        // 開始
        repository.startAsync().awaitRunning();

        try {
            // CreateInventoryItem コマンド適用
            val d = repository.publish(new CreateInventoryItem("sample1")).get();

            dumpInventoryItem(d);

            // CheckInItemsToInventory コマンド適用
            repository.publish(new CheckInItemsToInventory(d.getId(), 5)).get();

            dumpInventoryItem(d);

            // CheckInItemsToInventory コマンド適用
            repository.publish(new CheckInItemsToInventory(d.getId(), 3)).get();

            dumpInventoryItem(d);

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 停止
            stopRepository(repository);
        }
    }

    private static InventoryItem dumpInventoryItem(InventoryItem item) {
        System.out.println("----- InventoryItem -----");

        System.out.println("id: " + item.getId());
        System.out.println("name: " + item.name());
        System.out.println("count: " + item.count());

        return item;
    }

    private static void stopRepository(Repository repository) {
        System.out.println("stop...");

        try {
            repository.stopAsync().awaitTerminated(10, TimeUnit.SECONDS);
        } catch (TimeoutException ex) {
            ex.printStackTrace();
        }
    }
    // PhysicalTimeProvider の実装
    static class SampleTimeProvider extends AbstractService implements PhysicalTimeProvider {

        @Override
        public long getPhysicalTime() {
            return System.currentTimeMillis();
        }

        @Override
        protected void doStart() {
            System.out.println("timeprovider start...");
            notifyStarted();
        }

        @Override
        protected void doStop() {
            System.out.println("timeprovider stop...");
            notifyStopped();
        }
    }
}

publish の呼び出し箇所は CompletableFuture で処理を繋げれば以下のように実装できます。

src/main/java/SampleApp.java(CompletableFuture 活用版)
public class SampleApp {
    public static void main(String... args) {
        ・・・

        repository.publish(new CreateInventoryItem("sample1"))
            .thenApply(SampleApp::dumpInventoryItem)
            .thenCompose(d ->
                repository.publish(new CheckInItemsToInventory(d.getId(), 5))
                    .thenApply(v -> d)
            )
            .thenApply(SampleApp::dumpInventoryItem)
            .thenCompose(d ->
                repository.publish(new CheckInItemsToInventory(d.getId(), 3))
                    .thenApply(v -> d)
            )
            .thenApply(SampleApp::dumpInventoryItem)
            .whenComplete((d, e) -> stopRepository(repository));
    }
    ・・・
}
実行結果
> gradle -q run

[main] INFO org.reflections.Reflections - Reflections took 64 ms to scan 1 urls, producing 2 keys and 4 values
[main] INFO org.reflections.Reflections - Reflections took 17 ms to scan 1 urls, producing 2 keys and 6 values
timeprovider start...
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 0
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 5
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 8
stop...
timeprovider stop...

備考

実は、今回のサンプルを実行すると処理が終わってもプロセスは終了しません。

スレッドダンプを出力してみたところ、StandardEntity に原因がありそうです。

スレッドダンプ出力例
> jcmd 8904 Thread.print

・・・
"Thread-1" #11 prio=5 os_prio=0 ・・・ waiting on condition [・・・]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <・・・> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396)
        at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649)
        at com.eventsourcing.StandardEntity.lambda$static$0(StandardEntity.java:27)
        at com.eventsourcing.StandardEntity$$Lambda$14/540159270.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)
・・・

StandardEntity のソース(27行目付近)を見てみると、以下のようにスレッドを開始して放置していました。

com.eventsourcing.StandardEntity のソース
・・・
public abstract class StandardEntity<E extends Entity<E>> implements Entity<E> {

    private static LinkedBlockingDeque<UUID> uuids = new LinkedBlockingDeque<>(10_000);

    static {
        new Thread(() -> {
            while (true) {
                try {
                    uuids.put(UUID.randomUUID()); // 27行目
                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
    ・・・

RxJava2 で並列処理

前回 と同じような並列処理を RxJava 2.0 で試してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161226/

はじめに

まずは、map の処理と subscribe の処理を順番に 3回繰り返す処理です。 前回の Reactor との違いは MonoSingle に変わっただけです。

(A) サンプルコード
Single.just("(A)")
    .repeat(3)
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]

observeOn の場合

RxJava 2.0 では Reactor の publishOn の代わりに observeOn が使えます。 main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。

(B) observeOn を使った場合
Single.just("(B)")
    .repeat(3)
    .observeOn(Schedulers.computation())
    // 実質的には以下でも同じ
    //.observeOn(Schedulers.single())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(B) observeOn を使った場合の実行結果
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]

flatMap と subscribeOn を使った並列処理

RxJava 2.0 では Reactor の ParallelFlux 相当の処理は用意されていないようです。

そこで、flatMapsubscribeOn を使ってみました。

(C) flatMap + subscribeOn
Single.just("(C)")
    .repeat(3)
    .flatMap(s ->
        Flowable.just(s)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribeOn(Schedulers.computation())
            //.subscribeOn(Schedulers.newThread())
    )
    .subscribe(n -> printThread(n + "_subscribe"));

ただし、以下の実行結果のように別スレッドで実行してはいるものの、map と subscribe を同じスレッドで実行するわけではありませんでした。

なお、Schedulers.computation() の場合、スレッド数は実行環境のコア数に依存するようです。(newThread は新しいスレッドを使う)

(C) flatMap + subscribeOn の実行結果1
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C) flatMap + subscribeOn の実行結果2
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]

備考

最後に、今回使用したソースを記載しておきます。

build.gradle
apply plugin: 'application'

mainClassName = 'App'

repositories {
    jcenter()
}

dependencies {
    compile 'io.reactivex.rxjava2:rxjava:2.0.3'
}

run {
    standardInput = System.in

    if (project.hasProperty('args')) {
        args project.args
    }
}
src/main/java/App.java
import io.reactivex.Single;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.lang.invoke.MethodHandle;

import static java.lang.invoke.MethodHandles.publicLookup;
import static java.lang.invoke.MethodType.methodType;

public class App {
    public static void main(String... args) throws Throwable {
        MethodHandle mh = publicLookup().findStatic(
            App.class, 
            "sample" + args[0], 
            methodType(void.class)
        );

        mh.invoke();

        System.in.read();
    }

    // (A)
    public static void sampleA() {
        Single.just("(A)")
            .repeat(3)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (B)
    public static void sampleB() {
        Single.just("(B)")
            .repeat(3)
            .observeOn(Schedulers.computation())
            //.observeOn(Schedulers.single())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (C) 
    public static void sampleC() {
        Single.just("(C)")
            .repeat(3)
            .flatMap(s ->
                Flowable.just(s)
                    .map(n -> {
                        printThread(n + "_map");
                        return n;
                    })
                    .subscribeOn(Schedulers.computation())
                    //.subscribeOn(Schedulers.newThread())

            )
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    private static void printThread(String msg) {
        System.out.println(msg + ", thread: " + Thread.currentThread());
    }
}
実行例
> gradle -q run -Pargs=C

(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]