reveno でイベントソーシング

sourcerer でイベントソーシング」 等と同様の処理を reveno で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170306/

はじめに

使用する Gradle ビルド定義ファイルは以下の通りです。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly "org.projectlombok:lombok:1.16.12"
    compile 'org.reveno:reveno-core:1.23'
}

lombok は必須ではありません。

(a) transaction 版

reveno では transaction メソッドを使う方法と transactionAction メソッドを使う方法が用意されているようなので、まずは transaction メソッドを使ってみます。

イベントクラスの作成

各種イベント用のクラスを作成します。

reveno はこれまでに試したフレームワークとは異なり、イベントからエンティティの状態を復元したりはしないのでイベントクラスは必須ではありません。(EventBus へ publishEvent しないのであれば不要)

そのため、reveno の場合はイベントソーシングではなくコマンドソーシングと呼べるのかもしれません。

在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import lombok.Value;

@Value
public class InventoryItemCreated {
    private long id;
}
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import lombok.Value;

@Value
public class InventoryItemRenamed {
    private long id;
    private String newName;
}
在庫数の変更イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import lombok.Value;

@Value
public class ItemsCheckedInToInventory {
    private long id;
    private int count;
}

モデルクラスの作成

エンティティクラスとビュークラスを作成します。

エンティティクラスは状態を保存するため、ビュークラスはクエリーの結果としてエンティティクラスから変換して返す事になります。

エンティティクラス src/main/java/sample/model/InventoryItem.java
package sample.model;

import lombok.Value;

@Value
public class InventoryItem {
    private String name;
    private int count;
}
ビュークラス src/main/java/sample/model/InventoryItemView.java
package sample.model;

import lombok.Value;

@Value
public class InventoryItemView {
    private long id;
    private String name;
    private int count;
}

実行クラスの作成

今回はトランザクションやイベントのハンドリング処理等をこのクラスへ実装する事にしました。

transaction メソッドを使用する場合、文字列でトランザクションのアクションを定義し、アクションの実行時に Map でパラメータを渡せばよさそうです。

永続化したデータは Engine のコンストラクタ引数で指定したディレクトリ内のファイルへ保存されるようになっており、storeremap した内容は tx-xxx ファイルへ、publishEvent した内容は evn-xxx ファイルへ保存されるようです。※

 ※ publishEvent を実行しなかった場合、
    evn-xxx ファイルの内容は空になりました

QueryManager を使ってデータを取得する場合、エンティティを直接取得する事はできないので、viewMapper メソッドを使ってエンティティクラスからビュークラスへのマッピングを設定します。

イベントのハンドリングは events メソッドで取得した EventsManager に対して実施します。

今回は、executeSync のような同期用メソッドのみを使っていますが、非同期用のメソッドも用意されています。

実行クラス src/main/java/SampleApp.java
import lombok.val;

import org.reveno.atp.core.Engine;
import org.reveno.atp.utils.MapUtils;

import sample.model.InventoryItem;
import sample.model.InventoryItemView;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

public class SampleApp {
    public static void main(String... args) {
        val reveno = new Engine("db");

        // 各種ハンドラやマッピング等の設定を実施
        setUp(reveno);

        reveno.startup();

        // 在庫の作成
        long id = reveno.executeSync("createInventoryItem",
                MapUtils.map("name", "sample1"));

        System.out.println("id: " + id);

        // 在庫数の更新
        reveno.executeSync("checkInItemsToInventory", MapUtils.map("id", id, "count", 5));
        // 在庫数の更新
        reveno.executeSync("checkInItemsToInventory", MapUtils.map("id", id, "count", 3));

        // 検索
        val res = reveno.query().find(InventoryItemView.class, id);

        System.out.println("result: " + res);

        reveno.shutdown();
    }

    private static void setUp(Engine reveno) {
        // エンティティクラスをビュークラスへ変換する設定
        reveno.domain().viewMapper(
            InventoryItem.class,
            InventoryItemView.class,
            (id, e, r) -> new InventoryItemView(id, e.getName(), e.getCount())
        );

        // 在庫の作成処理
        reveno.domain().transaction("createInventoryItem", (t, ctx) -> {
            long id = t.id();
            String name = t.arg("name");
            // エンティティ(状態)の保存
            ctx.repo().store(id, new InventoryItem(name, 0));

            // イベントの発行
            ctx.eventBus().publishEvent(new InventoryItemCreated(id));
            ctx.eventBus().publishEvent(new InventoryItemRenamed(id, name));

        }).uniqueIdFor(InventoryItem.class).command();

        // 在庫数の更新処理
        reveno.domain().transaction("checkInItemsToInventory", (t, ctx) -> {
            long id = t.longArg("id");
            int count = t.intArg("count");
            // エンティティ(状態)の更新
            ctx.repo().remap(id, InventoryItem.class, (rid, state) ->
                    new InventoryItem(state.getName(), state.getCount() + count));
            // イベントの発行
            ctx.eventBus().publishEvent(new ItemsCheckedInToInventory(id, count));
        }).command();

        // InventoryItemCreated イベントのハンドリング設定
        reveno.events().eventHandler(InventoryItemCreated.class, (event, meta) ->
                System.out.println("*** create event: " + event +
                        ", transactionTime: " + meta.getTransactionTime() +
                        ", isRestore: " + meta.isRestore()));
    }
}

実行

gradle run で実行した結果です。

実行結果
> gradle run

・・・
id: 1
result: InventoryItemView(id=1, name=sample1, count=8)
・・・
*** create event: InventoryItemCreated(id=1), transactionTime: 1488718421288, isRestore: false
・・・

(b) transactionAction 版

イベントクラス等は同じものを使用して transactionAction を使った処理を作成してみます。

コマンドクラスの作成

transactionAction ではコマンドクラスを使う事になるので作成します。

id の値はインスタンス化の時点では決定せず、コマンドハンドラ内で設定することになるので、@Wither を使って id の値のみ変更したコピーを返すメソッド (withId) を用意するようにしています。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import lombok.Value;
import lombok.experimental.NonFinal;
import lombok.experimental.Wither;

@Value
public class CreateInventoryItem {
    @Wither @NonFinal private long id;
    private String name;
}
在庫数の更新コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import lombok.Value;

@Value
public class CheckInItemsToInventory {
    private long id;
    private int count;
}

実行クラスの作成

command メソッドでコマンドハンドラを設定し、コマンド毎のトランザクションアクションを transactionAction で設定します。

コマンドハンドラ内で executeTxAction へコマンドを渡せば該当するトランザクションアクションが実行されます。

実行クラス src/main/java/SampleApp.java
import lombok.val;

import org.reveno.atp.core.Engine;

import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.model.InventoryItem;
import sample.model.InventoryItemView;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

