Sourcerer でイベントソーシング

Axon Framework でイベントソーシング」 や 「es4j でイベントソーシング」 と同様の処理を Sourcerer で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170110/

はじめに

以下のような Gradle ビルド定義ファイルを使います。

Sourcerer はイベントの保存先として Event Store を使用するため、今回は Event Store クライアントライブラリに esjc を使う sourcerer-esjc モジュールを使用します。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly 'org.projectlombok:lombok:1.16.12'

    compile 'org.elder.sourcerer:sourcerer-esjc:v4.0.1'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.8.5'

    compile 'io.javaslang:javaslang:2.1.0-alpha'
}

lombokjavaslang は必須ではありません。(javaslang はパターンマッチ処理に使いました)

Sourcerer によるイベントソーシング

コマンドの作成

コマンドは特に注意するところはなく、普通の Java クラスとして実装するだけです。

ここで定義したコマンドは、CommandFactory で作成した Command の引数 (Arguments) として使う事になります。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import lombok.Value;

// 在庫作成コマンド
@Value
public class CreateInventoryItem {
    private String id;
    private String name;
}
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import lombok.Value;

// 在庫数追加コマンド
@Value
public class CheckInItemsToInventory {
    private int count;
}

イベントの作成

イベントの型へ @EventType を付与します。

AggregateProjection 等で指定するイベントの型は 1つなので、まずはベースとなるインターフェースを定義します。

イベントの内容は JSON 化して Event Store へ保存する事になりますが、デフォルトでは JSON へ型情報が付与されず、複数のイベント型を使うと JSON からイベントオブジェクトを復元する際に不都合が生じます。

これを回避するために @JsonTypeInfo@JsonSubTypes を使って JSON へ型情報を残すように設定しています ※。

 ※ @JsonTypeInfo の use へ NAME を指定した場合は
    "@type":"クラス名" の情報を JSON へ出力します
イベントのベースインターフェース src/main/java/sample/events/InventoryEvent.java
package sample.events;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.elder.sourcerer.EventType;

@EventType
// JSON へ型情報を残すための設定
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonSubTypes({
    @Type(InventoryItemCreated.class),
    @Type(InventoryItemRenamed.class),
    @Type(ItemsCheckedInToInventory.class)
})
public interface InventoryEvent {
}
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import lombok.Value;

// 在庫作成イベント
@Value
public class InventoryItemCreated implements InventoryEvent {
    private String id;
}
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import lombok.Value;

// 在庫名の変更イベント
@Value
public class InventoryItemRenamed implements InventoryEvent {
    private String name;
}
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import lombok.Value;

// 在庫数の追加イベント
@Value
public class ItemsCheckedInToInventory implements InventoryEvent {
    private int count;
}

コマンドハンドラの作成

コマンドを処理してイベントを生成するメソッドを定義します。

引数や戻り値の型は CommandFactoryfromOperation メソッドへ渡す値 (Operations の各種メソッドで生成) に応じて調整します。

単一のコマンドから複数のイベントを生成する場合は戻り値を List<イベント型> とするだけのようです。

src/main/java/sample/InventoryOperations.java
package sample;

import com.google.common.collect.ImmutableList;
import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.events.InventoryEvent;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

import java.util.List;

public class InventoryOperations {
    // 作成コマンドの処理
    public static List<InventoryEvent> create(CreateInventoryItem cmd) {
        return ImmutableList.of(
            new InventoryItemCreated(cmd.getId()),
            new InventoryItemRenamed(cmd.getName())
        );
    }
    // 在庫数追加コマンドの処理
    public static ItemsCheckedInToInventory checkIn(InventoryItem state, CheckInItemsToInventory cmd) {
        return new ItemsCheckedInToInventory(cmd.getCount());
    }
}

AggregateProjection の作成

AggregateProjection<状態の型, イベントの型> を実装して、イベントから状態を復元する処理を実装します。

今回は javaslang モジュールの MatchCase を使ってパターンマッチで処理しています。

src/main/java/sample/InventoryProjection.java
package sample;

import org.elder.sourcerer.AggregateProjection;
import org.jetbrains.annotations.NotNull;
import sample.events.InventoryItemCreated;
import sample.events.InventoryEvent;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

import static javaslang.API.*;
import static javaslang.Predicates.*;

public class InventoryProjection implements AggregateProjection<InventoryItem, InventoryEvent>{

    @NotNull
    @Override
    public InventoryItem empty() {
        return new InventoryItem("", "", 0);
    }

    @NotNull
    @Override
    public InventoryItem apply(@NotNull String id,
                               @NotNull InventoryItem state, 
                               @NotNull InventoryEvent event) {

        return Match(event).of(
            // InventoryItemCreated イベントの場合
            Case(instanceOf(InventoryItemCreated.class), ev -> 
                    state.withId(ev.getId())),
            // InventoryItemRenamed イベントの場合
            Case(instanceOf(InventoryItemRenamed.class), ev -> 
                    state.withName(ev.getName())),
            // ItemsCheckedInToInventory イベントの場合
            Case(instanceOf(ItemsCheckedInToInventory.class), ev ->
                    state.withCount(state.getCount() + ev.getCount())),
            // その他
            Case($(), state)
        );
    }
}

在庫の状態 InventoryItem は以下のように実装しました。

src/main/java/sample/InventoryItem.java
package sample;

import lombok.Value;

