Groovy で Apache Flink を使用
「Groovy で Apache Spark を使用」と同様の処理を Apache Flink で試してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170311/
サンプルスクリプト
今回はローカルで実行するだけなので ExecutionEnvironment.createLocalEnvironment()
で取得した LocalEnvironment
を使用します。
map メソッドの引数へ Groovy のクロージャを使ったところ、org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. ・・・
となってしまい駄目でしたので、MapFunction
の実装クラスを定義しました。
その場合、MapFunction の型引数をきちんと指定する必要があります。(そうしないと InvalidTypesException
が発生)
なお、flink-clients_2.10 を使用する場合、scala-library の @Grab 定義は不要でした。(flink-clients_2.11 の場合のみ scala-library が必要)
money_count.groovy
@Grapes([ @Grab('org.apache.flink:flink-java:1.2.0'), @GrabExclude('io.netty#netty;3.7.0.Final') ]) @Grab('org.apache.flink:flink-clients_2.11:1.2.0') @Grab('org.scala-lang:scala-library:2.11.8') @Grab('org.jboss.netty:netty:3.2.10.Final') import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.tuple.Tuple2 import groovy.transform.CompileStatic // @CompileStatic は必須ではない(無くても動作する) @CompileStatic class ToTuple implements MapFunction<String, Tuple2<String, Integer>> { Tuple2 map(String v) { new Tuple2(v, 1) } } def env = ExecutionEnvironment.createLocalEnvironment() env.readTextFile(args[0]).map(new ToTuple()).groupBy(0).sum(1).print()
groupBy
メソッドではグルーピング対象とする項目を、sum
メソッドでは合計する項目を数値で指定します。
実行
Groovy のデフォルト設定では java.lang.IllegalArgumentException: Size of total memory must be positive.
が発生しましたので、JAVA_OPTS
環境変数で最大メモリサイズ (-Xmx) を変更して実行します。
実行結果
> set JAVA_OPTS=-Xmx512m > groovy money_count.groovy input_sample.txt Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1033779003] 03/08/2017 00:56:11 Job execution switched to status RUNNING. ・・・ (10000,2) (10,2) (100,2) (50,1) (500,1) (1,2) (1000,3) (2000,1) (5,3)
input_sample.txt の内容は以下の通りです。
input_sample.txt
100 1 5 50 500 1000 10000 1000 1 10 5 5 10 100 1000 10000 2000
reveno でイベントソーシング
「sourcerer でイベントソーシング」 等と同様の処理を reveno で実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170306/
はじめに
使用する Gradle ビルド定義ファイルは以下の通りです。
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compileOnly "org.projectlombok:lombok:1.16.12" compile 'org.reveno:reveno-core:1.23' }
lombok は必須ではありません。
(a) transaction 版
reveno では transaction
メソッドを使う方法と transactionAction
メソッドを使う方法が用意されているようなので、まずは transaction メソッドを使ってみます。
イベントクラスの作成
各種イベント用のクラスを作成します。
reveno はこれまでに試したフレームワークとは異なり、イベントからエンティティの状態を復元したりはしないのでイベントクラスは必須ではありません。(EventBus へ publishEvent しないのであれば不要)
そのため、reveno の場合はイベントソーシングではなくコマンドソーシングと呼べるのかもしれません。
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import lombok.Value; @Value public class InventoryItemCreated { private long id; }
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import lombok.Value; @Value public class InventoryItemRenamed { private long id; private String newName; }
在庫数の変更イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import lombok.Value; @Value public class ItemsCheckedInToInventory { private long id; private int count; }
モデルクラスの作成
エンティティクラスとビュークラスを作成します。
エンティティクラスは状態を保存するため、ビュークラスはクエリーの結果としてエンティティクラスから変換して返す事になります。
エンティティクラス src/main/java/sample/model/InventoryItem.java
package sample.model; import lombok.Value; @Value public class InventoryItem { private String name; private int count; }
ビュークラス src/main/java/sample/model/InventoryItemView.java
package sample.model; import lombok.Value; @Value public class InventoryItemView { private long id; private String name; private int count; }
実行クラスの作成
今回はトランザクションやイベントのハンドリング処理等をこのクラスへ実装する事にしました。
transaction メソッドを使用する場合、文字列でトランザクションのアクションを定義し、アクションの実行時に Map でパラメータを渡せばよさそうです。
永続化したデータは Engine
のコンストラクタ引数で指定したディレクトリ内のファイルへ保存されるようになっており、store
や remap
した内容は tx-xxx
ファイルへ、publishEvent
した内容は evn-xxx
ファイルへ保存されるようです。※
※ publishEvent を実行しなかった場合、 evn-xxx ファイルの内容は空になりました
QueryManager
を使ってデータを取得する場合、エンティティを直接取得する事はできないので、viewMapper
メソッドを使ってエンティティクラスからビュークラスへのマッピングを設定します。
イベントのハンドリングは events
メソッドで取得した EventsManager
に対して実施します。
今回は、executeSync
のような同期用メソッドのみを使っていますが、非同期用のメソッドも用意されています。
実行クラス src/main/java/SampleApp.java
import lombok.val; import org.reveno.atp.core.Engine; import org.reveno.atp.utils.MapUtils; import sample.model.InventoryItem; import sample.model.InventoryItemView; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; public class SampleApp { public static void main(String... args) { val reveno = new Engine("db"); // 各種ハンドラやマッピング等の設定を実施 setUp(reveno); reveno.startup(); // 在庫の作成 long id = reveno.executeSync("createInventoryItem", MapUtils.map("name", "sample1")); System.out.println("id: " + id); // 在庫数の更新 reveno.executeSync("checkInItemsToInventory", MapUtils.map("id", id, "count", 5)); // 在庫数の更新 reveno.executeSync("checkInItemsToInventory", MapUtils.map("id", id, "count", 3)); // 検索 val res = reveno.query().find(InventoryItemView.class, id); System.out.println("result: " + res); reveno.shutdown(); } private static void setUp(Engine reveno) { // エンティティクラスをビュークラスへ変換する設定 reveno.domain().viewMapper( InventoryItem.class, InventoryItemView.class, (id, e, r) -> new InventoryItemView(id, e.getName(), e.getCount()) ); // 在庫の作成処理 reveno.domain().transaction("createInventoryItem", (t, ctx) -> { long id = t.id(); String name = t.arg("name"); // エンティティ(状態)の保存 ctx.repo().store(id, new InventoryItem(name, 0)); // イベントの発行 ctx.eventBus().publishEvent(new InventoryItemCreated(id)); ctx.eventBus().publishEvent(new InventoryItemRenamed(id, name)); }).uniqueIdFor(InventoryItem.class).command(); // 在庫数の更新処理 reveno.domain().transaction("checkInItemsToInventory", (t, ctx) -> { long id = t.longArg("id"); int count = t.intArg("count"); // エンティティ(状態)の更新 ctx.repo().remap(id, InventoryItem.class, (rid, state) -> new InventoryItem(state.getName(), state.getCount() + count)); // イベントの発行 ctx.eventBus().publishEvent(new ItemsCheckedInToInventory(id, count)); }).command(); // InventoryItemCreated イベントのハンドリング設定 reveno.events().eventHandler(InventoryItemCreated.class, (event, meta) -> System.out.println("*** create event: " + event + ", transactionTime: " + meta.getTransactionTime() + ", isRestore: " + meta.isRestore())); } }
実行
gradle run で実行した結果です。
実行結果
> gradle run ・・・ id: 1 result: InventoryItemView(id=1, name=sample1, count=8) ・・・ *** create event: InventoryItemCreated(id=1), transactionTime: 1488718421288, isRestore: false ・・・
(b) transactionAction 版
イベントクラス等は同じものを使用して transactionAction を使った処理を作成してみます。
コマンドクラスの作成
transactionAction ではコマンドクラスを使う事になるので作成します。
id の値はインスタンス化の時点では決定せず、コマンドハンドラ内で設定することになるので、@Wither
を使って id の値のみ変更したコピーを返すメソッド (withId
) を用意するようにしています。
在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import lombok.Value; import lombok.experimental.NonFinal; import lombok.experimental.Wither; @Value public class CreateInventoryItem { @Wither @NonFinal private long id; private String name; }
在庫数の更新コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import lombok.Value; @Value public class CheckInItemsToInventory { private long id; private int count; }
実行クラスの作成
command
メソッドでコマンドハンドラを設定し、コマンド毎のトランザクションアクションを transactionAction
で設定します。
コマンドハンドラ内で executeTxAction
へコマンドを渡せば該当するトランザクションアクションが実行されます。
実行クラス src/main/java/SampleApp.java
import lombok.val; import org.reveno.atp.core.Engine; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.model.InventoryItem; import sample.model.InventoryItemView; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; public class SampleApp { public static void main(String... args) { val reveno = new Engine("db"); setUp(reveno); reveno.startup(); // 在庫の作成 long id = reveno.executeSync(new CreateInventoryItem(0, "sample1")); System.out.println("id: " + id); // 在庫数の更新 reveno.executeSync(new CheckInItemsToInventory(id, 5)); // 在庫数の更新 reveno.executeSync(new CheckInItemsToInventory(id, 3)); // 検索 val res = reveno.query().find(InventoryItemView.class, id); System.out.println("result: " + res); reveno.shutdown(); } private static void setUp(Engine reveno) { ・・・ // 在庫作成コマンドのハンドリング設定 reveno.domain().command(CreateInventoryItem.class, Long.class, (cmd, ctx) -> { long id = ctx.id(InventoryItem.class); // id を更新してトランザクションアクションを実行 ctx.executeTxAction(cmd.withId(id)); return id; }); // 在庫数の更新コマンドのハンドリング設定 reveno.domain().command(CheckInItemsToInventory.class, (cmd, ctx) -> ctx.executeTxAction(cmd)); reveno.domain().transactionAction(CreateInventoryItem.class, (act, ctx) -> { // エンティティ(状態)の保存 ctx.repo().store(act.getId(), new InventoryItem(act.getName(), 0)); // イベントの発行 ctx.eventBus().publishEvent(new InventoryItemCreated(act.getId())); ctx.eventBus().publishEvent(new InventoryItemRenamed(act.getId(), act.getName())); }); reveno.domain().transactionAction(CheckInItemsToInventory.class, (act, ctx) -> { // エンティティ(状態)の更新 ctx.repo().remap(act.getId(), InventoryItem.class, (id, state) -> new InventoryItem(state.getName(), state.getCount() + act.getCount())); // イベントの発行 ctx.eventBus().publishEvent(new ItemsCheckedInToInventory(act.getId(), act.getCount())); }); ・・・ } }
実行
gradle run で実行した結果です。
実行結果
> gradle run ・・・ id: 1 *** create event: InventoryItemCreated(id=1), transactionTime: 1488721568341, isRestore: false result: InventoryItemView(id=1, name=sample1, count=8) ・・・
Groovy で Cassandra を組み込み実行
Groovy で Apache Cassandra を組み込み実行してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170227/
組み込み実行
まずは、設定ファイルを用意しておきます。
今回は実行に必要な最小限の設定を行っています。
embed.conf
cluster_name: 'Test Cluster' listen_address: localhost commitlog_sync: periodic commitlog_sync_period_in_ms: 10000 partitioner: org.apache.cassandra.dht.Murmur3Partitioner endpoint_snitch: SimpleSnitch seed_provider: - class_name: org.apache.cassandra.locator.SimpleSeedProvider parameters: - seeds: "127.0.0.1" # CQL クライアントで接続するために必要 start_native_transport: true
ここで、Apache Cassandra 3.10 では thrift クライアントで接続するためのポート 9160 はデフォルトで有効のようですが、CQL クライアント用のポート 9042 を有効化するには start_native_transport
の設定が必要でした。
ポート番号 | 用途 |
---|---|
9042 | CQL クライアント用 |
9160 | thrift クライアント用 |
Cassandra を組み込み実行する Groovy スクリプトは以下の通りです。
cassandra.config
で設定ファイル、cassandra.storagedir
でデータディレクトリのパスを設定、CassandraDaemon
をインスタンス化して activate
します。(deactivate
を実行すると停止します)
cassandra_embed.groovy
@Grab('org.apache.cassandra:cassandra-all:3.10') import org.apache.cassandra.service.CassandraDaemon def conf = 'embed.yaml' def dir = new File(args[0]) if (!dir.exists()) { dir.mkdirs() } System.setProperty('cassandra.config', conf) System.setProperty('cassandra-foreground', 'true') System.setProperty('cassandra.storagedir', dir.absolutePath) def cassandra = new CassandraDaemon() // 開始 cassandra.activate() System.in.read() // 終了 cassandra.deactivate()
実行結果は以下の通りで、特に問題なく起動できました。
実行
> groovy cassandra_embed.groovy data ・・・ 21:30:46.493 [main] INFO o.apache.cassandra.transport.Server - Starting listening for CQL clients on localhost/127.0.0.1:9042 (unencrypted)... ・・・ 21:30:46.790 [Thread-1] INFO o.a.cassandra.thrift.ThriftServer - Listening for thrift clients...
動作確認
CQL クライアントを使って Cassandra への接続確認を行います。
cqlsh 利用
まずは、Cassandra 3.10 に同梱されている cqlsh
コマンドを使って、キースペースとテーブルを作成しデータ登録を行います。
ここで、cqlsh (本体は cqlsh.py) の実行には Python の実行環境が必要です。
cqlsh による操作結果
> cqlsh ・・・ Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 5.0.1 | Cassandra 3.10 | CQL spec 3.4.4 | Native protocol v4] ・・・ cqlsh> CREATE KEYSPACE sample WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor' : 1}; cqlsh> use sample; cqlsh:sample> CREATE TABLE data (id text PRIMARY KEY, name text, value int); cqlsh:sample> INSERT INTO data (id, name, value) values ('d1', 'sample1', 1); cqlsh:sample> INSERT INTO data (id, name, value) values ('d2', 'sample2', 20); cqlsh:sample> INSERT INTO data (id, name, value) values ('d3', 'sample3', 300); cqlsh:sample> SELECT * FROM data; id | name | value ----+---------+------- d2 | sample2 | 20 d1 | sample1 | 1 d3 | sample3 | 300 (3 rows)
Datastax Java Driver for Apache Cassandra 利用
次に、登録したデータを Datastax Java Driver for Apache Cassandra を使って検索してみます。
netty と jffi モジュールで Error grabbing Grapes -- [download failed: ・・・]
となったので、@GrabExclude を使って回避しています。
client_sample.groovy
@Grapes([ @Grab('com.datastax.cassandra:cassandra-driver-core:3.1.4'), @GrabExclude('io.netty#netty-handler;4.0.37'), @GrabExclude('com.github.jnr#jffi;1.2.10') ]) @Grab('io.netty:netty-all:4.0.44.Final') @Grab('org.slf4j:slf4j-nop:1.7.23') import com.datastax.driver.core.Cluster Cluster.builder().addContactPoint('localhost').build().withCloseable { cluster -> cluster.connect('sample').withCloseable { session -> def res = session.execute('select * from data') res.each { println it } } }
実行結果は以下の通りです。
実行結果
> groovy client_sample.groovy Row[d2, sample2, 20] Row[d1, sample1, 1] Row[d3, sample3, 300]
Groovy で Elasticsearch を組み込み実行
Groovy で Elasticsearch を組み込み実行してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170203/
(a) クライアント接続しない場合
まずは、クライアント接続が不可な Elasticsearch を起動して、ドキュメント登録や検索を行ってみます。
ポート番号 | クライアント接続 |
---|---|
9200 (HTTP) | × |
9300 (Transport) | × |
Elasticsearch の組み込み実行は、適切な設定を行った Settings
(もしくは Environment
)を使って Node
を作成し start
を実行するだけです。
path.home
の設定は必須で、指定したパスの data ディレクトリを使用します。(無ければ自動的に作成されます)
transport.type
を local
へ、http.enabled
を false
へ設定すればクライアントの接続を受け付けない状態になります。※
※ クライアント接続を受け付けるためのプラグインを適用していない場合、 このように設定しておかないと実行時にエラーとなります
この場合、Node の client
メソッドで取得した Client
を使ってインデックス等を操作します。
els_local.groovy
@Grab('org.elasticsearch:elasticsearch:5.2.0') // log4j のモジュールが必要(無い場合は NoClassDefFoundError が発生) @Grab('org.apache.logging.log4j:log4j-api:2.8') @Grab('org.apache.logging.log4j:log4j-core:2.8') import org.elasticsearch.common.settings.Settings import org.elasticsearch.node.Node def index = args[0] // インデックス def type = args[1] // タイプ // 設定 def setting = Settings.builder() .put('path.home', '.') // data ディレクトリの配置先を指定 .put('transport.type', 'local') .put('http.enabled', 'false') .build() new Node(setting).withCloseable { node -> // Elasticsearch の実行 node.start() node.client().withCloseable { client -> // インデックスへのドキュメント登録 def r1 = client.prepareIndex(index, type) .setSource('time', System.currentTimeMillis()) .execute() .get() println r1 // 検索結果へ即時反映されなかったので適度に待機 sleep(1000) println '-----' // 検索 def r2 = client.prepareSearch(index) .setTypes(type) .execute() .get() println r2 } }
動作確認
実行結果は以下の通りです。
log4j2 の設定ファイルが見つからない旨のエラーログが出力されていますが、 Elasticsearch の組み込み実行は成功しているようです。
実行結果
> groovy els_local.groovy a1 item ERROR StatusLogger No log4j2 configuration file found. ・・・ IndexResponse[index=a1,type=item,id=AVn6bOp-0Vu_EXj66Fj9,version=1,result=created,shards={"_shards":{"total":2,"successful":1,"failed":0}}] ----- {"took":140,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"a1","_type":"item","_id":"AVn6bOp-0Vu_EXj66Fj9","_score":1.0,"_source":{"time":1485965157491}}]}}
(b) クライアント接続する場合
次に、クライアント接続が可能な Elasticsearch を組み込み実行します。
ポート番号 | クライアント接続 |
---|---|
9200 (HTTP) | ○ |
9300 (Transport) | ○ |
これらのポート番号でクライアント接続を受けるにはプラグインが必要なので、
今回は Netty4Plugin
(transport-netty4-client) を使用しました。
Node の public コンストラクタはプラグインクラスを直接指定できないため、今回は protected コンストラクタ ※ を直接呼び出して Netty4Plugin を適用しています。
※ 第 2引数で Plugin クラスを指定できるようになっている protected Node(Environment environment, Collection<Class<? extends Plugin>> plugins)
なお、close
メソッドの実行有無に関係なく Node は終了してしまうので、System.in.read()
を使って終了を止めています。
els_netty.groovy
@Grab('org.elasticsearch:elasticsearch:5.2.0') @Grab('org.elasticsearch.plugin:transport-netty4-client:5.2.0') @Grab('org.apache.logging.log4j:log4j-api:2.8') @Grab('org.apache.logging.log4j:log4j-core:2.8') import org.elasticsearch.common.settings.Settings import org.elasticsearch.env.Environment import org.elasticsearch.node.Node import org.elasticsearch.transport.Netty4Plugin def setting = Settings.builder() .put('path.home', '.') .build() def env = new Environment(setting) // Netty4Plugin を指定して Node をインスタンス化 new Node(env, [Netty4Plugin]).withCloseable { node -> node.start() println 'started server ...' // 終了するのを止めるための措置 System.in.read() }
動作確認
Elasticsearch を実行します。
Elasticsearch 組み込み実行
> groovy els_netty.groovy ERROR StatusLogger No log4j2 configuration file found. ・・・ started server ...
(1) HTTP 接続(9200 ポート)
curl で 9200 ポートへ接続した結果は以下の通りです。
実行結果(HTTP)
$ curl -s http://localhost:9200/b1/item -d "{\"time\": `date +%s%3N`}" {"_index":"b1","_type":"item","_id":"AVn6rxw5O4vwdZ1ibdCo","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"created":true} $ curl -s http://localhost:9200/b1/item/_search {"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"b1","_type":"item","_id":"AVn6rxw5O4vwdZ1ibdCo","_score":1.0,"_source":{"time": 1485969495792}}]}}
(2) Transport 接続(9300 ポート)
9300 ポートへ接続して (a) と同等のクライアント処理を実施するスクリプトは以下のようになります。
9300 ポートへ接続するために TransportClient
を使用しています。
els_client.groovy
@Grab('org.elasticsearch.client:transport:5.2.0') @Grab('org.apache.logging.log4j:log4j-api:2.8') @Grab('org.apache.logging.log4j:log4j-core:2.8') import org.elasticsearch.common.settings.Settings import org.elasticsearch.transport.client.PreBuiltTransportClient import org.elasticsearch.common.transport.InetSocketTransportAddress def index = args[0] def type = args[1] def addr = new InetSocketTransportAddress( InetAddress.getLoopbackAddress(), 9300) // TransportClient の生成 def transportClient = new PreBuiltTransportClient(Settings.EMPTY) .addTransportAddress(addr) transportClient.withCloseable { client -> // インデックスへのドキュメント登録 def r1 = client.prepareIndex(index, type) .setSource('time', System.currentTimeMillis()) .execute() .get() println r1 sleep(1000) println '-----' // 検索 def r2 = client.prepareSearch(index) .setTypes(type) .execute() .get() println r2 }
実行結果は以下の通りです。
実行結果(Transport)
> groovy els_client.groovy b2 item ・・・ IndexResponse[index=b2,type=item,id=AVn6tvd3WlzxY0bEAQ4r,version=1,result=created,shards={"_shards":{"total":2,"successful":1,"failed":0}}] ----- {"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"b2","_type":"item","_id":"AVn6tvd3WlzxY0bEAQ4r","_score":1.0,"_source":{"time":1485970011680}}]}}
備考. HTTP 接続の無効化
9200 ポートの接続だけを無効化するには http.enabled
を false
にします。
ポート番号 | クライアント接続 |
---|---|
9200 (HTTP) | × |
9300 (Transport) | ○ |
els_netty_nohttp.groovy
・・・ def setting = Settings.builder() .put('path.home', '.') .put('http.enabled', 'false') // HTTP 接続(9200 ポート)の無効化 .build() ・・・ new Node(env, [Netty4Plugin]).withCloseable { node -> ・・・ }
Akka の FileIO でファイルを読み書き
Akka (akka-stream) の FileIO
を使ってファイルの読み書きを行ってみます。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170131/
はじめに
akka-stream では Java 用の API は akka.stream.javadsl
パッケージに、Scala 用の API は akka.stream.scaladsl
パッケージに定義されています。
主要なクラスやメソッドは javadsl と scaladsl で概ね共通化されているようですが、Source
の recover
メソッドは deprecated の有無が違っていました。
Source の recover/recoverWith メソッドの deprecated 状況
パッケージ | deprecated 有り | deprecated 無し | 備考 |
---|---|---|---|
akka.stream.scaladsl | recoverWith | recover | ソースは akka/stream/scaladsl/Flow.scala |
akka.stream.javadsl | recover | recoverWith | ソースは akka/stream/javadsl/Source.scala |
ここで、deprecated の理由が @deprecated("Use recoverWithRetries instead.", "2.4.4")
のようなので、scaladsl の方が正しく、javadsl の方は deprecated するメソッドが違っているのだと思います ※。
※ recoverWith の代わりに recoverWithRetries を使えるが (引数が 1つ増えただけなので) recover は引数の型がそもそも違う
Scala の場合
まずは Scala で実装してみます。
ファイル用の Sink は FileIO.toPath
で、Source は FileIO.fromPath
で取得できます。
ファイルの入出力の型は akka.util.ByteString
となっており、以下では map を使って String との変換を行っています。
Source の recover
メソッドを使えば、例外が発生していた場合に代用の値へ差し替えて処理を繋げられます。
今回は、ファイルが存在しない等で IOException が発生した際に、"invalid file, ・・・" という文字列へ差し替えるために recover を使っています。
src/main/scala/SampleApp.scala
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{FileIO, Source} import akka.util.ByteString import java.nio.file.Paths import java.io.IOException object SampleApp extends App { implicit val system = ActorSystem() import system.dispatcher // ExecutionContext を implicit implicit val materializer = ActorMaterializer() // ファイルへの書き込み(sample1.txt へ "sample data" を出力) val res1 = Source.single("sample data") .map(ByteString.fromString) .runWith(FileIO.toPath(Paths.get("sample1.txt"))) // ファイルの読み込み(sample2.txt の内容を println) val res2 = FileIO.fromPath(Paths.get("sample2.txt")) .map(_.utf8String) .recover { case e: IOException => s"invalid file, ${e}" } .runForeach(println) res1.flatMap(_ => res2).onComplete(_ => system.terminate) }
ビルドと実行
Gradle のビルド定義ファイルは以下の通りです。
build.gradle
apply plugin: 'scala' apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compile 'org.scala-lang:scala-library:2.12.1' compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1' }
sample2.txt ファイルの無い状態で実行します。
実行結果1
> gradle -q run invalid file, java.nio.file.NoSuchFileException: sample2.txt
sample2.txt を作成して実行します。
実行結果2
> echo %time% > sample2.txt > gradle -q run 22:11:48.03
Java の場合
次に Java で実装してみます。
Scala と概ね同じですが、akka.stream.javadsl.Source でも recover
の引数が scala.PartialFunction
となっており、多少の工夫が必要です。
recover の引数に合う scala.PartialFunction は Akka の akka.japi.pf.PFBuilder
で作れるので、以下では Match.match
から PFBuilder を取得して使っています。
なお、Akka では scala.PartialFunction を組み立てるための Java 用の API が akka.japi.pf
パッケージにいくつか用意されています。
src/main/java/SampleApp.java
import akka.actor.ActorSystem; import akka.japi.pf.Match; import akka.japi.pf.PFBuilder; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Source; import akka.util.ByteString; import java.io.IOException; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture; public class SampleApp { public static void main(String... args) { final ActorSystem system = ActorSystem.create(); final Materializer materializer = ActorMaterializer.create(system); // ファイルへの書き込み(sample1.txt へ "sample data" を出力) CompletableFuture<?> res1 = Source.single("sample data") .map(ByteString::fromString) .runWith(FileIO.toPath(Paths.get("sample1.txt")), materializer) .toCompletableFuture(); // scala.PartialFunction のビルダー定義 PFBuilder<Throwable, String> pfunc = Match.match(IOException.class, e -> "invalid file, " + e); // ファイルの読み込み(sample2.txt の内容を println) CompletableFuture<?> res2 = FileIO.fromPath(Paths.get("sample2.txt")) .map(ByteString::utf8String) .recover(pfunc.build()) .runForeach(System.out::println, materializer) .toCompletableFuture(); CompletableFuture.allOf(res1, res2) .handle((v, e) -> system.terminate()) .join(); } }
ビルドと実行
Gradle のビルド定義ファイルは以下の通りです。
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1' }
sample2.txt ファイルの無い状態で実行します。
実行結果1
> gradle -q run ・・・\src\main\java\SampleApp.javaは非推奨のAPIを使用またはオーバーライドしています。 ・・・ invalid file, java.nio.file.NoSuchFileException: sample2.txt
recover が deprecated されている件で警告メッセージが出力されますが、上述したように recover を deprecated しているのが誤りだと思われるので無視します。
sample2.txt を作成して実行します。
実行結果2
> echo %time% > sample2.txt > gradle -q run ・・・ 22:14:16.00
Sourcerer でイベントソーシング
「Axon Framework でイベントソーシング」 や 「es4j でイベントソーシング」 と同様の処理を Sourcerer で実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170110/
はじめに
以下のような Gradle ビルド定義ファイルを使います。
Sourcerer はイベントの保存先として Event Store を使用するため、今回は Event Store クライアントライブラリに esjc
を使う sourcerer-esjc
モジュールを使用します。
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compileOnly 'org.projectlombok:lombok:1.16.12' compile 'org.elder.sourcerer:sourcerer-esjc:v4.0.1' compile 'com.fasterxml.jackson.core:jackson-databind:2.8.5' compile 'io.javaslang:javaslang:2.1.0-alpha' }
lombok
と javaslang
は必須ではありません。(javaslang はパターンマッチ処理に使いました)
Sourcerer によるイベントソーシング
コマンドの作成
コマンドは特に注意するところはなく、普通の Java クラスとして実装するだけです。
ここで定義したコマンドは、CommandFactory で作成した Command の引数 (Arguments) として使う事になります。
在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import lombok.Value; // 在庫作成コマンド @Value public class CreateInventoryItem { private String id; private String name; }
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import lombok.Value; // 在庫数追加コマンド @Value public class CheckInItemsToInventory { private int count; }
イベントの作成
イベントの型へ @EventType
を付与します。
AggregateProjection
等で指定するイベントの型は 1つなので、まずはベースとなるインターフェースを定義します。
イベントの内容は JSON 化して Event Store へ保存する事になりますが、デフォルトでは JSON へ型情報が付与されず、複数のイベント型を使うと JSON からイベントオブジェクトを復元する際に不都合が生じます。
これを回避するために @JsonTypeInfo
と @JsonSubTypes
を使って JSON へ型情報を残すように設定しています ※。
※ @JsonTypeInfo の use へ NAME を指定した場合は "@type":"クラス名" の情報を JSON へ出力します
イベントのベースインターフェース src/main/java/sample/events/InventoryEvent.java
package sample.events; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.elder.sourcerer.EventType; @EventType // JSON へ型情報を残すための設定 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) @JsonSubTypes({ @Type(InventoryItemCreated.class), @Type(InventoryItemRenamed.class), @Type(ItemsCheckedInToInventory.class) }) public interface InventoryEvent { }
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import lombok.Value; // 在庫作成イベント @Value public class InventoryItemCreated implements InventoryEvent { private String id; }
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import lombok.Value; // 在庫名の変更イベント @Value public class InventoryItemRenamed implements InventoryEvent { private String name; }
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import lombok.Value; // 在庫数の追加イベント @Value public class ItemsCheckedInToInventory implements InventoryEvent { private int count; }
コマンドハンドラの作成
コマンドを処理してイベントを生成するメソッドを定義します。
引数や戻り値の型は CommandFactory
の fromOperation
メソッドへ渡す値 (Operations
の各種メソッドで生成) に応じて調整します。
単一のコマンドから複数のイベントを生成する場合は戻り値を List<イベント型>
とするだけのようです。
src/main/java/sample/InventoryOperations.java
package sample; import com.google.common.collect.ImmutableList; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.events.InventoryEvent; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; import java.util.List; public class InventoryOperations { // 作成コマンドの処理 public static List<InventoryEvent> create(CreateInventoryItem cmd) { return ImmutableList.of( new InventoryItemCreated(cmd.getId()), new InventoryItemRenamed(cmd.getName()) ); } // 在庫数追加コマンドの処理 public static ItemsCheckedInToInventory checkIn(InventoryItem state, CheckInItemsToInventory cmd) { return new ItemsCheckedInToInventory(cmd.getCount()); } }
AggregateProjection の作成
AggregateProjection<状態の型, イベントの型>
を実装して、イベントから状態を復元する処理を実装します。
今回は javaslang モジュールの Match
と Case
を使ってパターンマッチで処理しています。
src/main/java/sample/InventoryProjection.java
package sample; import org.elder.sourcerer.AggregateProjection; import org.jetbrains.annotations.NotNull; import sample.events.InventoryItemCreated; import sample.events.InventoryEvent; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; import static javaslang.API.*; import static javaslang.Predicates.*; public class InventoryProjection implements AggregateProjection<InventoryItem, InventoryEvent>{ @NotNull @Override public InventoryItem empty() { return new InventoryItem("", "", 0); } @NotNull @Override public InventoryItem apply(@NotNull String id, @NotNull InventoryItem state, @NotNull InventoryEvent event) { return Match(event).of( // InventoryItemCreated イベントの場合 Case(instanceOf(InventoryItemCreated.class), ev -> state.withId(ev.getId())), // InventoryItemRenamed イベントの場合 Case(instanceOf(InventoryItemRenamed.class), ev -> state.withName(ev.getName())), // ItemsCheckedInToInventory イベントの場合 Case(instanceOf(ItemsCheckedInToInventory.class), ev -> state.withCount(state.getCount() + ev.getCount())), // その他 Case($(), state) ); } }
在庫の状態 InventoryItem
は以下のように実装しました。
src/main/java/sample/InventoryItem.java
package sample; import lombok.Value; @Value public class InventoryItem { private String id; private String name; private int count; public InventoryItem withId(String newId) { return new InventoryItem(newId, name, count); } public InventoryItem withName(String newName) { return new InventoryItem(id, newName, count); } public InventoryItem withCount(int newCount) { return new InventoryItem(id, name, newCount); } }
実行クラスの作成
sourcerer-esjc-spring モジュールを使う方が楽だと思われますが、今回は AggregateRepository
と CommandFactory
の組み立て処理を自前で実装してみました。
- (1) EventStore クライアント作成
- (2) EventRepositoryFactory 作成
- (3) EventRepository 作成
- (4) AggregateRepository 作成
- (5) CommandFactory 作成
CommandFactory へコマンドハンドラのメソッドをそれぞれ指定して Command
を作成します。(コマンドハンドラメソッドのシグネチャに対応する Operations
のメソッドを使います)
Command へ aggregateId を設定し arguments へコマンドの内容を設定した後、run
を実行するとコマンドが適用されます。
src/main/java/SampleApp.java
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.msemys.esjc.EventStoreBuilder; import lombok.val; import org.elder.sourcerer.*; import org.elder.sourcerer.esjc.EventStoreEsjcEventRepositoryFactory; import sample.*; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.events.InventoryEvent; import java.util.UUID; public class SampleApp { public static void main(String... args) throws JsonProcessingException { // (1) EventStore クライアント作成 val eventStore = EventStoreBuilder.newBuilder() // ポート番号に 1113 を使う点に注意 (2113 ではない) .singleNodeAddress("localhost", 1113) .build(); val mapper = new ObjectMapper(); // (2) EventRepositoryFactory 作成 val eventRepositoryFactory = new EventStoreEsjcEventRepositoryFactory( eventStore, mapper, "sample" ); // (3) EventRepository 作成 val eventRepository = eventRepositoryFactory.getEventRepository(InventoryEvent.class); // (4) AggregateRepository 作成 val aggregateRepository = new DefaultAggregateRepository<>( eventRepository, new InventoryProjection() ); // (5) CommandFactory 作成 val commandFactory = new DefaultCommandFactory<>(aggregateRepository); // 在庫作成処理 val createCommand = commandFactory.fromOperation( Operations.constructorOf(InventoryOperations::create)); // 在庫数の追加処理 val checkInCommand = commandFactory.fromOperation( Operations.updateOf(InventoryOperations::checkIn)); val id = UUID.randomUUID().toString(); // 在庫作成の実行 val r1 = createCommand.setAggregateId(id) .setArguments(new CreateInventoryItem(id, "sample")) .run(); printCommandResult(r1); // 在庫数の追加の実行(1回目) val r2 = checkInCommand.setAggregateId(id) .setArguments(new CheckInItemsToInventory(5)) .run(); printCommandResult(r2); // 在庫数の追加の実行(2回目) val r3 = checkInCommand.setAggregateId(id) .setArguments(new CheckInItemsToInventory(3)) .run(); printCommandResult(r3); // 集約の内容を取得 val aggregate = aggregateRepository.load(id); // 状態を出力 System.out.println("*** result state: " + aggregate.state()); eventStore.disconnect(); } private static void printCommandResult(CommandResult<? extends InventoryEvent> result) { System.out.printf( "*** command result: events=%s, newVersion=%s\n", result.getEvents(), result.getNewVersion() ); } }
実行
https://geteventstore.com/ から Event Store の実行環境をダウンロードして、起動しておきます。
Event Store 起動
> EventStore.ClusterNode.exe --db ./db --log ./logs ・・・ [08912,14,15:31:00.488] Starting Normal TCP listening on TCP endpoint: 127.0.0.1:1113. ・・・ [08912,09,15:31:07.100] Created stats stream '$stats-127.0.0.1:2113', code = Success
サンプルアプリケーションの実行
> gradle -q run ・・・ *** command result: events=[InventoryItemCreated(id=4184fd14-7725-4d09-8e24-59e9a94859a1), InventoryItemRenamed(name=sample)], newVersion=1 ・・・ *** command result: events=[ItemsCheckedInToInventory(count=5)], newVersion=2 ・・・ *** command result: events=[ItemsCheckedInToInventory(count=3)], newVersion=3 ・・・ *** result state: InventoryItem(id=4184fd14-7725-4d09-8e24-59e9a94859a1, name=sample, count=8) ・・・
Event Store への登録内容を確認するため、http://127.0.0.1:2113/ へアクセスして 「Stream Browser」 で sample:・・・・
の内容を見てみました。
備考
今回のサンプルも処理が終わった後(disconnect 実行後)にプロセスが終了しません。
原因を詳しく調べていませんが、Event Store クライアントライブラリの esjc 1.6.0 を単体で試した際に、以下が原因でプロセスが終了しないようだったので、esjc が原因かもしれません。
- executor 未指定の際に Settings 内で作る ThreadPoolExecutor の shutdown を実行しない
- EventStoreTcp 内の NioEventLoopGroup の shutdownGracefully を実行しない
es4j でイベントソーシング
「Axon Framework でイベントソーシング」 と同様の処理を es4j (Eventsourcing for Java) で実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170107/
はじめに
今回はイベントをインメモリで保持する eventsourcing-inmem
を使うため、以下のような Gradle 用ビルド定義を用意しました。 (lombok は便利なので使います)
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compileOnly "org.projectlombok:lombok:1.16.12" compile 'com.eventsourcing:eventsourcing-inmem:0.4.5' }
es4j によるイベントソーシング
コマンドの作成
コマンドは StandardCommand<S, R>
を継承して作成し、S に状態の型を R に結果の型を指定します。
コマンドを適用することで生じるイベントは events
メソッドの戻り値で返し、コマンド適用結果は result
メソッドの戻り値で返します。
状態は events
メソッドで用意し、result
メソッドの引数として渡ってきます。
まずは、在庫作成のコマンドを実装してみます。
状態の型に InventoryItemCreated
を、結果の型に InventoryItem
を指定しています。
状態を伴う EventStream
を返すために EventStream.ofWithState
を使用します。
コンストラクタの引数名からプロパティを認識するようなので、-parameters
オプションを使ってビルドするか、引数へ @PropertyName
を付与します。(今回は @PropertyName を使用)
なお、状態の型は InventoryItemCreated としなければならないわけでもなく、UUID
を代わりに使っても動作に支障はなさそうでした。
在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import com.eventsourcing.EventStream; import com.eventsourcing.Repository; import com.eventsourcing.StandardCommand; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import lombok.val; import sample.domain.InventoryItem; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; // 在庫作成コマンド @Value @EqualsAndHashCode(callSuper = false) public class CreateInventoryItem extends StandardCommand<InventoryItemCreated, InventoryItem> { private String name; public CreateInventoryItem(@PropertyName("name") String name) { this.name = name; } // イベントの生成 @Override public EventStream<InventoryItemCreated> events() throws Exception { // 在庫作成イベント val created = new InventoryItemCreated(); // 名前変更イベント val renamed = new InventoryItemRenamed(created.uuid(), name); return EventStream.ofWithState(created, created, renamed); } // 結果の生成 @Override public InventoryItem result(InventoryItemCreated state, Repository repository) { return InventoryItem.lookup(repository, state.uuid()).get(); } }
次に、在庫数を加えるためのコマンドです。
ドメインモデルの内部状態を更新するだけなので、状態と結果の型を Void
としました。
状態を伴わないので EventStream.of
で EventStream を返します。
コマンドの適用結果が Void なので result メソッドをオーバーライドする必要もありません。
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import com.eventsourcing.EventStream; import com.eventsourcing.StandardCommand; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; import sample.events.ItemsCheckedInToInventory; // 在庫数追加コマンド @Value @EqualsAndHashCode(callSuper = false) public class CheckInItemsToInventory extends StandardCommand<Void, Void> { private UUID reference; private int count; public CheckInItemsToInventory( @PropertyName("reference") UUID reference, @PropertyName("count") int count) { this.reference = reference; this.count = count; } @Override public EventStream<Void> events() throws Exception { return EventStream.of(new ItemsCheckedInToInventory(reference, count)); } }
イベントの作成
イベントは StandardEvent
を継承して作成します。
イベントを検索するための条件を SimpleIndex
を使って定義します。
以下では、uuid の値で InventoryItemCreated を取得できるように、SimpleIndex<InventoryItemCreated, UUID> を定義しています。
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.index.SimpleIndex; import java.util.UUID; // 在庫作成イベント public class InventoryItemCreated extends StandardEvent { public static SimpleIndex<InventoryItemCreated, UUID> ID = SimpleIndex.as(InventoryItemCreated::uuid); }
在庫名の更新イベントの場合、最新のものを取得すれば最終的な名称が決定するので、timestamp で比較する SimpleIndex<InventoryItemRenamed, HybridTimestamp>
を定義しています。
なお、名前変更のためのイベントは eventsourcing-cep モジュールに予め用意されていますが(NameChanged
)、今回は自前で実装しています。
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.hlc.HybridTimestamp; import com.eventsourcing.index.Index; import com.eventsourcing.index.SimpleIndex; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; import static com.eventsourcing.index.IndexEngine.IndexFeature.*; // 在庫名の変更イベント @Value @EqualsAndHashCode(callSuper = false) public class InventoryItemRenamed extends StandardEvent { private UUID reference; private String newName; public InventoryItemRenamed(@PropertyName("reference") UUID reference, @PropertyName("newName") String newName) { this.reference = reference; this.newName = newName; } public static SimpleIndex<InventoryItemRenamed, UUID> ID = SimpleIndex.as(InventoryItemRenamed::uuid); public static SimpleIndex<InventoryItemRenamed, UUID> REFERENCE_ID = SimpleIndex.as(InventoryItemRenamed::getReference); @Index({EQ, LT, GT}) public static SimpleIndex<InventoryItemRenamed, HybridTimestamp> TIMESTAMP = SimpleIndex.as(InventoryItemRenamed::timestamp); }
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.index.SimpleIndex; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; // 在庫数の追加イベント @Value @EqualsAndHashCode(callSuper = false) public class ItemsCheckedInToInventory extends StandardEvent { private UUID reference; private int count; public ItemsCheckedInToInventory( @PropertyName("reference") UUID reference, @PropertyName("count") int count) { this.reference = reference; this.count = count; } public static SimpleIndex<ItemsCheckedInToInventory, UUID> ID = SimpleIndex.as(ItemsCheckedInToInventory::uuid); public static SimpleIndex<ItemsCheckedInToInventory, UUID> REFERENCE_ID = SimpleIndex.as(ItemsCheckedInToInventory::getReference); }
ドメインプロトコルの作成
ドメインプロトコルという機能を使えば、共通的なプロパティ(例えば name など)をドメインモデルから独立して定義できます。(ドメインモデルへ implements して使う)
Protocol
を extends したインターフェースとして定義し、プロパティの値を(イベントの内容から)確定する処理をデフォルトメソッドとして定義します。
在庫名は一番最後の InventoryItemRenamed の newName で確定するため、latestAssociatedEntity
を使って最後の InventoryItemRenamed イベントを取得します。
なお、名前に関するドメインプロトコルも eventsourcing-cep モジュールに予め用意されています(NameProtocol
)。
在庫名 src/main/java/sample/protocols/InventoryItemNameProtocol.java
package sample.protocols; import com.eventsourcing.Protocol; import com.eventsourcing.queries.ModelQueries; import sample.events.InventoryItemRenamed; // 在庫名 public interface InventoryItemNameProtocol extends Protocol, ModelQueries { default String name() { // reference が id の値に等しい最後の InventoryItemRenamed を取得し // newName の値を返す return latestAssociatedEntity( InventoryItemRenamed.class, InventoryItemRenamed.REFERENCE_ID, InventoryItemRenamed.TIMESTAMP ).map(InventoryItemRenamed::getNewName).orElse(""); } }
次に、在庫数の場合は、ItemsCheckedInToInventory を集計するように Repository
の query
メソッドで該当する全ての ItemsCheckedInToInventory を取得し、count の合計値を算出しています。
在庫数 src/main/java/sample/protocols/InventoryItemCountProtocol.java
package sample.protocols; import com.eventsourcing.EntityHandle; import com.eventsourcing.Protocol; import lombok.val; import java.util.stream.StreamSupport; import sample.events.ItemsCheckedInToInventory; import static com.eventsourcing.index.EntityQueryFactory.equal; // 在庫数 public interface InventoryItemCountProtocol extends Protocol { default int count() { // reference が id の値に等しい ItemsCheckedInToInventory を全て取得 val res = getRepository().query( ItemsCheckedInToInventory.class, equal(ItemsCheckedInToInventory.REFERENCE_ID, getId()) ); return StreamSupport.stream(res.spliterator(), false) .map(EntityHandle::get) .mapToInt(ItemsCheckedInToInventory::getCount) .sum(); } }
ドメインモデルの作成
ドメインモデルは Model
と必要なドメインプロトコルを implements して作成します。
同一インスタンスの判定に id の値だけを使うように equals と hashCode をオーバーライドします。(今回は @EqualsAndHashCode(of = "id")
で実施)
CreateInventoryItem コマンドの result メソッドで使った lookup メソッドの実装も行います。
src/main/java/sample/domain/InventoryItem.java
package sample.domain; import com.eventsourcing.Model; import com.eventsourcing.Repository; import com.eventsourcing.queries.ModelQueries; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.Optional; import java.util.UUID; import sample.events.InventoryItemCreated; import sample.protocols.InventoryItemCountProtocol; import sample.protocols.InventoryItemNameProtocol; @Value @EqualsAndHashCode(of = "id") public class InventoryItem implements Model, InventoryItemNameProtocol, InventoryItemCountProtocol { private final Repository repository; private final UUID id; protected InventoryItem(Repository repository, UUID id) { this.repository = repository; this.id = id; } public static Optional<InventoryItem> lookup(Repository repository, UUID id) { // InventoryItemCreated.ID インデックス(SimpleIndex)を使って // id に合致する InventoryItemCreated を取得 Optional<InventoryItemCreated> res = ModelQueries.lookup( repository, InventoryItemCreated.class, InventoryItemCreated.ID, id ); return res.map(ev -> new InventoryItem(repository, id)); } }
実行クラスの作成
es4j では Repository
を使ってイベントソーシングの処理を行います。
Repository は StandardRepository
へ以下のような設定を行う事で構築できます。
- (1) Journal と IndexEngine の設定
- (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)
StandardRepository のデフォルト設定では localhost の NTP サーバーを使うようになっているようですが (NTP サーバーが無いとエラーになる)、今回はデフォルト設定の代わりに PhysicalTimeProvider 実装クラスを用意しました。
(2) で PackageXXXSetProvider を使うと指定のパッケージへ属するコマンドやイベントを一括で設定できます。
コマンドを適用する際は publish
メソッドを使います。(publish の戻り値は CompletableFuture
)
src/main/java/SampleApp.java
import com.eventsourcing.*; import com.eventsourcing.hlc.PhysicalTimeProvider; import com.eventsourcing.index.MemoryIndexEngine; import com.eventsourcing.inmem.MemoryJournal; import com.eventsourcing.repository.StandardRepository; import com.google.common.util.concurrent.AbstractService; import java.util.concurrent.*; import lombok.val; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.domain.InventoryItem; import sample.events.InventoryItemCreated; public class SampleApp { public static void main(String... args) { // (1) Journal と IndexEngine の設定 val repository = StandardRepository.builder() .journal(new MemoryJournal()) .indexEngine(new MemoryIndexEngine()) .physicalTimeProvider(new SampleTimeProvider()) .build(); // (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider) repository.addCommandSetProvider( new PackageCommandSetProvider(new Package[] {CreateInventoryItem.class.getPackage()})); repository.addEventSetProvider( new PackageEventSetProvider(new Package[] {InventoryItemCreated.class.getPackage()})); // 開始 repository.startAsync().awaitRunning(); try { // CreateInventoryItem コマンド適用 val d = repository.publish(new CreateInventoryItem("sample1")).get(); dumpInventoryItem(d); // CheckInItemsToInventory コマンド適用 repository.publish(new CheckInItemsToInventory(d.getId(), 5)).get(); dumpInventoryItem(d); // CheckInItemsToInventory コマンド適用 repository.publish(new CheckInItemsToInventory(d.getId(), 3)).get(); dumpInventoryItem(d); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { // 停止 stopRepository(repository); } } private static InventoryItem dumpInventoryItem(InventoryItem item) { System.out.println("----- InventoryItem -----"); System.out.println("id: " + item.getId()); System.out.println("name: " + item.name()); System.out.println("count: " + item.count()); return item; } private static void stopRepository(Repository repository) { System.out.println("stop..."); try { repository.stopAsync().awaitTerminated(10, TimeUnit.SECONDS); } catch (TimeoutException ex) { ex.printStackTrace(); } } // PhysicalTimeProvider の実装 static class SampleTimeProvider extends AbstractService implements PhysicalTimeProvider { @Override public long getPhysicalTime() { return System.currentTimeMillis(); } @Override protected void doStart() { System.out.println("timeprovider start..."); notifyStarted(); } @Override protected void doStop() { System.out.println("timeprovider stop..."); notifyStopped(); } } }
publish の呼び出し箇所は CompletableFuture で処理を繋げれば以下のように実装できます。
src/main/java/SampleApp.java(CompletableFuture 活用版)
public class SampleApp { public static void main(String... args) { ・・・ repository.publish(new CreateInventoryItem("sample1")) .thenApply(SampleApp::dumpInventoryItem) .thenCompose(d -> repository.publish(new CheckInItemsToInventory(d.getId(), 5)) .thenApply(v -> d) ) .thenApply(SampleApp::dumpInventoryItem) .thenCompose(d -> repository.publish(new CheckInItemsToInventory(d.getId(), 3)) .thenApply(v -> d) ) .thenApply(SampleApp::dumpInventoryItem) .whenComplete((d, e) -> stopRepository(repository)); } ・・・ }
実行結果
> gradle -q run [main] INFO org.reflections.Reflections - Reflections took 64 ms to scan 1 urls, producing 2 keys and 4 values [main] INFO org.reflections.Reflections - Reflections took 17 ms to scan 1 urls, producing 2 keys and 6 values timeprovider start... ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 0 ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 5 ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 8 stop... timeprovider stop...
備考
実は、今回のサンプルを実行すると処理が終わってもプロセスは終了しません。
スレッドダンプを出力してみたところ、StandardEntity に原因がありそうです。
スレッドダンプ出力例
> jcmd 8904 Thread.print ・・・ "Thread-1" #11 prio=5 os_prio=0 ・・・ waiting on condition [・・・] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <・・・> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396) at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649) at com.eventsourcing.StandardEntity.lambda$static$0(StandardEntity.java:27) at com.eventsourcing.StandardEntity$$Lambda$14/540159270.run(Unknown Source) at java.lang.Thread.run(Thread.java:745) ・・・
StandardEntity のソース(27行目付近)を見てみると、以下のようにスレッドを開始して放置していました。
com.eventsourcing.StandardEntity のソース
・・・ public abstract class StandardEntity<E extends Entity<E>> implements Entity<E> { private static LinkedBlockingDeque<UUID> uuids = new LinkedBlockingDeque<>(10_000); static { new Thread(() -> { while (true) { try { uuids.put(UUID.randomUUID()); // 27行目 } catch (InterruptedException e) { } } }).start(); } ・・・