読者です 読者をやめる 読者になる 読者になる

Axon Framework でイベントソーシング

Axon Fraework のイベントソーシング機能を軽く試してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20160815/

はじめに

今回は以下のような Gradle 用ビルド定義を使います。

lombok は必須ではありませんが、便利なので使っています。 compileOnlyコンパイル時にのみ使用するモジュールを指定できます。

build.gradle
apply plugin: 'application'

repositories {
    jcenter()
}

dependencies {
    compileOnly 'org.projectlombok:lombok:1.16.10'

    compile 'org.axonframework:axon-core:3.0-M3'

    runtime 'org.slf4j:slf4j-simple:1.7.21'
}

mainClassName = 'SampleApp'

Axon Framework によるイベントソーシング

DDD や CQRS におけるイベントソーシングでは、永続化したイベント群を順次適用して集約の状態を復元します。

Axon Framework では以下のように処理すれば、イベントソーシングを実現できるようです。

  • (a) CommandHandler でコマンドからイベントを生成し適用
  • (b) EventSourcingHandler でイベントの内容からモデルの状態を変更

なお、これらのハンドラはアノテーションで指定できるようになっています。

コマンドの作成

まずは、在庫作成のコマンドを実装します。

処理対象の識別子を設定するフィールドに @TargetAggregateIdentifier を付けますが、CreateInventoryItem には特に付けなくても問題無さそうでした。(次の CheckInItemsToInventory では必須)

なお、lombok.Value を使っているため、コンストラクタや getter メソッドが自動的に生成されます。

src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import org.axonframework.commandhandling.TargetAggregateIdentifier;
import lombok.Value;

// 在庫作成コマンド
@Value
public class CreateInventoryItem {

    @TargetAggregateIdentifier // このアノテーションは必須では無さそう
    private String id;

    private String name;
}

次に、在庫数を加えるためのコマンドです。

src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import org.axonframework.commandhandling.TargetAggregateIdentifier;
import lombok.Value;

// 在庫数追加コマンド
@Value
public class CheckInItemsToInventory {

    @TargetAggregateIdentifier // このアノテーションは必須
    private String id;

    private int count;
}

イベントの作成

次は、コマンドによって生じるイベントを作成します。

今回は、CreateInventoryItem コマンドから 2つのイベント (InventoryItemCreated と InventoryItemRenamed) が生じるような仕様で考えてみました。

src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import lombok.Value;

// 在庫作成イベント
@Value
public class InventoryItemCreated {
    private String id;
}
src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import lombok.Value;

// 名前変更イベント
@Value
public class InventoryItemRenamed {
    private String newName;
}
src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import lombok.Value;

// 在庫数追加イベント
@Value
public class ItemsCheckedInToInventory {
    private int count;
}

エンティティの作成

在庫エンティティを作成します。

一意の識別子を設定するフィールドへ @AggregateIdentifier を付与します。

コマンドを処理するコンストラクタ ※ やメソッドへ @CommandHandler を付与し、その処理内でイベントを作成して AggregateLifecycle.apply メソッドへ渡せば、@EventSourcingHandler を付与した該当メソッドが呼び出されます。

イベントの内容に合わせてエンティティの内部状態を更新する事で、イベントソーシングを実現できます。

 ※ 新規作成のコマンドを処理する場合に、コンストラクタを使います

なお、引数なしのデフォルトコンストラクタは必須なようです。

src/main/java/sample/models/InventoryItem.java
package sample.models;

import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateLifecycle;
import org.axonframework.commandhandling.model.ApplyMore;

import org.axonframework.eventsourcing.EventSourcingHandler;

import lombok.Getter;
import lombok.ToString;

import sample.commands.CreateInventoryItem;
import sample.commands.CheckInItemsToInventory;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

@ToString
public class InventoryItem {

    @AggregateIdentifier
    @Getter
    private String id;

    @Getter
    private String name;

    @Getter
    private int count;

    // デフォルトコンストラクタは必須
    public InventoryItem() {
    }

    // 在庫作成コマンド処理
    @CommandHandler
    public InventoryItem(CreateInventoryItem cmd) {
        System.out.println("C call new: " + cmd);
        // 在庫作成イベントの作成と適用
        AggregateLifecycle.apply(new InventoryItemCreated(cmd.getId()));
        // 名前変更イベントの作成と適用
        AggregateLifecycle.apply(new InventoryItemRenamed(cmd.getName()));
    }

    // 在庫数追加コマンド処理
    @CommandHandler
    private ApplyMore updateCount(CheckInItemsToInventory cmd) {
        System.out.println("C call updateCount: " + cmd);
        // 在庫数追加イベントの作成と適用
        return AggregateLifecycle.apply(new ItemsCheckedInToInventory(cmd.getCount()));
    }

