Groovy で Elasticsearch を組み込み実行
Groovy で Elasticsearch を組み込み実行してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170203/
(a) クライアント接続しない場合
まずは、クライアント接続が不可な Elasticsearch を起動して、ドキュメント登録や検索を行ってみます。
ポート番号 | クライアント接続 |
---|---|
9200 (HTTP) | × |
9300 (Transport) | × |
Elasticsearch の組み込み実行は、適切な設定を行った Settings
(もしくは Environment
)を使って Node
を作成し start
を実行するだけです。
path.home
の設定は必須で、指定したパスの data ディレクトリを使用します。(無ければ自動的に作成されます)
transport.type
を local
へ、http.enabled
を false
へ設定すればクライアントの接続を受け付けない状態になります。※
※ クライアント接続を受け付けるためのプラグインを適用していない場合、 このように設定しておかないと実行時にエラーとなります
この場合、Node の client
メソッドで取得した Client
を使ってインデックス等を操作します。
els_local.groovy
@Grab('org.elasticsearch:elasticsearch:5.2.0') // log4j のモジュールが必要(無い場合は NoClassDefFoundError が発生) @Grab('org.apache.logging.log4j:log4j-api:2.8') @Grab('org.apache.logging.log4j:log4j-core:2.8') import org.elasticsearch.common.settings.Settings import org.elasticsearch.node.Node def index = args[0] // インデックス def type = args[1] // タイプ // 設定 def setting = Settings.builder() .put('path.home', '.') // data ディレクトリの配置先を指定 .put('transport.type', 'local') .put('http.enabled', 'false') .build() new Node(setting).withCloseable { node -> // Elasticsearch の実行 node.start() node.client().withCloseable { client -> // インデックスへのドキュメント登録 def r1 = client.prepareIndex(index, type) .setSource('time', System.currentTimeMillis()) .execute() .get() println r1 // 検索結果へ即時反映されなかったので適度に待機 sleep(1000) println '-----' // 検索 def r2 = client.prepareSearch(index) .setTypes(type) .execute() .get() println r2 } }
動作確認
実行結果は以下の通りです。
log4j2 の設定ファイルが見つからない旨のエラーログが出力されていますが、 Elasticsearch の組み込み実行は成功しているようです。
実行結果
> groovy els_local.groovy a1 item ERROR StatusLogger No log4j2 configuration file found. ・・・ IndexResponse[index=a1,type=item,id=AVn6bOp-0Vu_EXj66Fj9,version=1,result=created,shards={"_shards":{"total":2,"successful":1,"failed":0}}] ----- {"took":140,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"a1","_type":"item","_id":"AVn6bOp-0Vu_EXj66Fj9","_score":1.0,"_source":{"time":1485965157491}}]}}
(b) クライアント接続する場合
次に、クライアント接続が可能な Elasticsearch を組み込み実行します。
ポート番号 | クライアント接続 |
---|---|
9200 (HTTP) | ○ |
9300 (Transport) | ○ |
これらのポート番号でクライアント接続を受けるにはプラグインが必要なので、
今回は Netty4Plugin
(transport-netty4-client) を使用しました。
Node の public コンストラクタはプラグインクラスを直接指定できないため、今回は protected コンストラクタ ※ を直接呼び出して Netty4Plugin を適用しています。
※ 第 2引数で Plugin クラスを指定できるようになっている protected Node(Environment environment, Collection<Class<? extends Plugin>> plugins)
なお、close
メソッドの実行有無に関係なく Node は終了してしまうので、System.in.read()
を使って終了を止めています。
els_netty.groovy
@Grab('org.elasticsearch:elasticsearch:5.2.0') @Grab('org.elasticsearch.plugin:transport-netty4-client:5.2.0') @Grab('org.apache.logging.log4j:log4j-api:2.8') @Grab('org.apache.logging.log4j:log4j-core:2.8') import org.elasticsearch.common.settings.Settings import org.elasticsearch.env.Environment import org.elasticsearch.node.Node import org.elasticsearch.transport.Netty4Plugin def setting = Settings.builder() .put('path.home', '.') .build() def env = new Environment(setting) // Netty4Plugin を指定して Node をインスタンス化 new Node(env, [Netty4Plugin]).withCloseable { node -> node.start() println 'started server ...' // 終了するのを止めるための措置 System.in.read() }
動作確認
Elasticsearch を実行します。
Elasticsearch 組み込み実行
> groovy els_netty.groovy ERROR StatusLogger No log4j2 configuration file found. ・・・ started server ...
(1) HTTP 接続(9200 ポート)
curl で 9200 ポートへ接続した結果は以下の通りです。
実行結果(HTTP)
$ curl -s http://localhost:9200/b1/item -d "{\"time\": `date +%s%3N`}" {"_index":"b1","_type":"item","_id":"AVn6rxw5O4vwdZ1ibdCo","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"created":true} $ curl -s http://localhost:9200/b1/item/_search {"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"b1","_type":"item","_id":"AVn6rxw5O4vwdZ1ibdCo","_score":1.0,"_source":{"time": 1485969495792}}]}}
(2) Transport 接続(9300 ポート)
9300 ポートへ接続して (a) と同等のクライアント処理を実施するスクリプトは以下のようになります。
9300 ポートへ接続するために TransportClient
を使用しています。
els_client.groovy
@Grab('org.elasticsearch.client:transport:5.2.0') @Grab('org.apache.logging.log4j:log4j-api:2.8') @Grab('org.apache.logging.log4j:log4j-core:2.8') import org.elasticsearch.common.settings.Settings import org.elasticsearch.transport.client.PreBuiltTransportClient import org.elasticsearch.common.transport.InetSocketTransportAddress def index = args[0] def type = args[1] def addr = new InetSocketTransportAddress( InetAddress.getLoopbackAddress(), 9300) // TransportClient の生成 def transportClient = new PreBuiltTransportClient(Settings.EMPTY) .addTransportAddress(addr) transportClient.withCloseable { client -> // インデックスへのドキュメント登録 def r1 = client.prepareIndex(index, type) .setSource('time', System.currentTimeMillis()) .execute() .get() println r1 sleep(1000) println '-----' // 検索 def r2 = client.prepareSearch(index) .setTypes(type) .execute() .get() println r2 }
実行結果は以下の通りです。
実行結果(Transport)
> groovy els_client.groovy b2 item ・・・ IndexResponse[index=b2,type=item,id=AVn6tvd3WlzxY0bEAQ4r,version=1,result=created,shards={"_shards":{"total":2,"successful":1,"failed":0}}] ----- {"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"b2","_type":"item","_id":"AVn6tvd3WlzxY0bEAQ4r","_score":1.0,"_source":{"time":1485970011680}}]}}
備考. HTTP 接続の無効化
9200 ポートの接続だけを無効化するには http.enabled
を false
にします。
ポート番号 | クライアント接続 |
---|---|
9200 (HTTP) | × |
9300 (Transport) | ○ |
els_netty_nohttp.groovy
・・・ def setting = Settings.builder() .put('path.home', '.') .put('http.enabled', 'false') // HTTP 接続(9200 ポート)の無効化 .build() ・・・ new Node(env, [Netty4Plugin]).withCloseable { node -> ・・・ }
Akka の FileIO でファイルを読み書き
Akka (akka-stream) の FileIO
を使ってファイルの読み書きを行ってみます。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170131/
はじめに
akka-stream では Java 用の API は akka.stream.javadsl
パッケージに、Scala 用の API は akka.stream.scaladsl
パッケージに定義されています。
主要なクラスやメソッドは javadsl と scaladsl で概ね共通化されているようですが、Source
の recover
メソッドは deprecated の有無が違っていました。
Source の recover/recoverWith メソッドの deprecated 状況
パッケージ | deprecated 有り | deprecated 無し | 備考 |
---|---|---|---|
akka.stream.scaladsl | recoverWith | recover | ソースは akka/stream/scaladsl/Flow.scala |
akka.stream.javadsl | recover | recoverWith | ソースは akka/stream/javadsl/Source.scala |
ここで、deprecated の理由が @deprecated("Use recoverWithRetries instead.", "2.4.4")
のようなので、scaladsl の方が正しく、javadsl の方は deprecated するメソッドが違っているのだと思います ※。
※ recoverWith の代わりに recoverWithRetries を使えるが (引数が 1つ増えただけなので) recover は引数の型がそもそも違う
Scala の場合
まずは Scala で実装してみます。
ファイル用の Sink は FileIO.toPath
で、Source は FileIO.fromPath
で取得できます。
ファイルの入出力の型は akka.util.ByteString
となっており、以下では map を使って String との変換を行っています。
Source の recover
メソッドを使えば、例外が発生していた場合に代用の値へ差し替えて処理を繋げられます。
今回は、ファイルが存在しない等で IOException が発生した際に、"invalid file, ・・・" という文字列へ差し替えるために recover を使っています。
src/main/scala/SampleApp.scala
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{FileIO, Source} import akka.util.ByteString import java.nio.file.Paths import java.io.IOException object SampleApp extends App { implicit val system = ActorSystem() import system.dispatcher // ExecutionContext を implicit implicit val materializer = ActorMaterializer() // ファイルへの書き込み(sample1.txt へ "sample data" を出力) val res1 = Source.single("sample data") .map(ByteString.fromString) .runWith(FileIO.toPath(Paths.get("sample1.txt"))) // ファイルの読み込み(sample2.txt の内容を println) val res2 = FileIO.fromPath(Paths.get("sample2.txt")) .map(_.utf8String) .recover { case e: IOException => s"invalid file, ${e}" } .runForeach(println) res1.flatMap(_ => res2).onComplete(_ => system.terminate) }
ビルドと実行
Gradle のビルド定義ファイルは以下の通りです。
build.gradle
apply plugin: 'scala' apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compile 'org.scala-lang:scala-library:2.12.1' compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1' }
sample2.txt ファイルの無い状態で実行します。
実行結果1
> gradle -q run invalid file, java.nio.file.NoSuchFileException: sample2.txt
sample2.txt を作成して実行します。
実行結果2
> echo %time% > sample2.txt > gradle -q run 22:11:48.03
Java の場合
次に Java で実装してみます。
Scala と概ね同じですが、akka.stream.javadsl.Source でも recover
の引数が scala.PartialFunction
となっており、多少の工夫が必要です。
recover の引数に合う scala.PartialFunction は Akka の akka.japi.pf.PFBuilder
で作れるので、以下では Match.match
から PFBuilder を取得して使っています。
なお、Akka では scala.PartialFunction を組み立てるための Java 用の API が akka.japi.pf
パッケージにいくつか用意されています。
src/main/java/SampleApp.java
import akka.actor.ActorSystem; import akka.japi.pf.Match; import akka.japi.pf.PFBuilder; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Source; import akka.util.ByteString; import java.io.IOException; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture; public class SampleApp { public static void main(String... args) { final ActorSystem system = ActorSystem.create(); final Materializer materializer = ActorMaterializer.create(system); // ファイルへの書き込み(sample1.txt へ "sample data" を出力) CompletableFuture<?> res1 = Source.single("sample data") .map(ByteString::fromString) .runWith(FileIO.toPath(Paths.get("sample1.txt")), materializer) .toCompletableFuture(); // scala.PartialFunction のビルダー定義 PFBuilder<Throwable, String> pfunc = Match.match(IOException.class, e -> "invalid file, " + e); // ファイルの読み込み(sample2.txt の内容を println) CompletableFuture<?> res2 = FileIO.fromPath(Paths.get("sample2.txt")) .map(ByteString::utf8String) .recover(pfunc.build()) .runForeach(System.out::println, materializer) .toCompletableFuture(); CompletableFuture.allOf(res1, res2) .handle((v, e) -> system.terminate()) .join(); } }
ビルドと実行
Gradle のビルド定義ファイルは以下の通りです。
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1' }
sample2.txt ファイルの無い状態で実行します。
実行結果1
> gradle -q run ・・・\src\main\java\SampleApp.javaは非推奨のAPIを使用またはオーバーライドしています。 ・・・ invalid file, java.nio.file.NoSuchFileException: sample2.txt
recover が deprecated されている件で警告メッセージが出力されますが、上述したように recover を deprecated しているのが誤りだと思われるので無視します。
sample2.txt を作成して実行します。
実行結果2
> echo %time% > sample2.txt > gradle -q run ・・・ 22:14:16.00
Sourcerer でイベントソーシング
「Axon Framework でイベントソーシング」 や 「es4j でイベントソーシング」 と同様の処理を Sourcerer で実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170110/
はじめに
以下のような Gradle ビルド定義ファイルを使います。
Sourcerer はイベントの保存先として Event Store を使用するため、今回は Event Store クライアントライブラリに esjc
を使う sourcerer-esjc
モジュールを使用します。
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compileOnly 'org.projectlombok:lombok:1.16.12' compile 'org.elder.sourcerer:sourcerer-esjc:v4.0.1' compile 'com.fasterxml.jackson.core:jackson-databind:2.8.5' compile 'io.javaslang:javaslang:2.1.0-alpha' }
lombok
と javaslang
は必須ではありません。(javaslang はパターンマッチ処理に使いました)
Sourcerer によるイベントソーシング
コマンドの作成
コマンドは特に注意するところはなく、普通の Java クラスとして実装するだけです。
ここで定義したコマンドは、CommandFactory で作成した Command の引数 (Arguments) として使う事になります。
在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import lombok.Value; // 在庫作成コマンド @Value public class CreateInventoryItem { private String id; private String name; }
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import lombok.Value; // 在庫数追加コマンド @Value public class CheckInItemsToInventory { private int count; }
イベントの作成
イベントの型へ @EventType
を付与します。
AggregateProjection
等で指定するイベントの型は 1つなので、まずはベースとなるインターフェースを定義します。
イベントの内容は JSON 化して Event Store へ保存する事になりますが、デフォルトでは JSON へ型情報が付与されず、複数のイベント型を使うと JSON からイベントオブジェクトを復元する際に不都合が生じます。
これを回避するために @JsonTypeInfo
と @JsonSubTypes
を使って JSON へ型情報を残すように設定しています ※。
※ @JsonTypeInfo の use へ NAME を指定した場合は "@type":"クラス名" の情報を JSON へ出力します
イベントのベースインターフェース src/main/java/sample/events/InventoryEvent.java
package sample.events; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.elder.sourcerer.EventType; @EventType // JSON へ型情報を残すための設定 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) @JsonSubTypes({ @Type(InventoryItemCreated.class), @Type(InventoryItemRenamed.class), @Type(ItemsCheckedInToInventory.class) }) public interface InventoryEvent { }
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import lombok.Value; // 在庫作成イベント @Value public class InventoryItemCreated implements InventoryEvent { private String id; }
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import lombok.Value; // 在庫名の変更イベント @Value public class InventoryItemRenamed implements InventoryEvent { private String name; }
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import lombok.Value; // 在庫数の追加イベント @Value public class ItemsCheckedInToInventory implements InventoryEvent { private int count; }
コマンドハンドラの作成
コマンドを処理してイベントを生成するメソッドを定義します。
引数や戻り値の型は CommandFactory
の fromOperation
メソッドへ渡す値 (Operations
の各種メソッドで生成) に応じて調整します。
単一のコマンドから複数のイベントを生成する場合は戻り値を List<イベント型>
とするだけのようです。
src/main/java/sample/InventoryOperations.java
package sample; import com.google.common.collect.ImmutableList; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.events.InventoryEvent; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; import java.util.List; public class InventoryOperations { // 作成コマンドの処理 public static List<InventoryEvent> create(CreateInventoryItem cmd) { return ImmutableList.of( new InventoryItemCreated(cmd.getId()), new InventoryItemRenamed(cmd.getName()) ); } // 在庫数追加コマンドの処理 public static ItemsCheckedInToInventory checkIn(InventoryItem state, CheckInItemsToInventory cmd) { return new ItemsCheckedInToInventory(cmd.getCount()); } }
AggregateProjection の作成
AggregateProjection<状態の型, イベントの型>
を実装して、イベントから状態を復元する処理を実装します。
今回は javaslang モジュールの Match
と Case
を使ってパターンマッチで処理しています。
src/main/java/sample/InventoryProjection.java
package sample; import org.elder.sourcerer.AggregateProjection; import org.jetbrains.annotations.NotNull; import sample.events.InventoryItemCreated; import sample.events.InventoryEvent; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; import static javaslang.API.*; import static javaslang.Predicates.*; public class InventoryProjection implements AggregateProjection<InventoryItem, InventoryEvent>{ @NotNull @Override public InventoryItem empty() { return new InventoryItem("", "", 0); } @NotNull @Override public InventoryItem apply(@NotNull String id, @NotNull InventoryItem state, @NotNull InventoryEvent event) { return Match(event).of( // InventoryItemCreated イベントの場合 Case(instanceOf(InventoryItemCreated.class), ev -> state.withId(ev.getId())), // InventoryItemRenamed イベントの場合 Case(instanceOf(InventoryItemRenamed.class), ev -> state.withName(ev.getName())), // ItemsCheckedInToInventory イベントの場合 Case(instanceOf(ItemsCheckedInToInventory.class), ev -> state.withCount(state.getCount() + ev.getCount())), // その他 Case($(), state) ); } }
在庫の状態 InventoryItem
は以下のように実装しました。
src/main/java/sample/InventoryItem.java
package sample; import lombok.Value; @Value public class InventoryItem { private String id; private String name; private int count; public InventoryItem withId(String newId) { return new InventoryItem(newId, name, count); } public InventoryItem withName(String newName) { return new InventoryItem(id, newName, count); } public InventoryItem withCount(int newCount) { return new InventoryItem(id, name, newCount); } }
実行クラスの作成
sourcerer-esjc-spring モジュールを使う方が楽だと思われますが、今回は AggregateRepository
と CommandFactory
の組み立て処理を自前で実装してみました。
- (1) EventStore クライアント作成
- (2) EventRepositoryFactory 作成
- (3) EventRepository 作成
- (4) AggregateRepository 作成
- (5) CommandFactory 作成
CommandFactory へコマンドハンドラのメソッドをそれぞれ指定して Command
を作成します。(コマンドハンドラメソッドのシグネチャに対応する Operations
のメソッドを使います)
Command へ aggregateId を設定し arguments へコマンドの内容を設定した後、run
を実行するとコマンドが適用されます。
src/main/java/SampleApp.java
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.msemys.esjc.EventStoreBuilder; import lombok.val; import org.elder.sourcerer.*; import org.elder.sourcerer.esjc.EventStoreEsjcEventRepositoryFactory; import sample.*; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.events.InventoryEvent; import java.util.UUID; public class SampleApp { public static void main(String... args) throws JsonProcessingException { // (1) EventStore クライアント作成 val eventStore = EventStoreBuilder.newBuilder() // ポート番号に 1113 を使う点に注意 (2113 ではない) .singleNodeAddress("localhost", 1113) .build(); val mapper = new ObjectMapper(); // (2) EventRepositoryFactory 作成 val eventRepositoryFactory = new EventStoreEsjcEventRepositoryFactory( eventStore, mapper, "sample" ); // (3) EventRepository 作成 val eventRepository = eventRepositoryFactory.getEventRepository(InventoryEvent.class); // (4) AggregateRepository 作成 val aggregateRepository = new DefaultAggregateRepository<>( eventRepository, new InventoryProjection() ); // (5) CommandFactory 作成 val commandFactory = new DefaultCommandFactory<>(aggregateRepository); // 在庫作成処理 val createCommand = commandFactory.fromOperation( Operations.constructorOf(InventoryOperations::create)); // 在庫数の追加処理 val checkInCommand = commandFactory.fromOperation( Operations.updateOf(InventoryOperations::checkIn)); val id = UUID.randomUUID().toString(); // 在庫作成の実行 val r1 = createCommand.setAggregateId(id) .setArguments(new CreateInventoryItem(id, "sample")) .run(); printCommandResult(r1); // 在庫数の追加の実行(1回目) val r2 = checkInCommand.setAggregateId(id) .setArguments(new CheckInItemsToInventory(5)) .run(); printCommandResult(r2); // 在庫数の追加の実行(2回目) val r3 = checkInCommand.setAggregateId(id) .setArguments(new CheckInItemsToInventory(3)) .run(); printCommandResult(r3); // 集約の内容を取得 val aggregate = aggregateRepository.load(id); // 状態を出力 System.out.println("*** result state: " + aggregate.state()); eventStore.disconnect(); } private static void printCommandResult(CommandResult<? extends InventoryEvent> result) { System.out.printf( "*** command result: events=%s, newVersion=%s\n", result.getEvents(), result.getNewVersion() ); } }
実行
https://geteventstore.com/ から Event Store の実行環境をダウンロードして、起動しておきます。
Event Store 起動
> EventStore.ClusterNode.exe --db ./db --log ./logs ・・・ [08912,14,15:31:00.488] Starting Normal TCP listening on TCP endpoint: 127.0.0.1:1113. ・・・ [08912,09,15:31:07.100] Created stats stream '$stats-127.0.0.1:2113', code = Success
サンプルアプリケーションの実行
> gradle -q run ・・・ *** command result: events=[InventoryItemCreated(id=4184fd14-7725-4d09-8e24-59e9a94859a1), InventoryItemRenamed(name=sample)], newVersion=1 ・・・ *** command result: events=[ItemsCheckedInToInventory(count=5)], newVersion=2 ・・・ *** command result: events=[ItemsCheckedInToInventory(count=3)], newVersion=3 ・・・ *** result state: InventoryItem(id=4184fd14-7725-4d09-8e24-59e9a94859a1, name=sample, count=8) ・・・
Event Store への登録内容を確認するため、http://127.0.0.1:2113/ へアクセスして 「Stream Browser」 で sample:・・・・
の内容を見てみました。
備考
今回のサンプルも処理が終わった後(disconnect 実行後)にプロセスが終了しません。
原因を詳しく調べていませんが、Event Store クライアントライブラリの esjc 1.6.0 を単体で試した際に、以下が原因でプロセスが終了しないようだったので、esjc が原因かもしれません。
- executor 未指定の際に Settings 内で作る ThreadPoolExecutor の shutdown を実行しない
- EventStoreTcp 内の NioEventLoopGroup の shutdownGracefully を実行しない
es4j でイベントソーシング
「Axon Framework でイベントソーシング」 と同様の処理を es4j (Eventsourcing for Java) で実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170107/
はじめに
今回はイベントをインメモリで保持する eventsourcing-inmem
を使うため、以下のような Gradle 用ビルド定義を用意しました。 (lombok は便利なので使います)
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compileOnly "org.projectlombok:lombok:1.16.12" compile 'com.eventsourcing:eventsourcing-inmem:0.4.5' }
es4j によるイベントソーシング
コマンドの作成
コマンドは StandardCommand<S, R>
を継承して作成し、S に状態の型を R に結果の型を指定します。
コマンドを適用することで生じるイベントは events
メソッドの戻り値で返し、コマンド適用結果は result
メソッドの戻り値で返します。
状態は events
メソッドで用意し、result
メソッドの引数として渡ってきます。
まずは、在庫作成のコマンドを実装してみます。
状態の型に InventoryItemCreated
を、結果の型に InventoryItem
を指定しています。
状態を伴う EventStream
を返すために EventStream.ofWithState
を使用します。
コンストラクタの引数名からプロパティを認識するようなので、-parameters
オプションを使ってビルドするか、引数へ @PropertyName
を付与します。(今回は @PropertyName を使用)
なお、状態の型は InventoryItemCreated としなければならないわけでもなく、UUID
を代わりに使っても動作に支障はなさそうでした。
在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import com.eventsourcing.EventStream; import com.eventsourcing.Repository; import com.eventsourcing.StandardCommand; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import lombok.val; import sample.domain.InventoryItem; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; // 在庫作成コマンド @Value @EqualsAndHashCode(callSuper = false) public class CreateInventoryItem extends StandardCommand<InventoryItemCreated, InventoryItem> { private String name; public CreateInventoryItem(@PropertyName("name") String name) { this.name = name; } // イベントの生成 @Override public EventStream<InventoryItemCreated> events() throws Exception { // 在庫作成イベント val created = new InventoryItemCreated(); // 名前変更イベント val renamed = new InventoryItemRenamed(created.uuid(), name); return EventStream.ofWithState(created, created, renamed); } // 結果の生成 @Override public InventoryItem result(InventoryItemCreated state, Repository repository) { return InventoryItem.lookup(repository, state.uuid()).get(); } }
次に、在庫数を加えるためのコマンドです。
ドメインモデルの内部状態を更新するだけなので、状態と結果の型を Void
としました。
状態を伴わないので EventStream.of
で EventStream を返します。
コマンドの適用結果が Void なので result メソッドをオーバーライドする必要もありません。
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import com.eventsourcing.EventStream; import com.eventsourcing.StandardCommand; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; import sample.events.ItemsCheckedInToInventory; // 在庫数追加コマンド @Value @EqualsAndHashCode(callSuper = false) public class CheckInItemsToInventory extends StandardCommand<Void, Void> { private UUID reference; private int count; public CheckInItemsToInventory( @PropertyName("reference") UUID reference, @PropertyName("count") int count) { this.reference = reference; this.count = count; } @Override public EventStream<Void> events() throws Exception { return EventStream.of(new ItemsCheckedInToInventory(reference, count)); } }
イベントの作成
イベントは StandardEvent
を継承して作成します。
イベントを検索するための条件を SimpleIndex
を使って定義します。
以下では、uuid の値で InventoryItemCreated を取得できるように、SimpleIndex<InventoryItemCreated, UUID> を定義しています。
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.index.SimpleIndex; import java.util.UUID; // 在庫作成イベント public class InventoryItemCreated extends StandardEvent { public static SimpleIndex<InventoryItemCreated, UUID> ID = SimpleIndex.as(InventoryItemCreated::uuid); }
在庫名の更新イベントの場合、最新のものを取得すれば最終的な名称が決定するので、timestamp で比較する SimpleIndex<InventoryItemRenamed, HybridTimestamp>
を定義しています。
なお、名前変更のためのイベントは eventsourcing-cep モジュールに予め用意されていますが(NameChanged
)、今回は自前で実装しています。
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.hlc.HybridTimestamp; import com.eventsourcing.index.Index; import com.eventsourcing.index.SimpleIndex; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; import static com.eventsourcing.index.IndexEngine.IndexFeature.*; // 在庫名の変更イベント @Value @EqualsAndHashCode(callSuper = false) public class InventoryItemRenamed extends StandardEvent { private UUID reference; private String newName; public InventoryItemRenamed(@PropertyName("reference") UUID reference, @PropertyName("newName") String newName) { this.reference = reference; this.newName = newName; } public static SimpleIndex<InventoryItemRenamed, UUID> ID = SimpleIndex.as(InventoryItemRenamed::uuid); public static SimpleIndex<InventoryItemRenamed, UUID> REFERENCE_ID = SimpleIndex.as(InventoryItemRenamed::getReference); @Index({EQ, LT, GT}) public static SimpleIndex<InventoryItemRenamed, HybridTimestamp> TIMESTAMP = SimpleIndex.as(InventoryItemRenamed::timestamp); }
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.index.SimpleIndex; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; // 在庫数の追加イベント @Value @EqualsAndHashCode(callSuper = false) public class ItemsCheckedInToInventory extends StandardEvent { private UUID reference; private int count; public ItemsCheckedInToInventory( @PropertyName("reference") UUID reference, @PropertyName("count") int count) { this.reference = reference; this.count = count; } public static SimpleIndex<ItemsCheckedInToInventory, UUID> ID = SimpleIndex.as(ItemsCheckedInToInventory::uuid); public static SimpleIndex<ItemsCheckedInToInventory, UUID> REFERENCE_ID = SimpleIndex.as(ItemsCheckedInToInventory::getReference); }
ドメインプロトコルの作成
ドメインプロトコルという機能を使えば、共通的なプロパティ(例えば name など)をドメインモデルから独立して定義できます。(ドメインモデルへ implements して使う)
Protocol
を extends したインターフェースとして定義し、プロパティの値を(イベントの内容から)確定する処理をデフォルトメソッドとして定義します。
在庫名は一番最後の InventoryItemRenamed の newName で確定するため、latestAssociatedEntity
を使って最後の InventoryItemRenamed イベントを取得します。
なお、名前に関するドメインプロトコルも eventsourcing-cep モジュールに予め用意されています(NameProtocol
)。
在庫名 src/main/java/sample/protocols/InventoryItemNameProtocol.java
package sample.protocols; import com.eventsourcing.Protocol; import com.eventsourcing.queries.ModelQueries; import sample.events.InventoryItemRenamed; // 在庫名 public interface InventoryItemNameProtocol extends Protocol, ModelQueries { default String name() { // reference が id の値に等しい最後の InventoryItemRenamed を取得し // newName の値を返す return latestAssociatedEntity( InventoryItemRenamed.class, InventoryItemRenamed.REFERENCE_ID, InventoryItemRenamed.TIMESTAMP ).map(InventoryItemRenamed::getNewName).orElse(""); } }
次に、在庫数の場合は、ItemsCheckedInToInventory を集計するように Repository
の query
メソッドで該当する全ての ItemsCheckedInToInventory を取得し、count の合計値を算出しています。
在庫数 src/main/java/sample/protocols/InventoryItemCountProtocol.java
package sample.protocols; import com.eventsourcing.EntityHandle; import com.eventsourcing.Protocol; import lombok.val; import java.util.stream.StreamSupport; import sample.events.ItemsCheckedInToInventory; import static com.eventsourcing.index.EntityQueryFactory.equal; // 在庫数 public interface InventoryItemCountProtocol extends Protocol { default int count() { // reference が id の値に等しい ItemsCheckedInToInventory を全て取得 val res = getRepository().query( ItemsCheckedInToInventory.class, equal(ItemsCheckedInToInventory.REFERENCE_ID, getId()) ); return StreamSupport.stream(res.spliterator(), false) .map(EntityHandle::get) .mapToInt(ItemsCheckedInToInventory::getCount) .sum(); } }
ドメインモデルの作成
ドメインモデルは Model
と必要なドメインプロトコルを implements して作成します。
同一インスタンスの判定に id の値だけを使うように equals と hashCode をオーバーライドします。(今回は @EqualsAndHashCode(of = "id")
で実施)
CreateInventoryItem コマンドの result メソッドで使った lookup メソッドの実装も行います。
src/main/java/sample/domain/InventoryItem.java
package sample.domain; import com.eventsourcing.Model; import com.eventsourcing.Repository; import com.eventsourcing.queries.ModelQueries; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.Optional; import java.util.UUID; import sample.events.InventoryItemCreated; import sample.protocols.InventoryItemCountProtocol; import sample.protocols.InventoryItemNameProtocol; @Value @EqualsAndHashCode(of = "id") public class InventoryItem implements Model, InventoryItemNameProtocol, InventoryItemCountProtocol { private final Repository repository; private final UUID id; protected InventoryItem(Repository repository, UUID id) { this.repository = repository; this.id = id; } public static Optional<InventoryItem> lookup(Repository repository, UUID id) { // InventoryItemCreated.ID インデックス(SimpleIndex)を使って // id に合致する InventoryItemCreated を取得 Optional<InventoryItemCreated> res = ModelQueries.lookup( repository, InventoryItemCreated.class, InventoryItemCreated.ID, id ); return res.map(ev -> new InventoryItem(repository, id)); } }
実行クラスの作成
es4j では Repository
を使ってイベントソーシングの処理を行います。
Repository は StandardRepository
へ以下のような設定を行う事で構築できます。
- (1) Journal と IndexEngine の設定
- (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)
StandardRepository のデフォルト設定では localhost の NTP サーバーを使うようになっているようですが (NTP サーバーが無いとエラーになる)、今回はデフォルト設定の代わりに PhysicalTimeProvider 実装クラスを用意しました。
(2) で PackageXXXSetProvider を使うと指定のパッケージへ属するコマンドやイベントを一括で設定できます。
コマンドを適用する際は publish
メソッドを使います。(publish の戻り値は CompletableFuture
)
src/main/java/SampleApp.java
import com.eventsourcing.*; import com.eventsourcing.hlc.PhysicalTimeProvider; import com.eventsourcing.index.MemoryIndexEngine; import com.eventsourcing.inmem.MemoryJournal; import com.eventsourcing.repository.StandardRepository; import com.google.common.util.concurrent.AbstractService; import java.util.concurrent.*; import lombok.val; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.domain.InventoryItem; import sample.events.InventoryItemCreated; public class SampleApp { public static void main(String... args) { // (1) Journal と IndexEngine の設定 val repository = StandardRepository.builder() .journal(new MemoryJournal()) .indexEngine(new MemoryIndexEngine()) .physicalTimeProvider(new SampleTimeProvider()) .build(); // (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider) repository.addCommandSetProvider( new PackageCommandSetProvider(new Package[] {CreateInventoryItem.class.getPackage()})); repository.addEventSetProvider( new PackageEventSetProvider(new Package[] {InventoryItemCreated.class.getPackage()})); // 開始 repository.startAsync().awaitRunning(); try { // CreateInventoryItem コマンド適用 val d = repository.publish(new CreateInventoryItem("sample1")).get(); dumpInventoryItem(d); // CheckInItemsToInventory コマンド適用 repository.publish(new CheckInItemsToInventory(d.getId(), 5)).get(); dumpInventoryItem(d); // CheckInItemsToInventory コマンド適用 repository.publish(new CheckInItemsToInventory(d.getId(), 3)).get(); dumpInventoryItem(d); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { // 停止 stopRepository(repository); } } private static InventoryItem dumpInventoryItem(InventoryItem item) { System.out.println("----- InventoryItem -----"); System.out.println("id: " + item.getId()); System.out.println("name: " + item.name()); System.out.println("count: " + item.count()); return item; } private static void stopRepository(Repository repository) { System.out.println("stop..."); try { repository.stopAsync().awaitTerminated(10, TimeUnit.SECONDS); } catch (TimeoutException ex) { ex.printStackTrace(); } } // PhysicalTimeProvider の実装 static class SampleTimeProvider extends AbstractService implements PhysicalTimeProvider { @Override public long getPhysicalTime() { return System.currentTimeMillis(); } @Override protected void doStart() { System.out.println("timeprovider start..."); notifyStarted(); } @Override protected void doStop() { System.out.println("timeprovider stop..."); notifyStopped(); } } }
publish の呼び出し箇所は CompletableFuture で処理を繋げれば以下のように実装できます。
src/main/java/SampleApp.java(CompletableFuture 活用版)
public class SampleApp { public static void main(String... args) { ・・・ repository.publish(new CreateInventoryItem("sample1")) .thenApply(SampleApp::dumpInventoryItem) .thenCompose(d -> repository.publish(new CheckInItemsToInventory(d.getId(), 5)) .thenApply(v -> d) ) .thenApply(SampleApp::dumpInventoryItem) .thenCompose(d -> repository.publish(new CheckInItemsToInventory(d.getId(), 3)) .thenApply(v -> d) ) .thenApply(SampleApp::dumpInventoryItem) .whenComplete((d, e) -> stopRepository(repository)); } ・・・ }
実行結果
> gradle -q run [main] INFO org.reflections.Reflections - Reflections took 64 ms to scan 1 urls, producing 2 keys and 4 values [main] INFO org.reflections.Reflections - Reflections took 17 ms to scan 1 urls, producing 2 keys and 6 values timeprovider start... ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 0 ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 5 ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 8 stop... timeprovider stop...
備考
実は、今回のサンプルを実行すると処理が終わってもプロセスは終了しません。
スレッドダンプを出力してみたところ、StandardEntity に原因がありそうです。
スレッドダンプ出力例
> jcmd 8904 Thread.print ・・・ "Thread-1" #11 prio=5 os_prio=0 ・・・ waiting on condition [・・・] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <・・・> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396) at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649) at com.eventsourcing.StandardEntity.lambda$static$0(StandardEntity.java:27) at com.eventsourcing.StandardEntity$$Lambda$14/540159270.run(Unknown Source) at java.lang.Thread.run(Thread.java:745) ・・・
StandardEntity のソース(27行目付近)を見てみると、以下のようにスレッドを開始して放置していました。
com.eventsourcing.StandardEntity のソース
・・・ public abstract class StandardEntity<E extends Entity<E>> implements Entity<E> { private static LinkedBlockingDeque<UUID> uuids = new LinkedBlockingDeque<>(10_000); static { new Thread(() -> { while (true) { try { uuids.put(UUID.randomUUID()); // 27行目 } catch (InterruptedException e) { } } }).start(); } ・・・
RxJava2 で並列処理
前回 と同じような並列処理を RxJava 2.0 で試してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20161226/
はじめに
まずは、map の処理と subscribe の処理を順番に 3回繰り返す処理です。
前回の Reactor との違いは Mono
が Single
に変わっただけです。
(A) サンプルコード
Single.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main]
observeOn の場合
RxJava 2.0 では Reactor の publishOn
の代わりに observeOn
が使えます。
main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。
(B) observeOn を使った場合
Single.just("(B)") .repeat(3) .observeOn(Schedulers.computation()) // 実質的には以下でも同じ //.observeOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(B) observeOn を使った場合の実行結果
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main] (B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (B)_map, thread: Thread[RxComputationThreadPool-1,5,main] (B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (B)_map, thread: Thread[RxComputationThreadPool-1,5,main] (B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
flatMap と subscribeOn を使った並列処理
RxJava 2.0 では Reactor の ParallelFlux
相当の処理は用意されていないようです。
そこで、flatMap
と subscribeOn
を使ってみました。
(C) flatMap + subscribeOn
Single.just("(C)") .repeat(3) .flatMap(s -> Flowable.just(s) .map(n -> { printThread(n + "_map"); return n; }) .subscribeOn(Schedulers.computation()) //.subscribeOn(Schedulers.newThread()) ) .subscribe(n -> printThread(n + "_subscribe"));
ただし、以下の実行結果のように別スレッドで実行してはいるものの、map と subscribe を同じスレッドで実行するわけではありませんでした。
なお、Schedulers.computation()
の場合、スレッド数は実行環境のコア数に依存するようです。(newThread は新しいスレッドを使う)
(C) flatMap + subscribeOn の実行結果1
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main] (C)_map, thread: Thread[RxComputationThreadPool-3,5,main] (C)_map, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C) flatMap + subscribeOn の実行結果2
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main] (C)_map, thread: Thread[RxComputationThreadPool-3,5,main] (C)_map, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
備考
最後に、今回使用したソースを記載しておきます。
build.gradle
apply plugin: 'application' mainClassName = 'App' repositories { jcenter() } dependencies { compile 'io.reactivex.rxjava2:rxjava:2.0.3' } run { standardInput = System.in if (project.hasProperty('args')) { args project.args } }
src/main/java/App.java
import io.reactivex.Single; import io.reactivex.Flowable; import io.reactivex.schedulers.Schedulers; import java.lang.invoke.MethodHandle; import static java.lang.invoke.MethodHandles.publicLookup; import static java.lang.invoke.MethodType.methodType; public class App { public static void main(String... args) throws Throwable { MethodHandle mh = publicLookup().findStatic( App.class, "sample" + args[0], methodType(void.class) ); mh.invoke(); System.in.read(); } // (A) public static void sampleA() { Single.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (B) public static void sampleB() { Single.just("(B)") .repeat(3) .observeOn(Schedulers.computation()) //.observeOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (C) public static void sampleC() { Single.just("(C)") .repeat(3) .flatMap(s -> Flowable.just(s) .map(n -> { printThread(n + "_map"); return n; }) .subscribeOn(Schedulers.computation()) //.subscribeOn(Schedulers.newThread()) ) .subscribe(n -> printThread(n + "_subscribe")); } private static void printThread(String msg) { System.out.println(msg + ", thread: " + Thread.currentThread()); } }
実行例
> gradle -q run -Pargs=C (C)_map, thread: Thread[RxComputationThreadPool-2,5,main] (C)_map, thread: Thread[RxComputationThreadPool-3,5,main] (C)_map, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]
Reactor で並列処理
Reactor の並列処理を試してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20161208/
はじめに
Reactor で以下のようなコードを実行すると、map の処理と subscribe の処理を順番に 3回繰り返します。(repeat
メソッドの戻り値は Flux
)
(A) サンプルコード
Mono.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main]
ここで、今回は map からの処理を別スレッドで並列実行してみます。
publishOn の場合
publishOn
を使うと main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。
(B) publishOn を使った場合
Mono.just("(B)") .repeat(3) .publishOn(Schedulers.parallel()) // 実質的に以下と同じ //.publishOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(B) publishOn を使った場合の実行結果
(B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main]
publishOn へ Schedulers.parallel()
を渡しても複数のスレッドを使うわけでは無いようなので、実質的に Schedulers.single()
と変わりは無さそうです。
ParallelFlux を使った並列処理
並列で実行するためには parallel
メソッドを使って Flux
から ParallelFlux
を取得し、runOn
メソッドで Schedulers.parallel()
※ を指定します。
※ この場合、Schedulers.parallel() の代わりに Schedulers.single() を使うと並列にならない点に注意 (単一スレッドで実行する事になる)
(C) parallel + runOn
Mono.just("(C)") .repeat(3) .parallel() .runOn(Schedulers.parallel()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(C) parallel + runOn の実行結果
(C)_map, thread: Thread[parallel-1,5,main] (C)_map, thread: Thread[parallel-3,5,main] (C)_map, thread: Thread[parallel-2,5,main] (C)_subscribe, thread: Thread[parallel-3,5,main] (C)_subscribe, thread: Thread[parallel-1,5,main] (C)_subscribe, thread: Thread[parallel-2,5,main]
3つのスレッドを使って並列に実行できているようです。
なお、runOn
をコメントアウトして実行すると (A) と同様の結果(main スレッドで順番に実行)になります。
備考
最後に、今回使用したソースを記載しておきます。
build.gradle
apply plugin: 'application' mainClassName = 'App' repositories { jcenter() } dependencies { compile 'io.projectreactor:reactor-core:3.0.3.RELEASE' } run { standardInput = System.in if (project.hasProperty('args')) { args project.args } }
src/main/java/App.java
import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.lang.invoke.MethodHandle; import static java.lang.invoke.MethodHandles.publicLookup; import static java.lang.invoke.MethodType.methodType; public class App { public static void main(String... args) throws Throwable { MethodHandle mh = publicLookup().findStatic( App.class, "sample" + args[0], methodType(void.class) ); mh.invoke(); System.in.read(); } // (A) public static void sampleA() { Mono.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (B) publishOn を使った場合 public static void sampleB() { Mono.just("(B)") .repeat(3) .publishOn(Schedulers.parallel()) //.publishOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (C) parallel + runOn public static void sampleC() { Mono.just("(C)") .repeat(3) .parallel() .runOn(Schedulers.parallel()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } private static void printThread(String msg) { System.out.println(msg + ", thread: " + Thread.currentThread()); } }
実行例
> gradle -q run -Pargs=B (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main]
Spring Web Reactive を試す
Spring 5 で導入される Spring Web Reactive を試してみました。
本来なら Spring Boot で実行する事になると思いますが、今回は Spring Boot を使わずに Undertow で直接実行してみます。
ソースは http://github.com/fits/try_samples/tree/master/blog/20161115/
サンプル作成
ビルド定義
現時点では Spring Web Reactive の正式版はリリースされていないようなのでスナップショット版を使います。
Undertow を実行するために undertow-core
、JSON で結果を返す処理を試すために jackson-databind
を依存関係へ設定しています。
build.gradle
apply plugin: 'application' mainClassName = 'sample.App' repositories { jcenter() maven { url 'http://repo.spring.io/snapshot/' } } dependencies { compile 'org.projectlombok:lombok:1.16.10' // Spring Web Reactive compile 'org.springframework:spring-web-reactive:5.0.0.BUILD-SNAPSHOT' // Undertow compile 'io.undertow:undertow-core:2.0.0.Alpha1' // JSON 用 runtime 'com.fasterxml.jackson.core:jackson-databind:2.8.4' }
設定クラス
@EnableWebReactive
を付与すれば Spring Web Reactive を有効にできるようです。
src/main/java/sample/config/AppConfig.java
package sample.config; import org.springframework.context.annotation.ComponentScan; import org.springframework.web.reactive.config.EnableWebReactive; @EnableWebReactive @ComponentScan("sample.controller") public class AppConfig { }
コントローラークラス
Spring Web Reactive は Spring Web (MVC) のプログラミングスタイルを踏襲しているようです。
Spring Web (MVC) と同じアノテーションでコントローラークラスを定義し、メソッドの戻り値に Reactor の Mono / Flux を使えばよさそうです。
クラス | 概要 |
---|---|
Mono | 単一の結果を返す場合に使用 |
Flux | 複数の結果を返す場合に使用 |
src/main/java/sample/controller/SampleController.java
package sample.controller; import lombok.Value; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @RestController public class SampleController { @RequestMapping("/") public Mono<String> sample() { return Mono.just("sample"); } @RequestMapping(value = "/data/{name}", produces = "application/json") public Flux<Data> dataList(@PathVariable("name") String name) { return Flux.fromArray(new Data[] { new Data(name + "-1", 1), new Data(name + "-2", 2), new Data(name + "-3", 3) }); } @Value class Data { private String name; private int value; } }
実行クラス
まずは、Spring Web Reactive を有効化した ApplicationContext を使って HttpHandler を作成します。 (DispatcherHandler
と WebHttpHandlerBuilder
を使用)
あとは、対象の Web サーバー ※ に合わせて実行します。
※ 今のところ Servlet 3.1 対応コンテナ、Netty、Undertow を サポートしているようです
Undertow の場合、アダプタークラス UndertowHttpHandlerAdapter
で HttpHandler をラッピングして実行するだけです。
src/main/java/sample/App.java
package sample; import io.undertow.Undertow; import lombok.val; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.http.server.reactive.UndertowHttpHandlerAdapter; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import sample.config.AppConfig; public class App { public static void main(String... args) { val ctx = new AnnotationConfigApplicationContext(); // @EnableWebReactive を付与したクラスを登録 ctx.register(AppConfig.class); ctx.refresh(); val handler = new DispatcherHandler(); handler.setApplicationContext(ctx); // HttpHandler の作成 val httpHandler = WebHttpHandlerBuilder.webHandler(handler).build(); val server = Undertow.builder() .addHttpListener(8080, "localhost") // Undertow 用アダプターでラッピングして設定 .setHandler(new UndertowHttpHandlerAdapter(httpHandler)) .build(); server.start(); } }
実行
Gradle で実行
> gradle -q run ・・・ 情報: Mapped "{[/data/{name}],produces=[application/json]}" onto public reactor.core.publisher.Flux<sample.controller.SampleController$Data> sample.controller.SampleController.dataList(java.lang.String) 11 14, 2016 9:32:23 午後 org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping register 情報: Mapped "{[/]}" onto public reactor.core.publisher.Mono<java.lang.String> sample.controller.SampleController.sample() 11 14, 2016 9:32:23 午後 org.xnio.Xnio <clinit> INFO: XNIO version 3.3.6.Final 11 14, 2016 9:32:23 午後 org.xnio.nio.NioXnio <clinit> INFO: XNIO NIO Implementation Version 3.3.6.Final
動作確認1
$ curl -s http://localhost:8080/ sample
動作確認2
$ curl -s http://localhost:8080/data/a [{"name":"a-1","value":1},{"name":"a-2","value":2},{"name":"a-3","value":3}]