es4j でイベントソーシング
「Axon Framework でイベントソーシング」 と同様の処理を es4j (Eventsourcing for Java) で実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170107/
はじめに
今回はイベントをインメモリで保持する eventsourcing-inmem
を使うため、以下のような Gradle 用ビルド定義を用意しました。 (lombok は便利なので使います)
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compileOnly "org.projectlombok:lombok:1.16.12" compile 'com.eventsourcing:eventsourcing-inmem:0.4.5' }
es4j によるイベントソーシング
コマンドの作成
コマンドは StandardCommand<S, R>
を継承して作成し、S に状態の型を R に結果の型を指定します。
コマンドを適用することで生じるイベントは events
メソッドの戻り値で返し、コマンド適用結果は result
メソッドの戻り値で返します。
状態は events
メソッドで用意し、result
メソッドの引数として渡ってきます。
まずは、在庫作成のコマンドを実装してみます。
状態の型に InventoryItemCreated
を、結果の型に InventoryItem
を指定しています。
状態を伴う EventStream
を返すために EventStream.ofWithState
を使用します。
コンストラクタの引数名からプロパティを認識するようなので、-parameters
オプションを使ってビルドするか、引数へ @PropertyName
を付与します。(今回は @PropertyName を使用)
なお、状態の型は InventoryItemCreated としなければならないわけでもなく、UUID
を代わりに使っても動作に支障はなさそうでした。
在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import com.eventsourcing.EventStream; import com.eventsourcing.Repository; import com.eventsourcing.StandardCommand; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import lombok.val; import sample.domain.InventoryItem; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; // 在庫作成コマンド @Value @EqualsAndHashCode(callSuper = false) public class CreateInventoryItem extends StandardCommand<InventoryItemCreated, InventoryItem> { private String name; public CreateInventoryItem(@PropertyName("name") String name) { this.name = name; } // イベントの生成 @Override public EventStream<InventoryItemCreated> events() throws Exception { // 在庫作成イベント val created = new InventoryItemCreated(); // 名前変更イベント val renamed = new InventoryItemRenamed(created.uuid(), name); return EventStream.ofWithState(created, created, renamed); } // 結果の生成 @Override public InventoryItem result(InventoryItemCreated state, Repository repository) { return InventoryItem.lookup(repository, state.uuid()).get(); } }
次に、在庫数を加えるためのコマンドです。
ドメインモデルの内部状態を更新するだけなので、状態と結果の型を Void
としました。
状態を伴わないので EventStream.of
で EventStream を返します。
コマンドの適用結果が Void なので result メソッドをオーバーライドする必要もありません。
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import com.eventsourcing.EventStream; import com.eventsourcing.StandardCommand; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; import sample.events.ItemsCheckedInToInventory; // 在庫数追加コマンド @Value @EqualsAndHashCode(callSuper = false) public class CheckInItemsToInventory extends StandardCommand<Void, Void> { private UUID reference; private int count; public CheckInItemsToInventory( @PropertyName("reference") UUID reference, @PropertyName("count") int count) { this.reference = reference; this.count = count; } @Override public EventStream<Void> events() throws Exception { return EventStream.of(new ItemsCheckedInToInventory(reference, count)); } }
イベントの作成
イベントは StandardEvent
を継承して作成します。
イベントを検索するための条件を SimpleIndex
を使って定義します。
以下では、uuid の値で InventoryItemCreated を取得できるように、SimpleIndex<InventoryItemCreated, UUID> を定義しています。
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.index.SimpleIndex; import java.util.UUID; // 在庫作成イベント public class InventoryItemCreated extends StandardEvent { public static SimpleIndex<InventoryItemCreated, UUID> ID = SimpleIndex.as(InventoryItemCreated::uuid); }
在庫名の更新イベントの場合、最新のものを取得すれば最終的な名称が決定するので、timestamp で比較する SimpleIndex<InventoryItemRenamed, HybridTimestamp>
を定義しています。
なお、名前変更のためのイベントは eventsourcing-cep モジュールに予め用意されていますが(NameChanged
)、今回は自前で実装しています。
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.hlc.HybridTimestamp; import com.eventsourcing.index.Index; import com.eventsourcing.index.SimpleIndex; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; import static com.eventsourcing.index.IndexEngine.IndexFeature.*; // 在庫名の変更イベント @Value @EqualsAndHashCode(callSuper = false) public class InventoryItemRenamed extends StandardEvent { private UUID reference; private String newName; public InventoryItemRenamed(@PropertyName("reference") UUID reference, @PropertyName("newName") String newName) { this.reference = reference; this.newName = newName; } public static SimpleIndex<InventoryItemRenamed, UUID> ID = SimpleIndex.as(InventoryItemRenamed::uuid); public static SimpleIndex<InventoryItemRenamed, UUID> REFERENCE_ID = SimpleIndex.as(InventoryItemRenamed::getReference); @Index({EQ, LT, GT}) public static SimpleIndex<InventoryItemRenamed, HybridTimestamp> TIMESTAMP = SimpleIndex.as(InventoryItemRenamed::timestamp); }
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.index.SimpleIndex; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; // 在庫数の追加イベント @Value @EqualsAndHashCode(callSuper = false) public class ItemsCheckedInToInventory extends StandardEvent { private UUID reference; private int count; public ItemsCheckedInToInventory( @PropertyName("reference") UUID reference, @PropertyName("count") int count) { this.reference = reference; this.count = count; } public static SimpleIndex<ItemsCheckedInToInventory, UUID> ID = SimpleIndex.as(ItemsCheckedInToInventory::uuid); public static SimpleIndex<ItemsCheckedInToInventory, UUID> REFERENCE_ID = SimpleIndex.as(ItemsCheckedInToInventory::getReference); }
ドメインプロトコルの作成
ドメインプロトコルという機能を使えば、共通的なプロパティ(例えば name など)をドメインモデルから独立して定義できます。(ドメインモデルへ implements して使う)
Protocol
を extends したインターフェースとして定義し、プロパティの値を(イベントの内容から)確定する処理をデフォルトメソッドとして定義します。
在庫名は一番最後の InventoryItemRenamed の newName で確定するため、latestAssociatedEntity
を使って最後の InventoryItemRenamed イベントを取得します。
なお、名前に関するドメインプロトコルも eventsourcing-cep モジュールに予め用意されています(NameProtocol
)。
在庫名 src/main/java/sample/protocols/InventoryItemNameProtocol.java
package sample.protocols; import com.eventsourcing.Protocol; import com.eventsourcing.queries.ModelQueries; import sample.events.InventoryItemRenamed; // 在庫名 public interface InventoryItemNameProtocol extends Protocol, ModelQueries { default String name() { // reference が id の値に等しい最後の InventoryItemRenamed を取得し // newName の値を返す return latestAssociatedEntity( InventoryItemRenamed.class, InventoryItemRenamed.REFERENCE_ID, InventoryItemRenamed.TIMESTAMP ).map(InventoryItemRenamed::getNewName).orElse(""); } }
次に、在庫数の場合は、ItemsCheckedInToInventory を集計するように Repository
の query
メソッドで該当する全ての ItemsCheckedInToInventory を取得し、count の合計値を算出しています。
在庫数 src/main/java/sample/protocols/InventoryItemCountProtocol.java
package sample.protocols; import com.eventsourcing.EntityHandle; import com.eventsourcing.Protocol; import lombok.val; import java.util.stream.StreamSupport; import sample.events.ItemsCheckedInToInventory; import static com.eventsourcing.index.EntityQueryFactory.equal; // 在庫数 public interface InventoryItemCountProtocol extends Protocol { default int count() { // reference が id の値に等しい ItemsCheckedInToInventory を全て取得 val res = getRepository().query( ItemsCheckedInToInventory.class, equal(ItemsCheckedInToInventory.REFERENCE_ID, getId()) ); return StreamSupport.stream(res.spliterator(), false) .map(EntityHandle::get) .mapToInt(ItemsCheckedInToInventory::getCount) .sum(); } }
ドメインモデルの作成
ドメインモデルは Model
と必要なドメインプロトコルを implements して作成します。
同一インスタンスの判定に id の値だけを使うように equals と hashCode をオーバーライドします。(今回は @EqualsAndHashCode(of = "id")
で実施)
CreateInventoryItem コマンドの result メソッドで使った lookup メソッドの実装も行います。
src/main/java/sample/domain/InventoryItem.java
package sample.domain; import com.eventsourcing.Model; import com.eventsourcing.Repository; import com.eventsourcing.queries.ModelQueries; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.Optional; import java.util.UUID; import sample.events.InventoryItemCreated; import sample.protocols.InventoryItemCountProtocol; import sample.protocols.InventoryItemNameProtocol; @Value @EqualsAndHashCode(of = "id") public class InventoryItem implements Model, InventoryItemNameProtocol, InventoryItemCountProtocol { private final Repository repository; private final UUID id; protected InventoryItem(Repository repository, UUID id) { this.repository = repository; this.id = id; } public static Optional<InventoryItem> lookup(Repository repository, UUID id) { // InventoryItemCreated.ID インデックス(SimpleIndex)を使って // id に合致する InventoryItemCreated を取得 Optional<InventoryItemCreated> res = ModelQueries.lookup( repository, InventoryItemCreated.class, InventoryItemCreated.ID, id ); return res.map(ev -> new InventoryItem(repository, id)); } }
実行クラスの作成
es4j では Repository
を使ってイベントソーシングの処理を行います。
Repository は StandardRepository
へ以下のような設定を行う事で構築できます。
- (1) Journal と IndexEngine の設定
- (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)
StandardRepository のデフォルト設定では localhost の NTP サーバーを使うようになっているようですが (NTP サーバーが無いとエラーになる)、今回はデフォルト設定の代わりに PhysicalTimeProvider 実装クラスを用意しました。
(2) で PackageXXXSetProvider を使うと指定のパッケージへ属するコマンドやイベントを一括で設定できます。
コマンドを適用する際は publish
メソッドを使います。(publish の戻り値は CompletableFuture
)
src/main/java/SampleApp.java
import com.eventsourcing.*; import com.eventsourcing.hlc.PhysicalTimeProvider; import com.eventsourcing.index.MemoryIndexEngine; import com.eventsourcing.inmem.MemoryJournal; import com.eventsourcing.repository.StandardRepository; import com.google.common.util.concurrent.AbstractService; import java.util.concurrent.*; import lombok.val; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.domain.InventoryItem; import sample.events.InventoryItemCreated; public class SampleApp { public static void main(String... args) { // (1) Journal と IndexEngine の設定 val repository = StandardRepository.builder() .journal(new MemoryJournal()) .indexEngine(new MemoryIndexEngine()) .physicalTimeProvider(new SampleTimeProvider()) .build(); // (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider) repository.addCommandSetProvider( new PackageCommandSetProvider(new Package[] {CreateInventoryItem.class.getPackage()})); repository.addEventSetProvider( new PackageEventSetProvider(new Package[] {InventoryItemCreated.class.getPackage()})); // 開始 repository.startAsync().awaitRunning(); try { // CreateInventoryItem コマンド適用 val d = repository.publish(new CreateInventoryItem("sample1")).get(); dumpInventoryItem(d); // CheckInItemsToInventory コマンド適用 repository.publish(new CheckInItemsToInventory(d.getId(), 5)).get(); dumpInventoryItem(d); // CheckInItemsToInventory コマンド適用 repository.publish(new CheckInItemsToInventory(d.getId(), 3)).get(); dumpInventoryItem(d); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { // 停止 stopRepository(repository); } } private static InventoryItem dumpInventoryItem(InventoryItem item) { System.out.println("----- InventoryItem -----"); System.out.println("id: " + item.getId()); System.out.println("name: " + item.name()); System.out.println("count: " + item.count()); return item; } private static void stopRepository(Repository repository) { System.out.println("stop..."); try { repository.stopAsync().awaitTerminated(10, TimeUnit.SECONDS); } catch (TimeoutException ex) { ex.printStackTrace(); } } // PhysicalTimeProvider の実装 static class SampleTimeProvider extends AbstractService implements PhysicalTimeProvider { @Override public long getPhysicalTime() { return System.currentTimeMillis(); } @Override protected void doStart() { System.out.println("timeprovider start..."); notifyStarted(); } @Override protected void doStop() { System.out.println("timeprovider stop..."); notifyStopped(); } } }
publish の呼び出し箇所は CompletableFuture で処理を繋げれば以下のように実装できます。
src/main/java/SampleApp.java(CompletableFuture 活用版)
public class SampleApp { public static void main(String... args) { ・・・ repository.publish(new CreateInventoryItem("sample1")) .thenApply(SampleApp::dumpInventoryItem) .thenCompose(d -> repository.publish(new CheckInItemsToInventory(d.getId(), 5)) .thenApply(v -> d) ) .thenApply(SampleApp::dumpInventoryItem) .thenCompose(d -> repository.publish(new CheckInItemsToInventory(d.getId(), 3)) .thenApply(v -> d) ) .thenApply(SampleApp::dumpInventoryItem) .whenComplete((d, e) -> stopRepository(repository)); } ・・・ }
実行結果
> gradle -q run [main] INFO org.reflections.Reflections - Reflections took 64 ms to scan 1 urls, producing 2 keys and 4 values [main] INFO org.reflections.Reflections - Reflections took 17 ms to scan 1 urls, producing 2 keys and 6 values timeprovider start... ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 0 ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 5 ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 8 stop... timeprovider stop...
備考
実は、今回のサンプルを実行すると処理が終わってもプロセスは終了しません。
スレッドダンプを出力してみたところ、StandardEntity に原因がありそうです。
スレッドダンプ出力例
> jcmd 8904 Thread.print ・・・ "Thread-1" #11 prio=5 os_prio=0 ・・・ waiting on condition [・・・] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <・・・> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396) at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649) at com.eventsourcing.StandardEntity.lambda$static$0(StandardEntity.java:27) at com.eventsourcing.StandardEntity$$Lambda$14/540159270.run(Unknown Source) at java.lang.Thread.run(Thread.java:745) ・・・
StandardEntity のソース(27行目付近)を見てみると、以下のようにスレッドを開始して放置していました。
com.eventsourcing.StandardEntity のソース
・・・ public abstract class StandardEntity<E extends Entity<E>> implements Entity<E> { private static LinkedBlockingDeque<UUID> uuids = new LinkedBlockingDeque<>(10_000); static { new Thread(() -> { while (true) { try { uuids.put(UUID.randomUUID()); // 27行目 } catch (InterruptedException e) { } } }).start(); } ・・・