public class SampleApp {
    public static void main(String... args) {
        val reveno = new Engine("db");

        setUp(reveno);

        reveno.startup();

        // 在庫の作成
        long id = reveno.executeSync(new CreateInventoryItem(0, "sample1"));

        System.out.println("id: " + id);

        // 在庫数の更新
        reveno.executeSync(new CheckInItemsToInventory(id, 5));
        // 在庫数の更新
        reveno.executeSync(new CheckInItemsToInventory(id, 3));

        // 検索
        val res = reveno.query().find(InventoryItemView.class, id);

        System.out.println("result: " + res);

        reveno.shutdown();
    }

    private static void setUp(Engine reveno) {
        ・・・
        // 在庫作成コマンドのハンドリング設定
        reveno.domain().command(CreateInventoryItem.class, Long.class, (cmd, ctx) -> {
            long id = ctx.id(InventoryItem.class);
            // id を更新してトランザクションアクションを実行
            ctx.executeTxAction(cmd.withId(id));
            return id;
        });

        // 在庫数の更新コマンドのハンドリング設定
        reveno.domain().command(CheckInItemsToInventory.class, (cmd, ctx) -> ctx.executeTxAction(cmd));

        reveno.domain().transactionAction(CreateInventoryItem.class, (act, ctx) -> {
            // エンティティ(状態)の保存
            ctx.repo().store(act.getId(), new InventoryItem(act.getName(), 0));

            // イベントの発行
            ctx.eventBus().publishEvent(new InventoryItemCreated(act.getId()));
            ctx.eventBus().publishEvent(new InventoryItemRenamed(act.getId(), act.getName()));
        });

        reveno.domain().transactionAction(CheckInItemsToInventory.class, (act, ctx) -> {
            // エンティティ(状態)の更新
            ctx.repo().remap(act.getId(), InventoryItem.class, (id, state) ->
                    new InventoryItem(state.getName(), state.getCount() + act.getCount()));

            // イベントの発行
            ctx.eventBus().publishEvent(new ItemsCheckedInToInventory(act.getId(), act.getCount()));
        });

        ・・・
    }
}

実行

gradle run で実行した結果です。

実行結果
> gradle run

・・・
id: 1
*** create event: InventoryItemCreated(id=1), transactionTime: 1488721568341, isRestore: false
result: InventoryItemView(id=1, name=sample1, count=8)
・・・

Groovy で Cassandra を組み込み実行

Groovy で Apache Cassandra を組み込み実行してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170227/

組み込み実行

まずは、設定ファイルを用意しておきます。

今回は実行に必要な最小限の設定を行っています。

embed.conf
cluster_name: 'Test Cluster'

listen_address: localhost

commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000

partitioner: org.apache.cassandra.dht.Murmur3Partitioner
endpoint_snitch: SimpleSnitch

seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "127.0.0.1"

# CQL クライアントで接続するために必要
start_native_transport: true

ここで、Apache Cassandra 3.10 では thrift クライアントで接続するためのポート 9160 はデフォルトで有効のようですが、CQL クライアント用のポート 9042 を有効化するには start_native_transport の設定が必要でした。

ポート番号 用途
9042 CQL クライアント用
9160 thrift クライアント用

Cassandra を組み込み実行する Groovy スクリプトは以下の通りです。

cassandra.config で設定ファイル、cassandra.storagedir でデータディレクトリのパスを設定、CassandraDaemonインスタンス化して activate します。(deactivate を実行すると停止します)

cassandra_embed.groovy
@Grab('org.apache.cassandra:cassandra-all:3.10')
import org.apache.cassandra.service.CassandraDaemon

def conf = 'embed.yaml'
def dir = new File(args[0])

if (!dir.exists()) {
    dir.mkdirs()
}

System.setProperty('cassandra.config', conf)
System.setProperty('cassandra-foreground', 'true')
System.setProperty('cassandra.storagedir', dir.absolutePath)

def cassandra = new CassandraDaemon()
// 開始
cassandra.activate()

System.in.read()

// 終了
cassandra.deactivate()

実行結果は以下の通りで、特に問題なく起動できました。

実行
> groovy cassandra_embed.groovy data

・・・
21:30:46.493 [main] INFO  o.apache.cassandra.transport.Server - Starting listening for CQL clients on localhost/127.0.0.1:9042 (unencrypted)...
・・・
21:30:46.790 [Thread-1] INFO  o.a.cassandra.thrift.ThriftServer - Listening for thrift clients...

動作確認

CQL クライアントを使って Cassandra への接続確認を行います。

cqlsh 利用

まずは、Cassandra 3.10 に同梱されている cqlsh コマンドを使って、キースペースとテーブルを作成しデータ登録を行います。

ここで、cqlsh (本体は cqlsh.py) の実行には Python の実行環境が必要です。

cqlsh による操作結果
> cqlsh

・・・
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.10 | CQL spec 3.4.4 | Native protocol v4]
・・・

cqlsh> CREATE KEYSPACE sample WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor' : 1};

cqlsh> use sample;

cqlsh:sample> CREATE TABLE data (id text PRIMARY KEY, name text, value int);

cqlsh:sample> INSERT INTO data (id, name, value) values ('d1', 'sample1', 1);
cqlsh:sample> INSERT INTO data (id, name, value) values ('d2', 'sample2', 20);
cqlsh:sample> INSERT INTO data (id, name, value) values ('d3', 'sample3', 300);

cqlsh:sample> SELECT * FROM data;

 id | name    | value
----+---------+-------
 d2 | sample2 |    20
 d1 | sample1 |     1
 d3 | sample3 |   300

(3 rows)

Datastax Java Driver for Apache Cassandra 利用

次に、登録したデータを Datastax Java Driver for Apache Cassandra を使って検索してみます。

netty と jffi モジュールで Error grabbing Grapes -- [download failed: ・・・] となったので、@GrabExclude を使って回避しています。

client_sample.groovy
@Grapes([
    @Grab('com.datastax.cassandra:cassandra-driver-core:3.1.4'),
    @GrabExclude('io.netty#netty-handler;4.0.37'),
    @GrabExclude('com.github.jnr#jffi;1.2.10')
])
@Grab('io.netty:netty-all:4.0.44.Final')
@Grab('org.slf4j:slf4j-nop:1.7.23')
import com.datastax.driver.core.Cluster

Cluster.builder().addContactPoint('localhost').build().withCloseable { cluster ->
    cluster.connect('sample').withCloseable { session ->

        def res = session.execute('select * from data')

        res.each {
            println it
        }
    }
}

実行結果は以下の通りです。

実行結果
> groovy client_sample.groovy

Row[d2, sample2, 20]
Row[d1, sample1, 1]
Row[d3, sample3, 300]

Groovy で Elasticsearch を組み込み実行

Groovy で Elasticsearch を組み込み実行してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170203/

(a) クライアント接続しない場合

まずは、クライアント接続が不可な Elasticsearch を起動して、ドキュメント登録や検索を行ってみます。

ポート番号 クライアント接続
9200 (HTTP) ×
9300 (Transport) ×

