Sourcerer でイベントソーシング
「Axon Framework でイベントソーシング」 や 「es4j でイベントソーシング」 と同様の処理を Sourcerer で実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170110/
はじめに
以下のような Gradle ビルド定義ファイルを使います。
Sourcerer はイベントの保存先として Event Store を使用するため、今回は Event Store クライアントライブラリに esjc
を使う sourcerer-esjc
モジュールを使用します。
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compileOnly 'org.projectlombok:lombok:1.16.12' compile 'org.elder.sourcerer:sourcerer-esjc:v4.0.1' compile 'com.fasterxml.jackson.core:jackson-databind:2.8.5' compile 'io.javaslang:javaslang:2.1.0-alpha' }
lombok
と javaslang
は必須ではありません。(javaslang はパターンマッチ処理に使いました)
Sourcerer によるイベントソーシング
コマンドの作成
コマンドは特に注意するところはなく、普通の Java クラスとして実装するだけです。
ここで定義したコマンドは、CommandFactory で作成した Command の引数 (Arguments) として使う事になります。
在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import lombok.Value; // 在庫作成コマンド @Value public class CreateInventoryItem { private String id; private String name; }
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import lombok.Value; // 在庫数追加コマンド @Value public class CheckInItemsToInventory { private int count; }
イベントの作成
イベントの型へ @EventType
を付与します。
AggregateProjection
等で指定するイベントの型は 1つなので、まずはベースとなるインターフェースを定義します。
イベントの内容は JSON 化して Event Store へ保存する事になりますが、デフォルトでは JSON へ型情報が付与されず、複数のイベント型を使うと JSON からイベントオブジェクトを復元する際に不都合が生じます。
これを回避するために @JsonTypeInfo
と @JsonSubTypes
を使って JSON へ型情報を残すように設定しています ※。
※ @JsonTypeInfo の use へ NAME を指定した場合は "@type":"クラス名" の情報を JSON へ出力します
イベントのベースインターフェース src/main/java/sample/events/InventoryEvent.java
package sample.events; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.elder.sourcerer.EventType; @EventType // JSON へ型情報を残すための設定 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) @JsonSubTypes({ @Type(InventoryItemCreated.class), @Type(InventoryItemRenamed.class), @Type(ItemsCheckedInToInventory.class) }) public interface InventoryEvent { }
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import lombok.Value; // 在庫作成イベント @Value public class InventoryItemCreated implements InventoryEvent { private String id; }
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import lombok.Value; // 在庫名の変更イベント @Value public class InventoryItemRenamed implements InventoryEvent { private String name; }
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import lombok.Value; // 在庫数の追加イベント @Value public class ItemsCheckedInToInventory implements InventoryEvent { private int count; }
コマンドハンドラの作成
コマンドを処理してイベントを生成するメソッドを定義します。
引数や戻り値の型は CommandFactory
の fromOperation
メソッドへ渡す値 (Operations
の各種メソッドで生成) に応じて調整します。
単一のコマンドから複数のイベントを生成する場合は戻り値を List<イベント型>
とするだけのようです。
src/main/java/sample/InventoryOperations.java
package sample; import com.google.common.collect.ImmutableList; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.events.InventoryEvent; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; import java.util.List; public class InventoryOperations { // 作成コマンドの処理 public static List<InventoryEvent> create(CreateInventoryItem cmd) { return ImmutableList.of( new InventoryItemCreated(cmd.getId()), new InventoryItemRenamed(cmd.getName()) ); } // 在庫数追加コマンドの処理 public static ItemsCheckedInToInventory checkIn(InventoryItem state, CheckInItemsToInventory cmd) { return new ItemsCheckedInToInventory(cmd.getCount()); } }
AggregateProjection の作成
AggregateProjection<状態の型, イベントの型>
を実装して、イベントから状態を復元する処理を実装します。
今回は javaslang モジュールの Match
と Case
を使ってパターンマッチで処理しています。
src/main/java/sample/InventoryProjection.java
package sample; import org.elder.sourcerer.AggregateProjection; import org.jetbrains.annotations.NotNull; import sample.events.InventoryItemCreated; import sample.events.InventoryEvent; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; import static javaslang.API.*; import static javaslang.Predicates.*; public class InventoryProjection implements AggregateProjection<InventoryItem, InventoryEvent>{ @NotNull @Override public InventoryItem empty() { return new InventoryItem("", "", 0); } @NotNull @Override public InventoryItem apply(@NotNull String id, @NotNull InventoryItem state, @NotNull InventoryEvent event) { return Match(event).of( // InventoryItemCreated イベントの場合 Case(instanceOf(InventoryItemCreated.class), ev -> state.withId(ev.getId())), // InventoryItemRenamed イベントの場合 Case(instanceOf(InventoryItemRenamed.class), ev -> state.withName(ev.getName())), // ItemsCheckedInToInventory イベントの場合 Case(instanceOf(ItemsCheckedInToInventory.class), ev -> state.withCount(state.getCount() + ev.getCount())), // その他 Case($(), state) ); } }
在庫の状態 InventoryItem
は以下のように実装しました。
src/main/java/sample/InventoryItem.java
package sample; import lombok.Value; @Value public class InventoryItem { private String id; private String name; private int count; public InventoryItem withId(String newId) { return new InventoryItem(newId, name, count); } public InventoryItem withName(String newName) { return new InventoryItem(id, newName, count); } public InventoryItem withCount(int newCount) { return new InventoryItem(id, name, newCount); } }
実行クラスの作成
sourcerer-esjc-spring モジュールを使う方が楽だと思われますが、今回は AggregateRepository
と CommandFactory
の組み立て処理を自前で実装してみました。
- (1) EventStore クライアント作成
- (2) EventRepositoryFactory 作成
- (3) EventRepository 作成
- (4) AggregateRepository 作成
- (5) CommandFactory 作成
CommandFactory へコマンドハンドラのメソッドをそれぞれ指定して Command
を作成します。(コマンドハンドラメソッドのシグネチャに対応する Operations
のメソッドを使います)
Command へ aggregateId を設定し arguments へコマンドの内容を設定した後、run
を実行するとコマンドが適用されます。
src/main/java/SampleApp.java
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.msemys.esjc.EventStoreBuilder; import lombok.val; import org.elder.sourcerer.*; import org.elder.sourcerer.esjc.EventStoreEsjcEventRepositoryFactory; import sample.*; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.events.InventoryEvent; import java.util.UUID; public class SampleApp { public static void main(String... args) throws JsonProcessingException { // (1) EventStore クライアント作成 val eventStore = EventStoreBuilder.newBuilder() // ポート番号に 1113 を使う点に注意 (2113 ではない) .singleNodeAddress("localhost", 1113) .build(); val mapper = new ObjectMapper(); // (2) EventRepositoryFactory 作成 val eventRepositoryFactory = new EventStoreEsjcEventRepositoryFactory( eventStore, mapper, "sample" ); // (3) EventRepository 作成 val eventRepository = eventRepositoryFactory.getEventRepository(InventoryEvent.class); // (4) AggregateRepository 作成 val aggregateRepository = new DefaultAggregateRepository<>( eventRepository, new InventoryProjection() ); // (5) CommandFactory 作成 val commandFactory = new DefaultCommandFactory<>(aggregateRepository); // 在庫作成処理 val createCommand = commandFactory.fromOperation( Operations.constructorOf(InventoryOperations::create)); // 在庫数の追加処理 val checkInCommand = commandFactory.fromOperation( Operations.updateOf(InventoryOperations::checkIn)); val id = UUID.randomUUID().toString(); // 在庫作成の実行 val r1 = createCommand.setAggregateId(id) .setArguments(new CreateInventoryItem(id, "sample")) .run(); printCommandResult(r1); // 在庫数の追加の実行(1回目) val r2 = checkInCommand.setAggregateId(id) .setArguments(new CheckInItemsToInventory(5)) .run(); printCommandResult(r2); // 在庫数の追加の実行(2回目) val r3 = checkInCommand.setAggregateId(id) .setArguments(new CheckInItemsToInventory(3)) .run(); printCommandResult(r3); // 集約の内容を取得 val aggregate = aggregateRepository.load(id); // 状態を出力 System.out.println("*** result state: " + aggregate.state()); eventStore.disconnect(); } private static void printCommandResult(CommandResult<? extends InventoryEvent> result) { System.out.printf( "*** command result: events=%s, newVersion=%s\n", result.getEvents(), result.getNewVersion() ); } }
実行
https://geteventstore.com/ から Event Store の実行環境をダウンロードして、起動しておきます。
Event Store 起動
> EventStore.ClusterNode.exe --db ./db --log ./logs ・・・ [08912,14,15:31:00.488] Starting Normal TCP listening on TCP endpoint: 127.0.0.1:1113. ・・・ [08912,09,15:31:07.100] Created stats stream '$stats-127.0.0.1:2113', code = Success
サンプルアプリケーションの実行
> gradle -q run ・・・ *** command result: events=[InventoryItemCreated(id=4184fd14-7725-4d09-8e24-59e9a94859a1), InventoryItemRenamed(name=sample)], newVersion=1 ・・・ *** command result: events=[ItemsCheckedInToInventory(count=5)], newVersion=2 ・・・ *** command result: events=[ItemsCheckedInToInventory(count=3)], newVersion=3 ・・・ *** result state: InventoryItem(id=4184fd14-7725-4d09-8e24-59e9a94859a1, name=sample, count=8) ・・・
Event Store への登録内容を確認するため、http://127.0.0.1:2113/ へアクセスして 「Stream Browser」 で sample:・・・・
の内容を見てみました。
備考
今回のサンプルも処理が終わった後(disconnect 実行後)にプロセスが終了しません。
原因を詳しく調べていませんが、Event Store クライアントライブラリの esjc 1.6.0 を単体で試した際に、以下が原因でプロセスが終了しないようだったので、esjc が原因かもしれません。
- executor 未指定の際に Settings 内で作る ThreadPoolExecutor の shutdown を実行しない
- EventStoreTcp 内の NioEventLoopGroup の shutdownGracefully を実行しない