    // 在庫作成イベントの適用処理
    @EventSourcingHandler
    private void applyCreated(InventoryItemCreated event) {
        System.out.println("E call applyCreated: " + event);

        this.id = event.getId();
    }

    // 名前変更イベントの適用処理
    @EventSourcingHandler
    private void applyRenamed(InventoryItemRenamed event) {
        System.out.println("E call applyRenamed: " + event);

        this.name = event.getNewName();
    }

    // 在庫数追加イベントの適用処理
    @EventSourcingHandler
    private void applyCheckedIn(ItemsCheckedInToInventory event) {
        System.out.println("E call applyCheckedIn: " + event);

        this.count += event.getCount();
    }
}

実行クラスの作成

動作確認のための実行クラスを作成します。

CommandGateway へコマンドを send すれば処理が流れるように CommandBus・Repository・EventStore 等を組み合わせます。

イベントソーシングには EventSourcingRepository を使用します。 アノテーションを使ったコマンドハンドラを適用するには AggregateAnnotationCommandHandler を使用します。

また、今回はインメモリでイベントを保持する InMemoryEventStorageEngine を使っています。

src/main/java/SampleApp.java
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;

import org.axonframework.eventsourcing.EventSourcedAggregate;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;

import lombok.val;

import sample.commands.CreateInventoryItem;
import sample.commands.CheckInItemsToInventory;
import sample.models.InventoryItem;

public class SampleApp {
    public static void main(String... args) {

        val cmdBus = new SimpleCommandBus();
        val gateway = new DefaultCommandGateway(cmdBus);

        val es = new EmbeddedEventStore(new InMemoryEventStorageEngine());

        // イベントソーシング用の Repository
        val repository = new EventSourcingRepository<>(InventoryItem.class, es);

        // アノテーションによるコマンドハンドラを適用
        new AggregateAnnotationCommandHandler<>(InventoryItem.class, 
                                                repository).subscribe(cmdBus);

        String r1 = gateway.sendAndWait(new CreateInventoryItem("s1", "sample1"));
        System.out.println("id: " + r1);

        System.out.println("----------");

        EventSourcedAggregate<InventoryItem> r2 = 
            gateway.sendAndWait(new CheckInItemsToInventory("s1", 5));

        printAggregate(r2);

        System.out.println("----------");

        EventSourcedAggregate<InventoryItem> r3 = 
            gateway.sendAndWait(new CheckInItemsToInventory("s1", 3));

        printAggregate(r3);
    }

    private static void printAggregate(EventSourcedAggregate<InventoryItem> esag) {
        System.out.println(esag.getAggregateRoot());
    }
}

なお、CreateInventoryItem を send した後に、同じ ID (今回は "s1")を使って再度 CreateInventoryItem を send してみたところ、特に重複チェックなどが実施されるわけではなく、普通にイベント (InventoryItemCreated と InventoryItemRenamed) が追加されました。

実行

実行結果は以下の通りです。

コマンドを送信する度に、これまでのイベントを適用してエンティティの状態を復元した後、新しいコマンドを処理している動作を確認できました。

> gradle run

・・・
:run
C call new: CreateInventoryItem(id=s1, name=sample1)
E call applyCreated: InventoryItemCreated(id=s1)
E call applyRenamed: InventoryItemRenamed(newName=sample1)
id: s1
----------
E call applyCreated: InventoryItemCreated(id=s1)
E call applyRenamed: InventoryItemRenamed(newName=sample1)
C call updateCount: CheckInItemsToInventory(id=s1, count=5)
E call applyCheckedIn: ItemsCheckedInToInventory(count=5)
InventoryItem(id=s1, name=sample1, count=5)
----------
E call applyCreated: InventoryItemCreated(id=s1)
E call applyRenamed: InventoryItemRenamed(newName=sample1)
E call applyCheckedIn: ItemsCheckedInToInventory(count=5)
C call updateCount: CheckInItemsToInventory(id=s1, count=3)
E call applyCheckedIn: ItemsCheckedInToInventory(count=3)
InventoryItem(id=s1, name=sample1, count=8)

備考 - スナップショット

イベント数が多くなると、イベントを毎回初めから適用して状態を復元するのはパフォーマンス的に厳しくなるため、スナップショットを使います。

Axon Framework にもスナップショットの機能が用意されており、EventCountSnapshotterTrigger 等を EventSourcingRepository へ設定すれば使えるようです。

ただし、今回のサンプル (SampleApp.java) では EventCountSnapshotterTrigger を機能させられませんでした。

というのも、EventCountSnapshotterTrigger では decorateForAppend メソッドの実行時にスナップショット化を行うようですが ※、今回のサンプルでは decorateForAppend は一度も実行せず decorateForRead メソッドのみが実行されるようでした。

 ※ ソースをざっと見た限りでは、カウンターのカウントアップ等も
    decorateForAppend の中でのみ実施しているようだった