Elasticsearch の組み込み実行は、適切な設定を行った Settings(もしくは Environment)を使って Node を作成し start を実行するだけです。

path.home の設定は必須で、指定したパスの data ディレクトリを使用します。(無ければ自動的に作成されます)

transport.typelocal へ、http.enabledfalse へ設定すればクライアントの接続を受け付けない状態になります。※

 ※ クライアント接続を受け付けるためのプラグインを適用していない場合、
    このように設定しておかないと実行時にエラーとなります

この場合、Node の client メソッドで取得した Client を使ってインデックス等を操作します。

els_local.groovy
@Grab('org.elasticsearch:elasticsearch:5.2.0')
// log4j のモジュールが必要(無い場合は NoClassDefFoundError が発生)
@Grab('org.apache.logging.log4j:log4j-api:2.8')
@Grab('org.apache.logging.log4j:log4j-core:2.8')
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.node.Node

def index = args[0] // インデックス
def type = args[1]  // タイプ

// 設定
def setting = Settings.builder()
    .put('path.home', '.') // data ディレクトリの配置先を指定
    .put('transport.type', 'local')
    .put('http.enabled', 'false')
    .build()

new Node(setting).withCloseable { node ->
    // Elasticsearch の実行
    node.start()

    node.client().withCloseable { client ->
        // インデックスへのドキュメント登録
        def r1 = client.prepareIndex(index, type)
                    .setSource('time', System.currentTimeMillis())
                    .execute()
                    .get()

        println r1

        // 検索結果へ即時反映されなかったので適度に待機
        sleep(1000)

        println '-----'
        // 検索
        def r2 = client.prepareSearch(index)
                    .setTypes(type)
                    .execute()
                    .get()

        println r2
    }
}

動作確認

実行結果は以下の通りです。

log4j2 の設定ファイルが見つからない旨のエラーログが出力されていますが、 Elasticsearch の組み込み実行は成功しているようです。

実行結果
> groovy els_local.groovy a1 item

ERROR StatusLogger No log4j2 configuration file found. ・・・
IndexResponse[index=a1,type=item,id=AVn6bOp-0Vu_EXj66Fj9,version=1,result=created,shards={"_shards":{"total":2,"successful":1,"failed":0}}]
-----
{"took":140,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"a1","_type":"item","_id":"AVn6bOp-0Vu_EXj66Fj9","_score":1.0,"_source":{"time":1485965157491}}]}}

(b) クライアント接続する場合

次に、クライアント接続が可能な Elasticsearch を組み込み実行します。

ポート番号 クライアント接続
9200 (HTTP)
9300 (Transport)

これらのポート番号でクライアント接続を受けるにはプラグインが必要なので、 今回は Netty4Plugin (transport-netty4-client) を使用しました。

Node の public コンストラクタはプラグインクラスを直接指定できないため、今回は protected コンストラクタ ※ を直接呼び出して Netty4Plugin を適用しています。

 ※ 第 2引数で Plugin クラスを指定できるようになっている
    protected Node(Environment environment, Collection<Class<? extends Plugin>> plugins)

なお、close メソッドの実行有無に関係なく Node は終了してしまうので、System.in.read() を使って終了を止めています。

els_netty.groovy
@Grab('org.elasticsearch:elasticsearch:5.2.0')
@Grab('org.elasticsearch.plugin:transport-netty4-client:5.2.0')
@Grab('org.apache.logging.log4j:log4j-api:2.8')
@Grab('org.apache.logging.log4j:log4j-core:2.8')
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.env.Environment
import org.elasticsearch.node.Node
import org.elasticsearch.transport.Netty4Plugin

def setting = Settings.builder()
    .put('path.home', '.')
    .build()

def env = new Environment(setting)

// Netty4Plugin を指定して Node をインスタンス化
new Node(env, [Netty4Plugin]).withCloseable { node ->

    node.start()

    println 'started server ...'

    // 終了するのを止めるための措置
    System.in.read()
}

動作確認

Elasticsearch を実行します。

Elasticsearch 組み込み実行
> groovy els_netty.groovy

ERROR StatusLogger No log4j2 configuration file found. ・・・
started server ...

(1) HTTP 接続(9200 ポート)

curl で 9200 ポートへ接続した結果は以下の通りです。

実行結果(HTTP)
$ curl -s http://localhost:9200/b1/item -d "{\"time\": `date +%s%3N`}"

