Axon Framework でイベントソーシング
Axon Fraework のイベントソーシング機能を軽く試してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20160815/
はじめに
今回は以下のような Gradle 用ビルド定義を使います。
lombok は必須ではありませんが、便利なので使っています。
compileOnly
でコンパイル時にのみ使用するモジュールを指定できます。
build.gradle
apply plugin: 'application' repositories { jcenter() } dependencies { compileOnly 'org.projectlombok:lombok:1.16.10' compile 'org.axonframework:axon-core:3.0-M3' runtime 'org.slf4j:slf4j-simple:1.7.21' } mainClassName = 'SampleApp'
Axon Framework によるイベントソーシング
DDD や CQRS におけるイベントソーシングでは、永続化したイベント群を順次適用して集約の状態を復元します。
Axon Framework では以下のように処理すれば、イベントソーシングを実現できるようです。
- (a) CommandHandler でコマンドからイベントを生成し適用
- (b) EventSourcingHandler でイベントの内容からモデルの状態を変更
なお、これらのハンドラはアノテーションで指定できるようになっています。
コマンドの作成
まずは、在庫作成のコマンドを実装します。
処理対象の識別子を設定するフィールドに @TargetAggregateIdentifier
を付けますが、CreateInventoryItem には特に付けなくても問題無さそうでした。(次の CheckInItemsToInventory では必須)
なお、lombok.Value を使っているため、コンストラクタや getter メソッドが自動的に生成されます。
src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import org.axonframework.commandhandling.TargetAggregateIdentifier; import lombok.Value; // 在庫作成コマンド @Value public class CreateInventoryItem { @TargetAggregateIdentifier // このアノテーションは必須では無さそう private String id; private String name; }
次に、在庫数を加えるためのコマンドです。
src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import org.axonframework.commandhandling.TargetAggregateIdentifier; import lombok.Value; // 在庫数追加コマンド @Value public class CheckInItemsToInventory { @TargetAggregateIdentifier // このアノテーションは必須 private String id; private int count; }
イベントの作成
次は、コマンドによって生じるイベントを作成します。
今回は、CreateInventoryItem コマンドから 2つのイベント (InventoryItemCreated と InventoryItemRenamed) が生じるような仕様で考えてみました。
src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import lombok.Value; // 在庫作成イベント @Value public class InventoryItemCreated { private String id; }
src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import lombok.Value; // 名前変更イベント @Value public class InventoryItemRenamed { private String newName; }
src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import lombok.Value; // 在庫数追加イベント @Value public class ItemsCheckedInToInventory { private int count; }
エンティティの作成
在庫エンティティを作成します。
一意の識別子を設定するフィールドへ @AggregateIdentifier
を付与します。
コマンドを処理するコンストラクタ ※ やメソッドへ @CommandHandler
を付与し、その処理内でイベントを作成して AggregateLifecycle.apply
メソッドへ渡せば、@EventSourcingHandler
を付与した該当メソッドが呼び出されます。
イベントの内容に合わせてエンティティの内部状態を更新する事で、イベントソーシングを実現できます。
※ 新規作成のコマンドを処理する場合に、コンストラクタを使います
なお、引数なしのデフォルトコンストラクタは必須なようです。
src/main/java/sample/models/InventoryItem.java
package sample.models; import org.axonframework.commandhandling.CommandHandler; import org.axonframework.commandhandling.model.AggregateIdentifier; import org.axonframework.commandhandling.model.AggregateLifecycle; import org.axonframework.commandhandling.model.ApplyMore; import org.axonframework.eventsourcing.EventSourcingHandler; import lombok.Getter; import lombok.ToString; import sample.commands.CreateInventoryItem; import sample.commands.CheckInItemsToInventory; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; @ToString public class InventoryItem { @AggregateIdentifier @Getter private String id; @Getter private String name; @Getter private int count; // デフォルトコンストラクタは必須 public InventoryItem() { } // 在庫作成コマンド処理 @CommandHandler public InventoryItem(CreateInventoryItem cmd) { System.out.println("C call new: " + cmd); // 在庫作成イベントの作成と適用 AggregateLifecycle.apply(new InventoryItemCreated(cmd.getId())); // 名前変更イベントの作成と適用 AggregateLifecycle.apply(new InventoryItemRenamed(cmd.getName())); } // 在庫数追加コマンド処理 @CommandHandler private ApplyMore updateCount(CheckInItemsToInventory cmd) { System.out.println("C call updateCount: " + cmd); // 在庫数追加イベントの作成と適用 return AggregateLifecycle.apply(new ItemsCheckedInToInventory(cmd.getCount())); } // 在庫作成イベントの適用処理 @EventSourcingHandler private void applyCreated(InventoryItemCreated event) { System.out.println("E call applyCreated: " + event); this.id = event.getId(); } // 名前変更イベントの適用処理 @EventSourcingHandler private void applyRenamed(InventoryItemRenamed event) { System.out.println("E call applyRenamed: " + event); this.name = event.getNewName(); } // 在庫数追加イベントの適用処理 @EventSourcingHandler private void applyCheckedIn(ItemsCheckedInToInventory event) { System.out.println("E call applyCheckedIn: " + event); this.count += event.getCount(); } }
実行クラスの作成
動作確認のための実行クラスを作成します。
CommandGateway へコマンドを send すれば処理が流れるように CommandBus・Repository・EventStore 等を組み合わせます。
イベントソーシングには EventSourcingRepository
を使用します。
アノテーションを使ったコマンドハンドラを適用するには AggregateAnnotationCommandHandler
を使用します。
また、今回はインメモリでイベントを保持する InMemoryEventStorageEngine
を使っています。
src/main/java/SampleApp.java
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler; import org.axonframework.commandhandling.SimpleCommandBus; import org.axonframework.commandhandling.gateway.DefaultCommandGateway; import org.axonframework.eventsourcing.EventSourcedAggregate; import org.axonframework.eventsourcing.EventSourcingRepository; import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; import lombok.val; import sample.commands.CreateInventoryItem; import sample.commands.CheckInItemsToInventory; import sample.models.InventoryItem; public class SampleApp { public static void main(String... args) { val cmdBus = new SimpleCommandBus(); val gateway = new DefaultCommandGateway(cmdBus); val es = new EmbeddedEventStore(new InMemoryEventStorageEngine()); // イベントソーシング用の Repository val repository = new EventSourcingRepository<>(InventoryItem.class, es); // アノテーションによるコマンドハンドラを適用 new AggregateAnnotationCommandHandler<>(InventoryItem.class, repository).subscribe(cmdBus); String r1 = gateway.sendAndWait(new CreateInventoryItem("s1", "sample1")); System.out.println("id: " + r1); System.out.println("----------"); EventSourcedAggregate<InventoryItem> r2 = gateway.sendAndWait(new CheckInItemsToInventory("s1", 5)); printAggregate(r2); System.out.println("----------"); EventSourcedAggregate<InventoryItem> r3 = gateway.sendAndWait(new CheckInItemsToInventory("s1", 3)); printAggregate(r3); } private static void printAggregate(EventSourcedAggregate<InventoryItem> esag) { System.out.println(esag.getAggregateRoot()); } }
なお、CreateInventoryItem を send した後に、同じ ID (今回は "s1")を使って再度 CreateInventoryItem を send してみたところ、特に重複チェックなどが実施されるわけではなく、普通にイベント (InventoryItemCreated と InventoryItemRenamed) が追加されました。
実行
実行結果は以下の通りです。
コマンドを送信する度に、これまでのイベントを適用してエンティティの状態を復元した後、新しいコマンドを処理している動作を確認できました。
> gradle run ・・・ :run C call new: CreateInventoryItem(id=s1, name=sample1) E call applyCreated: InventoryItemCreated(id=s1) E call applyRenamed: InventoryItemRenamed(newName=sample1) id: s1 ---------- E call applyCreated: InventoryItemCreated(id=s1) E call applyRenamed: InventoryItemRenamed(newName=sample1) C call updateCount: CheckInItemsToInventory(id=s1, count=5) E call applyCheckedIn: ItemsCheckedInToInventory(count=5) InventoryItem(id=s1, name=sample1, count=5) ---------- E call applyCreated: InventoryItemCreated(id=s1) E call applyRenamed: InventoryItemRenamed(newName=sample1) E call applyCheckedIn: ItemsCheckedInToInventory(count=5) C call updateCount: CheckInItemsToInventory(id=s1, count=3) E call applyCheckedIn: ItemsCheckedInToInventory(count=3) InventoryItem(id=s1, name=sample1, count=8)
備考 - スナップショット
イベント数が多くなると、イベントを毎回初めから適用して状態を復元するのはパフォーマンス的に厳しくなるため、スナップショットを使います。
Axon Framework にもスナップショットの機能が用意されており、EventCountSnapshotterTrigger
等を EventSourcingRepository へ設定すれば使えるようです。
ただし、今回のサンプル (SampleApp.java) では EventCountSnapshotterTrigger を機能させられませんでした。
というのも、EventCountSnapshotterTrigger では decorateForAppend
メソッドの実行時にスナップショット化を行うようですが ※、今回のサンプルでは decorateForAppend は一度も実行せず decorateForRead
メソッドのみが実行されるようでした。
※ ソースをざっと見た限りでは、カウンターのカウントアップ等も decorateForAppend の中でのみ実施しているようだった