@Value
public class InventoryItem {
    private String id;
    private String name;
    private int count;

    public InventoryItem withId(String newId) {
        return new InventoryItem(newId, name, count);
    }

    public InventoryItem withName(String newName) {
        return new InventoryItem(id, newName, count);
    }

    public InventoryItem withCount(int newCount) {
        return new InventoryItem(id, name, newCount);
    }
}

実行クラスの作成

sourcerer-esjc-spring モジュールを使う方が楽だと思われますが、今回は AggregateRepositoryCommandFactory の組み立て処理を自前で実装してみました。

  • (1) EventStore クライアント作成
  • (2) EventRepositoryFactory 作成
  • (3) EventRepository 作成
  • (4) AggregateRepository 作成
  • (5) CommandFactory 作成

CommandFactory へコマンドハンドラのメソッドをそれぞれ指定して Command を作成します。(コマンドハンドラメソッドのシグネチャに対応する Operations のメソッドを使います)

Command へ aggregateId を設定し arguments へコマンドの内容を設定した後、run を実行するとコマンドが適用されます。

src/main/java/SampleApp.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.msemys.esjc.EventStoreBuilder;

import lombok.val;

import org.elder.sourcerer.*;
import org.elder.sourcerer.esjc.EventStoreEsjcEventRepositoryFactory;

import sample.*;
import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.events.InventoryEvent;

import java.util.UUID;

public class SampleApp {
    public static void main(String... args) throws JsonProcessingException {
        // (1) EventStore クライアント作成
        val eventStore = EventStoreBuilder.newBuilder()
                // ポート番号に 1113 を使う点に注意 (2113 ではない)
                .singleNodeAddress("localhost", 1113) 
                .build();

        val mapper = new ObjectMapper();
        // (2) EventRepositoryFactory 作成
        val eventRepositoryFactory = new EventStoreEsjcEventRepositoryFactory(
                eventStore, 
                mapper, 
                "sample"
        );
        // (3) EventRepository 作成
        val eventRepository = eventRepositoryFactory.getEventRepository(InventoryEvent.class);
        // (4) AggregateRepository 作成
        val aggregateRepository = new DefaultAggregateRepository<>(
                eventRepository, 
                new InventoryProjection()
        );
        // (5) CommandFactory 作成
        val commandFactory = new DefaultCommandFactory<>(aggregateRepository);

        // 在庫作成処理
        val createCommand = commandFactory.fromOperation(
                Operations.constructorOf(InventoryOperations::create));
        // 在庫数の追加処理
        val checkInCommand = commandFactory.fromOperation(
                Operations.updateOf(InventoryOperations::checkIn));

        val id = UUID.randomUUID().toString();
        // 在庫作成の実行
        val r1 = createCommand.setAggregateId(id)
                .setArguments(new CreateInventoryItem(id, "sample"))
                .run();

        printCommandResult(r1);
        // 在庫数の追加の実行(1回目)
        val r2 = checkInCommand.setAggregateId(id)
                .setArguments(new CheckInItemsToInventory(5))
                .run();

        printCommandResult(r2);

        // 在庫数の追加の実行(2回目)
        val r3 = checkInCommand.setAggregateId(id)
                .setArguments(new CheckInItemsToInventory(3))
                .run();

        printCommandResult(r3);

        // 集約の内容を取得
        val aggregate = aggregateRepository.load(id);
        // 状態を出力
        System.out.println("*** result state: " + aggregate.state());

        eventStore.disconnect();
    }

    private static void printCommandResult(CommandResult<? extends InventoryEvent> result) {
        System.out.printf(
            "*** command result: events=%s, newVersion=%s\n", 
            result.getEvents(), 
            result.getNewVersion()
        );
    }
}

実行

https://geteventstore.com/ から Event Store の実行環境をダウンロードして、起動しておきます。

Event Store 起動
> EventStore.ClusterNode.exe --db ./db --log ./logs

・・・
[08912,14,15:31:00.488] Starting Normal TCP listening on TCP endpoint: 127.0.0.1:1113.
・・・
[08912,09,15:31:07.100] Created stats stream '$stats-127.0.0.1:2113', code = Success
サンプルアプリケーションの実行
> gradle -q run

・・・
*** command result: events=[InventoryItemCreated(id=4184fd14-7725-4d09-8e24-59e9a94859a1), InventoryItemRenamed(name=sample)], newVersion=1
・・・
*** command result: events=[ItemsCheckedInToInventory(count=5)], newVersion=2
・・・
*** command result: events=[ItemsCheckedInToInventory(count=3)], newVersion=3
・・・
*** result state: InventoryItem(id=4184fd14-7725-4d09-8e24-59e9a94859a1, name=sample, count=8)
・・・

Event Store への登録内容を確認するため、http://127.0.0.1:2113/ へアクセスして 「Stream Browser」 で sample:・・・・ の内容を見てみました。

f:id:fits:20170110235412p:plain

備考

今回のサンプルも処理が終わった後(disconnect 実行後)にプロセスが終了しません。

原因を詳しく調べていませんが、Event Store クライアントライブラリの esjc 1.6.0 を単体で試した際に、以下が原因でプロセスが終了しないようだったので、esjc が原因かもしれません。

  • executor 未指定の際に Settings 内で作る ThreadPoolExecutor の shutdown を実行しない
  • EventStoreTcp 内の NioEventLoopGroup の shutdownGracefully を実行しない