{"_index":"b1","_type":"item","_id":"AVn6rxw5O4vwdZ1ibdCo","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"created":true}


$ curl -s http://localhost:9200/b1/item/_search

{"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"b1","_type":"item","_id":"AVn6rxw5O4vwdZ1ibdCo","_score":1.0,"_source":{"time": 1485969495792}}]}}

(2) Transport 接続(9300 ポート)

9300 ポートへ接続して (a) と同等のクライアント処理を実施するスクリプトは以下のようになります。

9300 ポートへ接続するために TransportClient を使用しています。

els_client.groovy
@Grab('org.elasticsearch.client:transport:5.2.0')
@Grab('org.apache.logging.log4j:log4j-api:2.8')
@Grab('org.apache.logging.log4j:log4j-core:2.8')
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.transport.client.PreBuiltTransportClient
import org.elasticsearch.common.transport.InetSocketTransportAddress

def index = args[0]
def type = args[1]

def addr = new InetSocketTransportAddress(
                    InetAddress.getLoopbackAddress(), 9300)

// TransportClient の生成
def transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                            .addTransportAddress(addr)

transportClient.withCloseable { client ->
    // インデックスへのドキュメント登録
    def r1 = client.prepareIndex(index, type)
                .setSource('time', System.currentTimeMillis())
                .execute()
                .get()

    println r1

    sleep(1000)

    println '-----'
    // 検索
    def r2 = client.prepareSearch(index)
                .setTypes(type)
                .execute()
                .get()

    println r2
}

実行結果は以下の通りです。

実行結果(Transport)
> groovy els_client.groovy b2 item

・・・
IndexResponse[index=b2,type=item,id=AVn6tvd3WlzxY0bEAQ4r,version=1,result=created,shards={"_shards":{"total":2,"successful":1,"failed":0}}]
-----
{"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"b2","_type":"item","_id":"AVn6tvd3WlzxY0bEAQ4r","_score":1.0,"_source":{"time":1485970011680}}]}}

備考. HTTP 接続の無効化

9200 ポートの接続だけを無効化するには http.enabledfalse にします。

ポート番号 クライアント接続
9200 (HTTP) ×
9300 (Transport)
els_netty_nohttp.groovy
・・・

def setting = Settings.builder()
    .put('path.home', '.')
    .put('http.enabled', 'false') // HTTP 接続(9200 ポート)の無効化
    .build()

・・・

new Node(env, [Netty4Plugin]).withCloseable { node ->
    ・・・
}

Akka の FileIO でファイルを読み書き

Akka (akka-stream) の FileIO を使ってファイルの読み書きを行ってみます。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170131/

はじめに

akka-stream では Java 用の APIakka.stream.javadsl パッケージに、Scala 用の APIakka.stream.scaladsl パッケージに定義されています。

主要なクラスやメソッドは javadsl と scaladsl で概ね共通化されているようですが、Sourcerecover メソッドは deprecated の有無が違っていました。

Source の recover/recoverWith メソッドの deprecated 状況
パッケージ deprecated 有り deprecated 無し 備考
akka.stream.scaladsl recoverWith recover ソースは akka/stream/scaladsl/Flow.scala
akka.stream.javadsl recover recoverWith ソースは akka/stream/javadsl/Source.scala

ここで、deprecated の理由が @deprecated("Use recoverWithRetries instead.", "2.4.4") のようなので、scaladsl の方が正しく、javadsl の方は deprecated するメソッドが違っているのだと思います ※。

 ※ recoverWith の代わりに recoverWithRetries を使えるが
    (引数が 1つ増えただけなので)
    recover は引数の型がそもそも違う

Scala の場合

まずは Scala で実装してみます。

ファイル用の Sink は FileIO.toPath で、Source は FileIO.fromPath で取得できます。

ファイルの入出力の型は akka.util.ByteString となっており、以下では map を使って String との変換を行っています。

Source の recover メソッドを使えば、例外が発生していた場合に代用の値へ差し替えて処理を繋げられます。

今回は、ファイルが存在しない等で IOException が発生した際に、"invalid file, ・・・" という文字列へ差し替えるために recover を使っています。

src/main/scala/SampleApp.scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

import java.nio.file.Paths
import java.io.IOException

object SampleApp extends App {
  implicit val system = ActorSystem()
  import system.dispatcher  // ExecutionContext を implicit
  implicit val materializer = ActorMaterializer()

  // ファイルへの書き込み(sample1.txt へ "sample data" を出力)
  val res1 = Source.single("sample data")
                   .map(ByteString.fromString)
                   .runWith(FileIO.toPath(Paths.get("sample1.txt")))

  // ファイルの読み込み(sample2.txt の内容を println)
  val res2 = FileIO.fromPath(Paths.get("sample2.txt"))
                   .map(_.utf8String)
                   .recover {
                     case e: IOException => s"invalid file, ${e}"
                   }
                   .runForeach(println)

  res1.flatMap(_ => res2).onComplete(_ => system.terminate)
}

ビルドと実行

Gradle のビルド定義ファイルは以下の通りです。

build.gradle
apply plugin: 'scala'
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'org.scala-lang:scala-library:2.12.1'
    compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1'
}

sample2.txt ファイルの無い状態で実行します。

実行結果1
> gradle -q run

invalid file, java.nio.file.NoSuchFileException: sample2.txt

sample2.txt を作成して実行します。

実行結果2
> echo %time% > sample2.txt

> gradle -q run

22:11:48.03

Java の場合

次に Java で実装してみます。

Scala と概ね同じですが、akka.stream.javadsl.Source でも recover の引数が scala.PartialFunction となっており、多少の工夫が必要です。

recover の引数に合う scala.PartialFunction は Akka の akka.japi.pf.PFBuilder で作れるので、以下では Match.match から PFBuilder を取得して使っています。

なお、Akka では scala.PartialFunction を組み立てるための Java 用の APIakka.japi.pf パッケージにいくつか用意されています。

src/main/java/SampleApp.java
import akka.actor.ActorSystem;

import akka.japi.pf.Match;
import akka.japi.pf.PFBuilder;

import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Source;

import akka.util.ByteString;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;

public class SampleApp {
    public static void main(String... args) {
        final ActorSystem system = ActorSystem.create();
        final Materializer materializer = ActorMaterializer.create(system);

        // ファイルへの書き込み(sample1.txt へ "sample data" を出力)
        CompletableFuture<?> res1 = Source.single("sample data")
                .map(ByteString::fromString)
                .runWith(FileIO.toPath(Paths.get("sample1.txt")), materializer)
                .toCompletableFuture();

        // scala.PartialFunction のビルダー定義
        PFBuilder<Throwable, String> pfunc = 
            Match.match(IOException.class, e -> "invalid file, " + e);

        // ファイルの読み込み(sample2.txt の内容を println)
        CompletableFuture<?> res2 = FileIO.fromPath(Paths.get("sample2.txt"))
                .map(ByteString::utf8String)
                .recover(pfunc.build())
                .runForeach(System.out::println, materializer)
                .toCompletableFuture();

        CompletableFuture.allOf(res1, res2)
                .handle((v, e) -> system.terminate())
                .join();
    }
}

ビルドと実行

Gradle のビルド定義ファイルは以下の通りです。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1'
}

sample2.txt ファイルの無い状態で実行します。

実行結果1
> gradle -q run

・・・\src\main\java\SampleApp.javaは非推奨のAPIを使用またはオーバーライドしています。
・・・
invalid file, java.nio.file.NoSuchFileException: sample2.txt

recover が deprecated されている件で警告メッセージが出力されますが、上述したように recover を deprecated しているのが誤りだと思われるので無視します。

sample2.txt を作成して実行します。

実行結果2
> echo %time% > sample2.txt

> gradle -q run

・・・
22:14:16.00

Sourcerer でイベントソーシング

Axon Framework でイベントソーシング」 や 「es4j でイベントソーシング」 と同様の処理を Sourcerer で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170110/

はじめに

以下のような Gradle ビルド定義ファイルを使います。

Sourcerer はイベントの保存先として Event Store を使用するため、今回は Event Store クライアントライブラリに esjc を使う sourcerer-esjc モジュールを使用します。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly 'org.projectlombok:lombok:1.16.12'

    compile 'org.elder.sourcerer:sourcerer-esjc:v4.0.1'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.8.5'

    compile 'io.javaslang:javaslang:2.1.0-alpha'
}

lombokjavaslang は必須ではありません。(javaslang はパターンマッチ処理に使いました)

Sourcerer によるイベントソーシング

コマンドの作成

コマンドは特に注意するところはなく、普通の Java クラスとして実装するだけです。

ここで定義したコマンドは、CommandFactory で作成した Command の引数 (Arguments) として使う事になります。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import lombok.Value;

// 在庫作成コマンド
@Value
public class CreateInventoryItem {
    private String id;
    private String name;
}
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import lombok.Value;

// 在庫数追加コマンド
@Value
public class CheckInItemsToInventory {
    private int count;
}

イベントの作成

イベントの型へ @EventType を付与します。

AggregateProjection 等で指定するイベントの型は 1つなので、まずはベースとなるインターフェースを定義します。

イベントの内容は JSON 化して Event Store へ保存する事になりますが、デフォルトでは JSON へ型情報が付与されず、複数のイベント型を使うと JSON からイベントオブジェクトを復元する際に不都合が生じます。

これを回避するために @JsonTypeInfo@JsonSubTypes を使って JSON へ型情報を残すように設定しています ※。

 ※ @JsonTypeInfo の use へ NAME を指定した場合は
    "@type":"クラス名" の情報を JSON へ出力します
イベントのベースインターフェース src/main/java/sample/events/InventoryEvent.java
package sample.events;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.elder.sourcerer.EventType;

@EventType
// JSON へ型情報を残すための設定
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonSubTypes({
    @Type(InventoryItemCreated.class),
    @Type(InventoryItemRenamed.class),
    @Type(ItemsCheckedInToInventory.class)
})
public interface InventoryEvent {
}
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import lombok.Value;

// 在庫作成イベント
@Value
public class InventoryItemCreated implements InventoryEvent {
    private String id;
}
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import lombok.Value;

// 在庫名の変更イベント
@Value
public class InventoryItemRenamed implements InventoryEvent {
    private String name;
}
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import lombok.Value;

// 在庫数の追加イベント
@Value
public class ItemsCheckedInToInventory implements InventoryEvent {
    private int count;
}

コマンドハンドラの作成

コマンドを処理してイベントを生成するメソッドを定義します。

引数や戻り値の型は CommandFactoryfromOperation メソッドへ渡す値 (Operations の各種メソッドで生成) に応じて調整します。

単一のコマンドから複数のイベントを生成する場合は戻り値を List<イベント型> とするだけのようです。

src/main/java/sample/InventoryOperations.java
package sample;

import com.google.common.collect.ImmutableList;
import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.events.InventoryEvent;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

import java.util.List;

public class InventoryOperations {
    // 作成コマンドの処理
    public static List<InventoryEvent> create(CreateInventoryItem cmd) {
        return ImmutableList.of(
            new InventoryItemCreated(cmd.getId()),
            new InventoryItemRenamed(cmd.getName())
        );
    }
    // 在庫数追加コマンドの処理
    public static ItemsCheckedInToInventory checkIn(InventoryItem state, CheckInItemsToInventory cmd) {
        return new ItemsCheckedInToInventory(cmd.getCount());
    }
}

AggregateProjection の作成

AggregateProjection<状態の型, イベントの型> を実装して、イベントから状態を復元する処理を実装します。

今回は javaslang モジュールの MatchCase を使ってパターンマッチで処理しています。

src/main/java/sample/InventoryProjection.java
package sample;

import org.elder.sourcerer.AggregateProjection;
import org.jetbrains.annotations.NotNull;
import sample.events.InventoryItemCreated;
import sample.events.InventoryEvent;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

import static javaslang.API.*;
import static javaslang.Predicates.*;

public class InventoryProjection implements AggregateProjection<InventoryItem, InventoryEvent>{

    @NotNull
    @Override
    public InventoryItem empty() {
        return new InventoryItem("", "", 0);
    }

    @NotNull
    @Override
    public InventoryItem apply(@NotNull String id,
                               @NotNull InventoryItem state, 
                               @NotNull InventoryEvent event) {

        return Match(event).of(
            // InventoryItemCreated イベントの場合
            Case(instanceOf(InventoryItemCreated.class), ev -> 
                    state.withId(ev.getId())),
            // InventoryItemRenamed イベントの場合
            Case(instanceOf(InventoryItemRenamed.class), ev -> 
                    state.withName(ev.getName())),
            // ItemsCheckedInToInventory イベントの場合
            Case(instanceOf(ItemsCheckedInToInventory.class), ev ->
                    state.withCount(state.getCount() + ev.getCount())),
            // その他
            Case($(), state)
        );
    }
}

在庫の状態 InventoryItem は以下のように実装しました。

src/main/java/sample/InventoryItem.java
package sample;

import lombok.Value;

@Value
public class InventoryItem {
    private String id;
    private String name;
    private int count;

    public InventoryItem withId(String newId) {
        return new InventoryItem(newId, name, count);
    }

    public InventoryItem withName(String newName) {
        return new InventoryItem(id, newName, count);
    }

    public InventoryItem withCount(int newCount) {
        return new InventoryItem(id, name, newCount);
    }
}

実行クラスの作成

sourcerer-esjc-spring モジュールを使う方が楽だと思われますが、今回は AggregateRepositoryCommandFactory の組み立て処理を自前で実装してみました。

  • (1) EventStore クライアント作成
  • (2) EventRepositoryFactory 作成
  • (3) EventRepository 作成
  • (4) AggregateRepository 作成
  • (5) CommandFactory 作成

CommandFactory へコマンドハンドラのメソッドをそれぞれ指定して Command を作成します。(コマンドハンドラメソッドのシグネチャに対応する Operations のメソッドを使います)

Command へ aggregateId を設定し arguments へコマンドの内容を設定した後、run を実行するとコマンドが適用されます。

src/main/java/SampleApp.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.msemys.esjc.EventStoreBuilder;

import lombok.val;

import org.elder.sourcerer.*;
import org.elder.sourcerer.esjc.EventStoreEsjcEventRepositoryFactory;

import sample.*;
import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.events.InventoryEvent;

import java.util.UUID;

public class SampleApp {
    public static void main(String... args) throws JsonProcessingException {
        // (1) EventStore クライアント作成
        val eventStore = EventStoreBuilder.newBuilder()
                // ポート番号に 1113 を使う点に注意 (2113 ではない)
                .singleNodeAddress("localhost", 1113) 
                .build();

        val mapper = new ObjectMapper();
        // (2) EventRepositoryFactory 作成
        val eventRepositoryFactory = new EventStoreEsjcEventRepositoryFactory(
                eventStore, 
                mapper, 
                "sample"
        );
        // (3) EventRepository 作成
        val eventRepository = eventRepositoryFactory.getEventRepository(InventoryEvent.class);
        // (4) AggregateRepository 作成
        val aggregateRepository = new DefaultAggregateRepository<>(
                eventRepository, 
                new InventoryProjection()
        );
        // (5) CommandFactory 作成
        val commandFactory = new DefaultCommandFactory<>(aggregateRepository);

        // 在庫作成処理
        val createCommand = commandFactory.fromOperation(
                Operations.constructorOf(InventoryOperations::create));
        // 在庫数の追加処理
        val checkInCommand = commandFactory.fromOperation(
                Operations.updateOf(InventoryOperations::checkIn));

        val id = UUID.randomUUID().toString();
        // 在庫作成の実行
        val r1 = createCommand.setAggregateId(id)
                .setArguments(new CreateInventoryItem(id, "sample"))
                .run();

        printCommandResult(r1);
        // 在庫数の追加の実行(1回目)
        val r2 = checkInCommand.setAggregateId(id)
                .setArguments(new CheckInItemsToInventory(5))
                .run();

        printCommandResult(r2);

        // 在庫数の追加の実行(2回目)
        val r3 = checkInCommand.setAggregateId(id)
                .setArguments(new CheckInItemsToInventory(3))
                .run();

        printCommandResult(r3);

        // 集約の内容を取得
        val aggregate = aggregateRepository.load(id);
        // 状態を出力
        System.out.println("*** result state: " + aggregate.state());

        eventStore.disconnect();
    }

    private static void printCommandResult(CommandResult<? extends InventoryEvent> result) {
        System.out.printf(
            "*** command result: events=%s, newVersion=%s\n", 
            result.getEvents(), 
            result.getNewVersion()
        );
    }
}

実行

https://geteventstore.com/ から Event Store の実行環境をダウンロードして、起動しておきます。

Event Store 起動
> EventStore.ClusterNode.exe --db ./db --log ./logs

・・・
[08912,14,15:31:00.488] Starting Normal TCP listening on TCP endpoint: 127.0.0.1:1113.
・・・
[08912,09,15:31:07.100] Created stats stream '$stats-127.0.0.1:2113', code = Success
サンプルアプリケーションの実行
> gradle -q run

・・・
*** command result: events=[InventoryItemCreated(id=4184fd14-7725-4d09-8e24-59e9a94859a1), InventoryItemRenamed(name=sample)], newVersion=1
・・・
*** command result: events=[ItemsCheckedInToInventory(count=5)], newVersion=2
・・・
*** command result: events=[ItemsCheckedInToInventory(count=3)], newVersion=3
・・・
*** result state: InventoryItem(id=4184fd14-7725-4d09-8e24-59e9a94859a1, name=sample, count=8)
・・・

Event Store への登録内容を確認するため、http://127.0.0.1:2113/ へアクセスして 「Stream Browser」 で sample:・・・・ の内容を見てみました。

f:id:fits:20170110235412p:plain

備考

今回のサンプルも処理が終わった後(disconnect 実行後)にプロセスが終了しません。

原因を詳しく調べていませんが、Event Store クライアントライブラリの esjc 1.6.0 を単体で試した際に、以下が原因でプロセスが終了しないようだったので、esjc が原因かもしれません。

  • executor 未指定の際に Settings 内で作る ThreadPoolExecutor の shutdown を実行しない
  • EventStoreTcp 内の NioEventLoopGroup の shutdownGracefully を実行しない

es4j でイベントソーシング

Axon Framework でイベントソーシング」 と同様の処理を es4j (Eventsourcing for Java) で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170107/

はじめに

今回はイベントをインメモリで保持する eventsourcing-inmem を使うため、以下のような Gradle 用ビルド定義を用意しました。 (lombok は便利なので使います)

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly "org.projectlombok:lombok:1.16.12"

    compile 'com.eventsourcing:eventsourcing-inmem:0.4.5'
}

es4j によるイベントソーシング

コマンドの作成

コマンドは StandardCommand<S, R> を継承して作成し、S に状態の型を R に結果の型を指定します。

コマンドを適用することで生じるイベントは events メソッドの戻り値で返し、コマンド適用結果は result メソッドの戻り値で返します。

状態は events メソッドで用意し、result メソッドの引数として渡ってきます。

まずは、在庫作成のコマンドを実装してみます。

状態の型に InventoryItemCreated を、結果の型に InventoryItem を指定しています。

状態を伴う EventStream を返すために EventStream.ofWithState を使用します。

コンストラクタの引数名からプロパティを認識するようなので、-parameters オプションを使ってビルドするか、引数へ @PropertyName を付与します。(今回は @PropertyName を使用)

なお、状態の型は InventoryItemCreated としなければならないわけでもなく、UUID を代わりに使っても動作に支障はなさそうでした。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import com.eventsourcing.EventStream;
import com.eventsourcing.Repository;
import com.eventsourcing.StandardCommand;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.val;

import sample.domain.InventoryItem;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;

// 在庫作成コマンド
@Value
@EqualsAndHashCode(callSuper = false)
public class CreateInventoryItem extends StandardCommand<InventoryItemCreated, InventoryItem> {
    private String name;

    public CreateInventoryItem(@PropertyName("name") String name) {
        this.name = name;
    }
    // イベントの生成
    @Override
    public EventStream<InventoryItemCreated> events() throws Exception {
        // 在庫作成イベント
        val created = new InventoryItemCreated();
        // 名前変更イベント
        val renamed = new InventoryItemRenamed(created.uuid(), name);

        return EventStream.ofWithState(created, created, renamed);
    }
    // 結果の生成
    @Override
    public InventoryItem result(InventoryItemCreated state, Repository repository) {
        return InventoryItem.lookup(repository, state.uuid()).get();
    }
}

次に、在庫数を加えるためのコマンドです。

ドメインモデルの内部状態を更新するだけなので、状態と結果の型を Void としました。

状態を伴わないので EventStream.of で EventStream を返します。 コマンドの適用結果が Void なので result メソッドをオーバーライドする必要もありません。

在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import com.eventsourcing.EventStream;
import com.eventsourcing.StandardCommand;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

import sample.events.ItemsCheckedInToInventory;

// 在庫数追加コマンド
@Value
@EqualsAndHashCode(callSuper = false)
public class CheckInItemsToInventory extends StandardCommand<Void, Void> {
    private UUID reference;
    private int count;

    public CheckInItemsToInventory(
            @PropertyName("reference") UUID reference,
            @PropertyName("count") int count) {

        this.reference = reference;
        this.count = count;
    }

    @Override
    public EventStream<Void> events() throws Exception {
        return EventStream.of(new ItemsCheckedInToInventory(reference, count));
    }
}

イベントの作成

イベントは StandardEvent を継承して作成します。

イベントを検索するための条件を SimpleIndex を使って定義します。

以下では、uuid の値で InventoryItemCreated を取得できるように、SimpleIndex<InventoryItemCreated, UUID> を定義しています。

在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.index.SimpleIndex;

import java.util.UUID;

// 在庫作成イベント
public class InventoryItemCreated extends StandardEvent {
    public static SimpleIndex<InventoryItemCreated, UUID> ID = SimpleIndex.as(InventoryItemCreated::uuid);
}

在庫名の更新イベントの場合、最新のものを取得すれば最終的な名称が決定するので、timestamp で比較する SimpleIndex<InventoryItemRenamed, HybridTimestamp> を定義しています。

なお、名前変更のためのイベントは eventsourcing-cep モジュールに予め用意されていますが(NameChanged)、今回は自前で実装しています。

在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.hlc.HybridTimestamp;
import com.eventsourcing.index.Index;
import com.eventsourcing.index.SimpleIndex;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

import static com.eventsourcing.index.IndexEngine.IndexFeature.*;

// 在庫名の変更イベント
@Value
@EqualsAndHashCode(callSuper = false)
public class InventoryItemRenamed extends StandardEvent {
    private UUID reference;
    private String newName;

    public InventoryItemRenamed(@PropertyName("reference") UUID reference,
                                @PropertyName("newName") String newName) {

        this.reference = reference;
        this.newName = newName;
    }

    public static SimpleIndex<InventoryItemRenamed, UUID> ID =
            SimpleIndex.as(InventoryItemRenamed::uuid);

    public static SimpleIndex<InventoryItemRenamed, UUID> REFERENCE_ID =
            SimpleIndex.as(InventoryItemRenamed::getReference);

    @Index({EQ, LT, GT})
    public static SimpleIndex<InventoryItemRenamed, HybridTimestamp> TIMESTAMP =
            SimpleIndex.as(InventoryItemRenamed::timestamp);
}
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.index.SimpleIndex;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

// 在庫数の追加イベント
@Value
@EqualsAndHashCode(callSuper = false)
public class ItemsCheckedInToInventory extends StandardEvent {
    private UUID reference;
    private int count;

    public ItemsCheckedInToInventory(
            @PropertyName("reference") UUID reference,
            @PropertyName("count") int count) {

        this.reference = reference;
        this.count = count;
    }

    public static SimpleIndex<ItemsCheckedInToInventory, UUID> ID =
            SimpleIndex.as(ItemsCheckedInToInventory::uuid);

    public static SimpleIndex<ItemsCheckedInToInventory, UUID> REFERENCE_ID =
            SimpleIndex.as(ItemsCheckedInToInventory::getReference);
}

ドメインプロトコルの作成

ドメインプロトコルという機能を使えば、共通的なプロパティ(例えば name など)をドメインモデルから独立して定義できます。(ドメインモデルへ implements して使う)

Protocol を extends したインターフェースとして定義し、プロパティの値を(イベントの内容から)確定する処理をデフォルトメソッドとして定義します。

まずは、在庫名のドメインプロトコルを実装してみます。

在庫名は一番最後の InventoryItemRenamed の newName で確定するため、latestAssociatedEntity を使って最後の InventoryItemRenamed イベントを取得します。

なお、名前に関するドメインプロトコルも eventsourcing-cep モジュールに予め用意されています(NameProtocol)。

在庫名 src/main/java/sample/protocols/InventoryItemNameProtocol.java
package sample.protocols;

import com.eventsourcing.Protocol;
import com.eventsourcing.queries.ModelQueries;

import sample.events.InventoryItemRenamed;

// 在庫名
public interface InventoryItemNameProtocol extends Protocol, ModelQueries {

    default String name() {
        // reference が id の値に等しい最後の InventoryItemRenamed を取得し
        // newName の値を返す
        return latestAssociatedEntity(
            InventoryItemRenamed.class,
            InventoryItemRenamed.REFERENCE_ID,
            InventoryItemRenamed.TIMESTAMP
        ).map(InventoryItemRenamed::getNewName).orElse("");
    }
}

次に、在庫数の場合は、ItemsCheckedInToInventory を集計するように Repositoryquery メソッドで該当する全ての ItemsCheckedInToInventory を取得し、count の合計値を算出しています。

在庫数 src/main/java/sample/protocols/InventoryItemCountProtocol.java
package sample.protocols;

import com.eventsourcing.EntityHandle;
import com.eventsourcing.Protocol;

import lombok.val;

import java.util.stream.StreamSupport;

import sample.events.ItemsCheckedInToInventory;

import static com.eventsourcing.index.EntityQueryFactory.equal;

// 在庫数
public interface InventoryItemCountProtocol extends Protocol {

    default int count() {
        // reference が id の値に等しい ItemsCheckedInToInventory を全て取得
        val res = getRepository().query(
            ItemsCheckedInToInventory.class,
            equal(ItemsCheckedInToInventory.REFERENCE_ID, getId())
        );

        return StreamSupport.stream(res.spliterator(), false)
                .map(EntityHandle::get)
                .mapToInt(ItemsCheckedInToInventory::getCount)
                .sum();
    }
}

ドメインモデルの作成

ドメインモデルは Model と必要なドメインプロトコルを implements して作成します。

同一インスタンスの判定に id の値だけを使うように equals と hashCode をオーバーライドします。(今回は @EqualsAndHashCode(of = "id") で実施)

CreateInventoryItem コマンドの result メソッドで使った lookup メソッドの実装も行います。

src/main/java/sample/domain/InventoryItem.java
package sample.domain;

import com.eventsourcing.Model;
import com.eventsourcing.Repository;
import com.eventsourcing.queries.ModelQueries;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.Optional;
import java.util.UUID;

import sample.events.InventoryItemCreated;
import sample.protocols.InventoryItemCountProtocol;
import sample.protocols.InventoryItemNameProtocol;

@Value
@EqualsAndHashCode(of = "id")
public class InventoryItem implements Model, InventoryItemNameProtocol, InventoryItemCountProtocol {

    private final Repository repository;
    private final UUID id;

    protected InventoryItem(Repository repository, UUID id) {
        this.repository = repository;
        this.id = id;
    }

    public static Optional<InventoryItem> lookup(Repository repository, UUID id) {
        // InventoryItemCreated.ID インデックス(SimpleIndex)を使って
        // id に合致する InventoryItemCreated を取得
        Optional<InventoryItemCreated> res = ModelQueries.lookup(
            repository,
            InventoryItemCreated.class,
            InventoryItemCreated.ID,
            id
        );

        return res.map(ev -> new InventoryItem(repository, id));
    }
}

実行クラスの作成

es4j では Repository を使ってイベントソーシングの処理を行います。

Repository は StandardRepository へ以下のような設定を行う事で構築できます。

  • (1) Journal と IndexEngine の設定
  • (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)

StandardRepository のデフォルト設定では localhost の NTP サーバーを使うようになっているようですが (NTP サーバーが無いとエラーになる)、今回はデフォルト設定の代わりに PhysicalTimeProvider 実装クラスを用意しました。

(2) で PackageXXXSetProvider を使うと指定のパッケージへ属するコマンドやイベントを一括で設定できます。

コマンドを適用する際は publish メソッドを使います。(publish の戻り値は CompletableFuture

src/main/java/SampleApp.java
import com.eventsourcing.*;
import com.eventsourcing.hlc.PhysicalTimeProvider;
import com.eventsourcing.index.MemoryIndexEngine;
import com.eventsourcing.inmem.MemoryJournal;
import com.eventsourcing.repository.StandardRepository;
import com.google.common.util.concurrent.AbstractService;

import java.util.concurrent.*;

import lombok.val;

import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.domain.InventoryItem;
import sample.events.InventoryItemCreated;

public class SampleApp {
    public static void main(String... args) {
        // (1) Journal と IndexEngine の設定
        val repository = StandardRepository.builder()
                .journal(new MemoryJournal())
                .indexEngine(new MemoryIndexEngine())
                .physicalTimeProvider(new SampleTimeProvider())
                .build();

        // (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)
        repository.addCommandSetProvider(
            new PackageCommandSetProvider(new Package[] {CreateInventoryItem.class.getPackage()}));

        repository.addEventSetProvider(
            new PackageEventSetProvider(new Package[] {InventoryItemCreated.class.getPackage()}));

        // 開始
        repository.startAsync().awaitRunning();

        try {
            // CreateInventoryItem コマンド適用
            val d = repository.publish(new CreateInventoryItem("sample1")).get();

            dumpInventoryItem(d);

            // CheckInItemsToInventory コマンド適用
            repository.publish(new CheckInItemsToInventory(d.getId(), 5)).get();

            dumpInventoryItem(d);

            // CheckInItemsToInventory コマンド適用
            repository.publish(new CheckInItemsToInventory(d.getId(), 3)).get();

            dumpInventoryItem(d);

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 停止
            stopRepository(repository);
        }
    }

    private static InventoryItem dumpInventoryItem(InventoryItem item) {
        System.out.println("----- InventoryItem -----");

        System.out.println("id: " + item.getId());
        System.out.println("name: " + item.name());
        System.out.println("count: " + item.count());

        return item;
    }

    private static void stopRepository(Repository repository) {
        System.out.println("stop...");

        try {
            repository.stopAsync().awaitTerminated(10, TimeUnit.SECONDS);
        } catch (TimeoutException ex) {
            ex.printStackTrace();
        }
    }
    // PhysicalTimeProvider の実装
    static class SampleTimeProvider extends AbstractService implements PhysicalTimeProvider {

        @Override
        public long getPhysicalTime() {
            return System.currentTimeMillis();
        }

        @Override
        protected void doStart() {
            System.out.println("timeprovider start...");
            notifyStarted();
        }

        @Override
        protected void doStop() {
            System.out.println("timeprovider stop...");
            notifyStopped();
        }
    }
}

publish の呼び出し箇所は CompletableFuture で処理を繋げれば以下のように実装できます。

src/main/java/SampleApp.java(CompletableFuture 活用版)
public class SampleApp {
    public static void main(String... args) {
        ・・・

        repository.publish(new CreateInventoryItem("sample1"))
            .thenApply(SampleApp::dumpInventoryItem)
            .thenCompose(d ->
                repository.publish(new CheckInItemsToInventory(d.getId(), 5))
                    .thenApply(v -> d)
            )
            .thenApply(SampleApp::dumpInventoryItem)
            .thenCompose(d ->
                repository.publish(new CheckInItemsToInventory(d.getId(), 3))
                    .thenApply(v -> d)
            )
            .thenApply(SampleApp::dumpInventoryItem)
            .whenComplete((d, e) -> stopRepository(repository));
    }
    ・・・
}
実行結果
> gradle -q run

[main] INFO org.reflections.Reflections - Reflections took 64 ms to scan 1 urls, producing 2 keys and 4 values
[main] INFO org.reflections.Reflections - Reflections took 17 ms to scan 1 urls, producing 2 keys and 6 values
timeprovider start...
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 0
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 5
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 8
stop...
timeprovider stop...

備考

実は、今回のサンプルを実行すると処理が終わってもプロセスは終了しません。

スレッドダンプを出力してみたところ、StandardEntity に原因がありそうです。

スレッドダンプ出力例
> jcmd 8904 Thread.print

・・・
"Thread-1" #11 prio=5 os_prio=0 ・・・ waiting on condition [・・・]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <・・・> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396)
        at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649)
        at com.eventsourcing.StandardEntity.lambda$static$0(StandardEntity.java:27)
        at com.eventsourcing.StandardEntity$$Lambda$14/540159270.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)
・・・

StandardEntity のソース(27行目付近)を見てみると、以下のようにスレッドを開始して放置していました。

com.eventsourcing.StandardEntity のソース
・・・
public abstract class StandardEntity<E extends Entity<E>> implements Entity<E> {

    private static LinkedBlockingDeque<UUID> uuids = new LinkedBlockingDeque<>(10_000);

    static {
        new Thread(() -> {
            while (true) {
                try {
                    uuids.put(UUID.randomUUID()); // 27行目
                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
    ・・・

RxJava2 で並列処理

前回 と同じような並列処理を RxJava 2.0 で試してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161226/

はじめに

まずは、map の処理と subscribe の処理を順番に 3回繰り返す処理です。 前回の Reactor との違いは MonoSingle に変わっただけです。

(A) サンプルコード
Single.just("(A)")
    .repeat(3)
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]

observeOn の場合

RxJava 2.0 では Reactor の publishOn の代わりに observeOn が使えます。 main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。

(B) observeOn を使った場合
Single.just("(B)")
    .repeat(3)
    .observeOn(Schedulers.computation())
    // 実質的には以下でも同じ
    //.observeOn(Schedulers.single())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(B) observeOn を使った場合の実行結果
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]

flatMap と subscribeOn を使った並列処理

RxJava 2.0 では Reactor の ParallelFlux 相当の処理は用意されていないようです。

そこで、flatMapsubscribeOn を使ってみました。

(C) flatMap + subscribeOn
Single.just("(C)")
    .repeat(3)
    .flatMap(s ->
        Flowable.just(s)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribeOn(Schedulers.computation())
            //.subscribeOn(Schedulers.newThread())
    )
    .subscribe(n -> printThread(n + "_subscribe"));

ただし、以下の実行結果のように別スレッドで実行してはいるものの、map と subscribe を同じスレッドで実行するわけではありませんでした。

なお、Schedulers.computation() の場合、スレッド数は実行環境のコア数に依存するようです。(newThread は新しいスレッドを使う)

(C) flatMap + subscribeOn の実行結果1
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C) flatMap + subscribeOn の実行結果2
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]

備考

最後に、今回使用したソースを記載しておきます。

build.gradle
apply plugin: 'application'

mainClassName = 'App'

repositories {
    jcenter()
}

dependencies {
    compile 'io.reactivex.rxjava2:rxjava:2.0.3'
}

run {
    standardInput = System.in

    if (project.hasProperty('args')) {
        args project.args
    }
}
src/main/java/App.java
import io.reactivex.Single;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.lang.invoke.MethodHandle;

import static java.lang.invoke.MethodHandles.publicLookup;
import static java.lang.invoke.MethodType.methodType;

public class App {
    public static void main(String... args) throws Throwable {
        MethodHandle mh = publicLookup().findStatic(
            App.class, 
            "sample" + args[0], 
            methodType(void.class)
        );

        mh.invoke();

        System.in.read();
    }

    // (A)
    public static void sampleA() {
        Single.just("(A)")
            .repeat(3)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (B)
    public static void sampleB() {
        Single.just("(B)")
            .repeat(3)
            .observeOn(Schedulers.computation())
            //.observeOn(Schedulers.single())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (C) 
    public static void sampleC() {
        Single.just("(C)")
            .repeat(3)
            .flatMap(s ->
                Flowable.just(s)
                    .map(n -> {
                        printThread(n + "_map");
                        return n;
                    })
                    .subscribeOn(Schedulers.computation())
                    //.subscribeOn(Schedulers.newThread())

            )
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    private static void printThread(String msg) {
        System.out.println(msg + ", thread: " + Thread.currentThread());
    }
}
実行例
> gradle -q run -Pargs=C

(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]