Akka の FileIO でファイルを読み書き

Akka (akka-stream) の FileIO を使ってファイルの読み書きを行ってみます。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170131/

はじめに

akka-stream では Java 用の APIakka.stream.javadsl パッケージに、Scala 用の APIakka.stream.scaladsl パッケージに定義されています。

主要なクラスやメソッドは javadsl と scaladsl で概ね共通化されているようですが、Sourcerecover メソッドは deprecated の有無が違っていました。

Source の recover/recoverWith メソッドの deprecated 状況
パッケージ deprecated 有り deprecated 無し 備考
akka.stream.scaladsl recoverWith recover ソースは akka/stream/scaladsl/Flow.scala
akka.stream.javadsl recover recoverWith ソースは akka/stream/javadsl/Source.scala

ここで、deprecated の理由が @deprecated("Use recoverWithRetries instead.", "2.4.4") のようなので、scaladsl の方が正しく、javadsl の方は deprecated するメソッドが違っているのだと思います ※。

 ※ recoverWith の代わりに recoverWithRetries を使えるが
    (引数が 1つ増えただけなので)
    recover は引数の型がそもそも違う

Scala の場合

まずは Scala で実装してみます。

ファイル用の Sink は FileIO.toPath で、Source は FileIO.fromPath で取得できます。

ファイルの入出力の型は akka.util.ByteString となっており、以下では map を使って String との変換を行っています。

Source の recover メソッドを使えば、例外が発生していた場合に代用の値へ差し替えて処理を繋げられます。

今回は、ファイルが存在しない等で IOException が発生した際に、"invalid file, ・・・" という文字列へ差し替えるために recover を使っています。

src/main/scala/SampleApp.scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

import java.nio.file.Paths
import java.io.IOException

object SampleApp extends App {
  implicit val system = ActorSystem()
  import system.dispatcher  // ExecutionContext を implicit
  implicit val materializer = ActorMaterializer()

  // ファイルへの書き込み(sample1.txt へ "sample data" を出力)
  val res1 = Source.single("sample data")
                   .map(ByteString.fromString)
                   .runWith(FileIO.toPath(Paths.get("sample1.txt")))

  // ファイルの読み込み(sample2.txt の内容を println)
  val res2 = FileIO.fromPath(Paths.get("sample2.txt"))
                   .map(_.utf8String)
                   .recover {
                     case e: IOException => s"invalid file, ${e}"
                   }
                   .runForeach(println)

  res1.flatMap(_ => res2).onComplete(_ => system.terminate)
}

ビルドと実行

Gradle のビルド定義ファイルは以下の通りです。

build.gradle
apply plugin: 'scala'
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'org.scala-lang:scala-library:2.12.1'
    compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1'
}

sample2.txt ファイルの無い状態で実行します。

実行結果1
> gradle -q run

invalid file, java.nio.file.NoSuchFileException: sample2.txt

sample2.txt を作成して実行します。

実行結果2
> echo %time% > sample2.txt

> gradle -q run

22:11:48.03

Java の場合

次に Java で実装してみます。

Scala と概ね同じですが、akka.stream.javadsl.Source でも recover の引数が scala.PartialFunction となっており、多少の工夫が必要です。

recover の引数に合う scala.PartialFunction は Akka の akka.japi.pf.PFBuilder で作れるので、以下では Match.match から PFBuilder を取得して使っています。

なお、Akka では scala.PartialFunction を組み立てるための Java 用の APIakka.japi.pf パッケージにいくつか用意されています。

src/main/java/SampleApp.java
import akka.actor.ActorSystem;

import akka.japi.pf.Match;
import akka.japi.pf.PFBuilder;

import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Source;

import akka.util.ByteString;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;

public class SampleApp {
    public static void main(String... args) {
        final ActorSystem system = ActorSystem.create();
        final Materializer materializer = ActorMaterializer.create(system);

        // ファイルへの書き込み(sample1.txt へ "sample data" を出力)
        CompletableFuture<?> res1 = Source.single("sample data")
                .map(ByteString::fromString)
                .runWith(FileIO.toPath(Paths.get("sample1.txt")), materializer)
                .toCompletableFuture();

        // scala.PartialFunction のビルダー定義
        PFBuilder<Throwable, String> pfunc = 
            Match.match(IOException.class, e -> "invalid file, " + e);

        // ファイルの読み込み(sample2.txt の内容を println)
        CompletableFuture<?> res2 = FileIO.fromPath(Paths.get("sample2.txt"))
                .map(ByteString::utf8String)
                .recover(pfunc.build())
                .runForeach(System.out::println, materializer)
                .toCompletableFuture();

        CompletableFuture.allOf(res1, res2)
                .handle((v, e) -> system.terminate())
                .join();
    }
}

ビルドと実行

Gradle のビルド定義ファイルは以下の通りです。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'com.typesafe.akka:akka-stream_2.12:2.5-M1'
}

sample2.txt ファイルの無い状態で実行します。

実行結果1
> gradle -q run

・・・\src\main\java\SampleApp.javaは非推奨のAPIを使用またはオーバーライドしています。
・・・
invalid file, java.nio.file.NoSuchFileException: sample2.txt

recover が deprecated されている件で警告メッセージが出力されますが、上述したように recover を deprecated しているのが誤りだと思われるので無視します。

sample2.txt を作成して実行します。

実行結果2
> echo %time% > sample2.txt

> gradle -q run

・・・
22:14:16.00

Sourcerer でイベントソーシング

Axon Framework でイベントソーシング」 や 「es4j でイベントソーシング」 と同様の処理を Sourcerer で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170110/

はじめに

以下のような Gradle ビルド定義ファイルを使います。

Sourcerer はイベントの保存先として Event Store を使用するため、今回は Event Store クライアントライブラリに esjc を使う sourcerer-esjc モジュールを使用します。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly 'org.projectlombok:lombok:1.16.12'

    compile 'org.elder.sourcerer:sourcerer-esjc:v4.0.1'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.8.5'

    compile 'io.javaslang:javaslang:2.1.0-alpha'
}

lombokjavaslang は必須ではありません。(javaslang はパターンマッチ処理に使いました)

Sourcerer によるイベントソーシング

コマンドの作成

コマンドは特に注意するところはなく、普通の Java クラスとして実装するだけです。

ここで定義したコマンドは、CommandFactory で作成した Command の引数 (Arguments) として使う事になります。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import lombok.Value;

// 在庫作成コマンド
@Value
public class CreateInventoryItem {
    private String id;
    private String name;
}
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import lombok.Value;

// 在庫数追加コマンド
@Value
public class CheckInItemsToInventory {
    private int count;
}

イベントの作成

イベントの型へ @EventType を付与します。

AggregateProjection 等で指定するイベントの型は 1つなので、まずはベースとなるインターフェースを定義します。

イベントの内容は JSON 化して Event Store へ保存する事になりますが、デフォルトでは JSON へ型情報が付与されず、複数のイベント型を使うと JSON からイベントオブジェクトを復元する際に不都合が生じます。

これを回避するために @JsonTypeInfo@JsonSubTypes を使って JSON へ型情報を残すように設定しています ※。

 ※ @JsonTypeInfo の use へ NAME を指定した場合は
    "@type":"クラス名" の情報を JSON へ出力します
イベントのベースインターフェース src/main/java/sample/events/InventoryEvent.java
package sample.events;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.elder.sourcerer.EventType;

@EventType
// JSON へ型情報を残すための設定
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonSubTypes({
    @Type(InventoryItemCreated.class),
    @Type(InventoryItemRenamed.class),
    @Type(ItemsCheckedInToInventory.class)
})
public interface InventoryEvent {
}
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import lombok.Value;

// 在庫作成イベント
@Value
public class InventoryItemCreated implements InventoryEvent {
    private String id;
}
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import lombok.Value;

// 在庫名の変更イベント
@Value
public class InventoryItemRenamed implements InventoryEvent {
    private String name;
}
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import lombok.Value;

// 在庫数の追加イベント
@Value
public class ItemsCheckedInToInventory implements InventoryEvent {
    private int count;
}

コマンドハンドラの作成

コマンドを処理してイベントを生成するメソッドを定義します。

引数や戻り値の型は CommandFactoryfromOperation メソッドへ渡す値 (Operations の各種メソッドで生成) に応じて調整します。

単一のコマンドから複数のイベントを生成する場合は戻り値を List<イベント型> とするだけのようです。

src/main/java/sample/InventoryOperations.java
package sample;

import com.google.common.collect.ImmutableList;
import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.events.InventoryEvent;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

import java.util.List;

public class InventoryOperations {
    // 作成コマンドの処理
    public static List<InventoryEvent> create(CreateInventoryItem cmd) {
        return ImmutableList.of(
            new InventoryItemCreated(cmd.getId()),
            new InventoryItemRenamed(cmd.getName())
        );
    }
    // 在庫数追加コマンドの処理
    public static ItemsCheckedInToInventory checkIn(InventoryItem state, CheckInItemsToInventory cmd) {
        return new ItemsCheckedInToInventory(cmd.getCount());
    }
}

AggregateProjection の作成

AggregateProjection<状態の型, イベントの型> を実装して、イベントから状態を復元する処理を実装します。

今回は javaslang モジュールの MatchCase を使ってパターンマッチで処理しています。

src/main/java/sample/InventoryProjection.java
package sample;

import org.elder.sourcerer.AggregateProjection;
import org.jetbrains.annotations.NotNull;
import sample.events.InventoryItemCreated;
import sample.events.InventoryEvent;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

import static javaslang.API.*;
import static javaslang.Predicates.*;

public class InventoryProjection implements AggregateProjection<InventoryItem, InventoryEvent>{

    @NotNull
    @Override
    public InventoryItem empty() {
        return new InventoryItem("", "", 0);
    }

    @NotNull
    @Override
    public InventoryItem apply(@NotNull String id,
                               @NotNull InventoryItem state, 
                               @NotNull InventoryEvent event) {

        return Match(event).of(
            // InventoryItemCreated イベントの場合
            Case(instanceOf(InventoryItemCreated.class), ev -> 
                    state.withId(ev.getId())),
            // InventoryItemRenamed イベントの場合
            Case(instanceOf(InventoryItemRenamed.class), ev -> 
                    state.withName(ev.getName())),
            // ItemsCheckedInToInventory イベントの場合
            Case(instanceOf(ItemsCheckedInToInventory.class), ev ->
                    state.withCount(state.getCount() + ev.getCount())),
            // その他
            Case($(), state)
        );
    }
}

在庫の状態 InventoryItem は以下のように実装しました。

src/main/java/sample/InventoryItem.java
package sample;

import lombok.Value;

@Value
public class InventoryItem {
    private String id;
    private String name;
    private int count;

    public InventoryItem withId(String newId) {
        return new InventoryItem(newId, name, count);
    }

    public InventoryItem withName(String newName) {
        return new InventoryItem(id, newName, count);
    }

    public InventoryItem withCount(int newCount) {
        return new InventoryItem(id, name, newCount);
    }
}

実行クラスの作成

sourcerer-esjc-spring モジュールを使う方が楽だと思われますが、今回は AggregateRepositoryCommandFactory の組み立て処理を自前で実装してみました。

  • (1) EventStore クライアント作成
  • (2) EventRepositoryFactory 作成
  • (3) EventRepository 作成
  • (4) AggregateRepository 作成
  • (5) CommandFactory 作成

CommandFactory へコマンドハンドラのメソッドをそれぞれ指定して Command を作成します。(コマンドハンドラメソッドのシグネチャに対応する Operations のメソッドを使います)

Command へ aggregateId を設定し arguments へコマンドの内容を設定した後、run を実行するとコマンドが適用されます。

src/main/java/SampleApp.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.msemys.esjc.EventStoreBuilder;

import lombok.val;

import org.elder.sourcerer.*;
import org.elder.sourcerer.esjc.EventStoreEsjcEventRepositoryFactory;

import sample.*;
import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.events.InventoryEvent;

import java.util.UUID;

public class SampleApp {
    public static void main(String... args) throws JsonProcessingException {
        // (1) EventStore クライアント作成
        val eventStore = EventStoreBuilder.newBuilder()
                // ポート番号に 1113 を使う点に注意 (2113 ではない)
                .singleNodeAddress("localhost", 1113) 
                .build();

        val mapper = new ObjectMapper();
        // (2) EventRepositoryFactory 作成
        val eventRepositoryFactory = new EventStoreEsjcEventRepositoryFactory(
                eventStore, 
                mapper, 
                "sample"
        );
        // (3) EventRepository 作成
        val eventRepository = eventRepositoryFactory.getEventRepository(InventoryEvent.class);
        // (4) AggregateRepository 作成
        val aggregateRepository = new DefaultAggregateRepository<>(
                eventRepository, 
                new InventoryProjection()
        );
        // (5) CommandFactory 作成
        val commandFactory = new DefaultCommandFactory<>(aggregateRepository);

        // 在庫作成処理
        val createCommand = commandFactory.fromOperation(
                Operations.constructorOf(InventoryOperations::create));
        // 在庫数の追加処理
        val checkInCommand = commandFactory.fromOperation(
                Operations.updateOf(InventoryOperations::checkIn));

        val id = UUID.randomUUID().toString();
        // 在庫作成の実行
        val r1 = createCommand.setAggregateId(id)
                .setArguments(new CreateInventoryItem(id, "sample"))
                .run();

        printCommandResult(r1);
        // 在庫数の追加の実行(1回目)
        val r2 = checkInCommand.setAggregateId(id)
                .setArguments(new CheckInItemsToInventory(5))
                .run();

        printCommandResult(r2);

        // 在庫数の追加の実行(2回目)
        val r3 = checkInCommand.setAggregateId(id)
                .setArguments(new CheckInItemsToInventory(3))
                .run();

        printCommandResult(r3);

        // 集約の内容を取得
        val aggregate = aggregateRepository.load(id);
        // 状態を出力
        System.out.println("*** result state: " + aggregate.state());

        eventStore.disconnect();
    }

    private static void printCommandResult(CommandResult<? extends InventoryEvent> result) {
        System.out.printf(
            "*** command result: events=%s, newVersion=%s\n", 
            result.getEvents(), 
            result.getNewVersion()
        );
    }
}

実行

https://geteventstore.com/ から Event Store の実行環境をダウンロードして、起動しておきます。

Event Store 起動
> EventStore.ClusterNode.exe --db ./db --log ./logs

・・・
[08912,14,15:31:00.488] Starting Normal TCP listening on TCP endpoint: 127.0.0.1:1113.
・・・
[08912,09,15:31:07.100] Created stats stream '$stats-127.0.0.1:2113', code = Success
サンプルアプリケーションの実行
> gradle -q run

・・・
*** command result: events=[InventoryItemCreated(id=4184fd14-7725-4d09-8e24-59e9a94859a1), InventoryItemRenamed(name=sample)], newVersion=1
・・・
*** command result: events=[ItemsCheckedInToInventory(count=5)], newVersion=2
・・・
*** command result: events=[ItemsCheckedInToInventory(count=3)], newVersion=3
・・・
*** result state: InventoryItem(id=4184fd14-7725-4d09-8e24-59e9a94859a1, name=sample, count=8)
・・・

Event Store への登録内容を確認するため、http://127.0.0.1:2113/ へアクセスして 「Stream Browser」 で sample:・・・・ の内容を見てみました。

f:id:fits:20170110235412p:plain

備考

今回のサンプルも処理が終わった後(disconnect 実行後)にプロセスが終了しません。

原因を詳しく調べていませんが、Event Store クライアントライブラリの esjc 1.6.0 を単体で試した際に、以下が原因でプロセスが終了しないようだったので、esjc が原因かもしれません。

  • executor 未指定の際に Settings 内で作る ThreadPoolExecutor の shutdown を実行しない
  • EventStoreTcp 内の NioEventLoopGroup の shutdownGracefully を実行しない

es4j でイベントソーシング

Axon Framework でイベントソーシング」 と同様の処理を es4j (Eventsourcing for Java) で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170107/

はじめに

今回はイベントをインメモリで保持する eventsourcing-inmem を使うため、以下のような Gradle 用ビルド定義を用意しました。 (lombok は便利なので使います)

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly "org.projectlombok:lombok:1.16.12"

    compile 'com.eventsourcing:eventsourcing-inmem:0.4.5'
}

es4j によるイベントソーシング

コマンドの作成

コマンドは StandardCommand<S, R> を継承して作成し、S に状態の型を R に結果の型を指定します。

コマンドを適用することで生じるイベントは events メソッドの戻り値で返し、コマンド適用結果は result メソッドの戻り値で返します。

状態は events メソッドで用意し、result メソッドの引数として渡ってきます。

まずは、在庫作成のコマンドを実装してみます。

状態の型に InventoryItemCreated を、結果の型に InventoryItem を指定しています。

状態を伴う EventStream を返すために EventStream.ofWithState を使用します。

コンストラクタの引数名からプロパティを認識するようなので、-parameters オプションを使ってビルドするか、引数へ @PropertyName を付与します。(今回は @PropertyName を使用)

なお、状態の型は InventoryItemCreated としなければならないわけでもなく、UUID を代わりに使っても動作に支障はなさそうでした。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import com.eventsourcing.EventStream;
import com.eventsourcing.Repository;
import com.eventsourcing.StandardCommand;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.val;

import sample.domain.InventoryItem;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;

// 在庫作成コマンド
@Value
@EqualsAndHashCode(callSuper = false)
public class CreateInventoryItem extends StandardCommand<InventoryItemCreated, InventoryItem> {
    private String name;

    public CreateInventoryItem(@PropertyName("name") String name) {
        this.name = name;
    }
    // イベントの生成
    @Override
    public EventStream<InventoryItemCreated> events() throws Exception {
        // 在庫作成イベント
        val created = new InventoryItemCreated();
        // 名前変更イベント
        val renamed = new InventoryItemRenamed(created.uuid(), name);

        return EventStream.ofWithState(created, created, renamed);
    }
    // 結果の生成
    @Override
    public InventoryItem result(InventoryItemCreated state, Repository repository) {
        return InventoryItem.lookup(repository, state.uuid()).get();
    }
}

次に、在庫数を加えるためのコマンドです。

ドメインモデルの内部状態を更新するだけなので、状態と結果の型を Void としました。

状態を伴わないので EventStream.of で EventStream を返します。 コマンドの適用結果が Void なので result メソッドをオーバーライドする必要もありません。

在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import com.eventsourcing.EventStream;
import com.eventsourcing.StandardCommand;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

import sample.events.ItemsCheckedInToInventory;

// 在庫数追加コマンド
@Value
@EqualsAndHashCode(callSuper = false)
public class CheckInItemsToInventory extends StandardCommand<Void, Void> {
    private UUID reference;
    private int count;

    public CheckInItemsToInventory(
            @PropertyName("reference") UUID reference,
            @PropertyName("count") int count) {

        this.reference = reference;
        this.count = count;
    }

    @Override
    public EventStream<Void> events() throws Exception {
        return EventStream.of(new ItemsCheckedInToInventory(reference, count));
    }
}

イベントの作成

イベントは StandardEvent を継承して作成します。

イベントを検索するための条件を SimpleIndex を使って定義します。

以下では、uuid の値で InventoryItemCreated を取得できるように、SimpleIndex<InventoryItemCreated, UUID> を定義しています。

在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.index.SimpleIndex;

import java.util.UUID;

// 在庫作成イベント
public class InventoryItemCreated extends StandardEvent {
    public static SimpleIndex<InventoryItemCreated, UUID> ID = SimpleIndex.as(InventoryItemCreated::uuid);
}

在庫名の更新イベントの場合、最新のものを取得すれば最終的な名称が決定するので、timestamp で比較する SimpleIndex<InventoryItemRenamed, HybridTimestamp> を定義しています。

なお、名前変更のためのイベントは eventsourcing-cep モジュールに予め用意されていますが(NameChanged)、今回は自前で実装しています。

在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.hlc.HybridTimestamp;
import com.eventsourcing.index.Index;
import com.eventsourcing.index.SimpleIndex;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

import static com.eventsourcing.index.IndexEngine.IndexFeature.*;

// 在庫名の変更イベント
@Value
@EqualsAndHashCode(callSuper = false)
public class InventoryItemRenamed extends StandardEvent {
    private UUID reference;
    private String newName;

    public InventoryItemRenamed(@PropertyName("reference") UUID reference,
                                @PropertyName("newName") String newName) {

        this.reference = reference;
        this.newName = newName;
    }

    public static SimpleIndex<InventoryItemRenamed, UUID> ID =
            SimpleIndex.as(InventoryItemRenamed::uuid);

    public static SimpleIndex<InventoryItemRenamed, UUID> REFERENCE_ID =
            SimpleIndex.as(InventoryItemRenamed::getReference);

    @Index({EQ, LT, GT})
    public static SimpleIndex<InventoryItemRenamed, HybridTimestamp> TIMESTAMP =
            SimpleIndex.as(InventoryItemRenamed::timestamp);
}
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.index.SimpleIndex;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

// 在庫数の追加イベント
@Value
@EqualsAndHashCode(callSuper = false)
public class ItemsCheckedInToInventory extends StandardEvent {
    private UUID reference;
    private int count;

    public ItemsCheckedInToInventory(
            @PropertyName("reference") UUID reference,
            @PropertyName("count") int count) {

        this.reference = reference;
        this.count = count;
    }

    public static SimpleIndex<ItemsCheckedInToInventory, UUID> ID =
            SimpleIndex.as(ItemsCheckedInToInventory::uuid);

    public static SimpleIndex<ItemsCheckedInToInventory, UUID> REFERENCE_ID =
            SimpleIndex.as(ItemsCheckedInToInventory::getReference);
}

ドメインプロトコルの作成

ドメインプロトコルという機能を使えば、共通的なプロパティ(例えば name など)をドメインモデルから独立して定義できます。(ドメインモデルへ implements して使う)

Protocol を extends したインターフェースとして定義し、プロパティの値を(イベントの内容から)確定する処理をデフォルトメソッドとして定義します。

まずは、在庫名のドメインプロトコルを実装してみます。

在庫名は一番最後の InventoryItemRenamed の newName で確定するため、latestAssociatedEntity を使って最後の InventoryItemRenamed イベントを取得します。

なお、名前に関するドメインプロトコルも eventsourcing-cep モジュールに予め用意されています(NameProtocol)。

在庫名 src/main/java/sample/protocols/InventoryItemNameProtocol.java
package sample.protocols;

import com.eventsourcing.Protocol;
import com.eventsourcing.queries.ModelQueries;

import sample.events.InventoryItemRenamed;

// 在庫名
public interface InventoryItemNameProtocol extends Protocol, ModelQueries {

    default String name() {
        // reference が id の値に等しい最後の InventoryItemRenamed を取得し
        // newName の値を返す
        return latestAssociatedEntity(
            InventoryItemRenamed.class,
            InventoryItemRenamed.REFERENCE_ID,
            InventoryItemRenamed.TIMESTAMP
        ).map(InventoryItemRenamed::getNewName).orElse("");
    }
}

次に、在庫数の場合は、ItemsCheckedInToInventory を集計するように Repositoryquery メソッドで該当する全ての ItemsCheckedInToInventory を取得し、count の合計値を算出しています。

在庫数 src/main/java/sample/protocols/InventoryItemCountProtocol.java
package sample.protocols;

import com.eventsourcing.EntityHandle;
import com.eventsourcing.Protocol;

import lombok.val;

import java.util.stream.StreamSupport;

import sample.events.ItemsCheckedInToInventory;

import static com.eventsourcing.index.EntityQueryFactory.equal;

// 在庫数
public interface InventoryItemCountProtocol extends Protocol {

    default int count() {
        // reference が id の値に等しい ItemsCheckedInToInventory を全て取得
        val res = getRepository().query(
            ItemsCheckedInToInventory.class,
            equal(ItemsCheckedInToInventory.REFERENCE_ID, getId())
        );

        return StreamSupport.stream(res.spliterator(), false)
                .map(EntityHandle::get)
                .mapToInt(ItemsCheckedInToInventory::getCount)
                .sum();
    }
}

ドメインモデルの作成

ドメインモデルは Model と必要なドメインプロトコルを implements して作成します。

同一インスタンスの判定に id の値だけを使うように equals と hashCode をオーバーライドします。(今回は @EqualsAndHashCode(of = "id") で実施)

CreateInventoryItem コマンドの result メソッドで使った lookup メソッドの実装も行います。

src/main/java/sample/domain/InventoryItem.java
package sample.domain;

import com.eventsourcing.Model;
import com.eventsourcing.Repository;
import com.eventsourcing.queries.ModelQueries;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.Optional;
import java.util.UUID;

import sample.events.InventoryItemCreated;
import sample.protocols.InventoryItemCountProtocol;
import sample.protocols.InventoryItemNameProtocol;

@Value
@EqualsAndHashCode(of = "id")
public class InventoryItem implements Model, InventoryItemNameProtocol, InventoryItemCountProtocol {

    private final Repository repository;
    private final UUID id;

    protected InventoryItem(Repository repository, UUID id) {
        this.repository = repository;
        this.id = id;
    }

    public static Optional<InventoryItem> lookup(Repository repository, UUID id) {
        // InventoryItemCreated.ID インデックス(SimpleIndex)を使って
        // id に合致する InventoryItemCreated を取得
        Optional<InventoryItemCreated> res = ModelQueries.lookup(
            repository,
            InventoryItemCreated.class,
            InventoryItemCreated.ID,
            id
        );

        return res.map(ev -> new InventoryItem(repository, id));
    }
}

実行クラスの作成

es4j では Repository を使ってイベントソーシングの処理を行います。

Repository は StandardRepository へ以下のような設定を行う事で構築できます。

  • (1) Journal と IndexEngine の設定
  • (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)

StandardRepository のデフォルト設定では localhost の NTP サーバーを使うようになっているようですが (NTP サーバーが無いとエラーになる)、今回はデフォルト設定の代わりに PhysicalTimeProvider 実装クラスを用意しました。

(2) で PackageXXXSetProvider を使うと指定のパッケージへ属するコマンドやイベントを一括で設定できます。

コマンドを適用する際は publish メソッドを使います。(publish の戻り値は CompletableFuture

src/main/java/SampleApp.java
import com.eventsourcing.*;
import com.eventsourcing.hlc.PhysicalTimeProvider;
import com.eventsourcing.index.MemoryIndexEngine;
import com.eventsourcing.inmem.MemoryJournal;
import com.eventsourcing.repository.StandardRepository;
import com.google.common.util.concurrent.AbstractService;

import java.util.concurrent.*;

import lombok.val;

import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.domain.InventoryItem;
import sample.events.InventoryItemCreated;

public class SampleApp {
    public static void main(String... args) {
        // (1) Journal と IndexEngine の設定
        val repository = StandardRepository.builder()
                .journal(new MemoryJournal())
                .indexEngine(new MemoryIndexEngine())
                .physicalTimeProvider(new SampleTimeProvider())
                .build();

        // (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)
        repository.addCommandSetProvider(
            new PackageCommandSetProvider(new Package[] {CreateInventoryItem.class.getPackage()}));

        repository.addEventSetProvider(
            new PackageEventSetProvider(new Package[] {InventoryItemCreated.class.getPackage()}));

        // 開始
        repository.startAsync().awaitRunning();

        try {
            // CreateInventoryItem コマンド適用
            val d = repository.publish(new CreateInventoryItem("sample1")).get();

            dumpInventoryItem(d);

            // CheckInItemsToInventory コマンド適用
            repository.publish(new CheckInItemsToInventory(d.getId(), 5)).get();

            dumpInventoryItem(d);

            // CheckInItemsToInventory コマンド適用
            repository.publish(new CheckInItemsToInventory(d.getId(), 3)).get();

            dumpInventoryItem(d);

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 停止
            stopRepository(repository);
        }
    }

    private static InventoryItem dumpInventoryItem(InventoryItem item) {
        System.out.println("----- InventoryItem -----");

        System.out.println("id: " + item.getId());
        System.out.println("name: " + item.name());
        System.out.println("count: " + item.count());

        return item;
    }

    private static void stopRepository(Repository repository) {
        System.out.println("stop...");

        try {
            repository.stopAsync().awaitTerminated(10, TimeUnit.SECONDS);
        } catch (TimeoutException ex) {
            ex.printStackTrace();
        }
    }
    // PhysicalTimeProvider の実装
    static class SampleTimeProvider extends AbstractService implements PhysicalTimeProvider {

        @Override
        public long getPhysicalTime() {
            return System.currentTimeMillis();
        }

        @Override
        protected void doStart() {
            System.out.println("timeprovider start...");
            notifyStarted();
        }

        @Override
        protected void doStop() {
            System.out.println("timeprovider stop...");
            notifyStopped();
        }
    }
}

publish の呼び出し箇所は CompletableFuture で処理を繋げれば以下のように実装できます。

src/main/java/SampleApp.java(CompletableFuture 活用版)
public class SampleApp {
    public static void main(String... args) {
        ・・・

        repository.publish(new CreateInventoryItem("sample1"))
            .thenApply(SampleApp::dumpInventoryItem)
            .thenCompose(d ->
                repository.publish(new CheckInItemsToInventory(d.getId(), 5))
                    .thenApply(v -> d)
            )
            .thenApply(SampleApp::dumpInventoryItem)
            .thenCompose(d ->
                repository.publish(new CheckInItemsToInventory(d.getId(), 3))
                    .thenApply(v -> d)
            )
            .thenApply(SampleApp::dumpInventoryItem)
            .whenComplete((d, e) -> stopRepository(repository));
    }
    ・・・
}
実行結果
> gradle -q run

[main] INFO org.reflections.Reflections - Reflections took 64 ms to scan 1 urls, producing 2 keys and 4 values
[main] INFO org.reflections.Reflections - Reflections took 17 ms to scan 1 urls, producing 2 keys and 6 values
timeprovider start...
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 0
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 5
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 8
stop...
timeprovider stop...

備考

実は、今回のサンプルを実行すると処理が終わってもプロセスは終了しません。

スレッドダンプを出力してみたところ、StandardEntity に原因がありそうです。

スレッドダンプ出力例
> jcmd 8904 Thread.print

・・・
"Thread-1" #11 prio=5 os_prio=0 ・・・ waiting on condition [・・・]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <・・・> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396)
        at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649)
        at com.eventsourcing.StandardEntity.lambda$static$0(StandardEntity.java:27)
        at com.eventsourcing.StandardEntity$$Lambda$14/540159270.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)
・・・

StandardEntity のソース(27行目付近)を見てみると、以下のようにスレッドを開始して放置していました。

com.eventsourcing.StandardEntity のソース
・・・
public abstract class StandardEntity<E extends Entity<E>> implements Entity<E> {

    private static LinkedBlockingDeque<UUID> uuids = new LinkedBlockingDeque<>(10_000);

    static {
        new Thread(() -> {
            while (true) {
                try {
                    uuids.put(UUID.randomUUID()); // 27行目
                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
    ・・・

RxJava2 で並列処理

前回 と同じような並列処理を RxJava 2.0 で試してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161226/

はじめに

まずは、map の処理と subscribe の処理を順番に 3回繰り返す処理です。 前回の Reactor との違いは MonoSingle に変わっただけです。

(A) サンプルコード
Single.just("(A)")
    .repeat(3)
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]

observeOn の場合

RxJava 2.0 では Reactor の publishOn の代わりに observeOn が使えます。 main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。

(B) observeOn を使った場合
Single.just("(B)")
    .repeat(3)
    .observeOn(Schedulers.computation())
    // 実質的には以下でも同じ
    //.observeOn(Schedulers.single())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(B) observeOn を使った場合の実行結果
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]

flatMap と subscribeOn を使った並列処理

RxJava 2.0 では Reactor の ParallelFlux 相当の処理は用意されていないようです。

そこで、flatMapsubscribeOn を使ってみました。

(C) flatMap + subscribeOn
Single.just("(C)")
    .repeat(3)
    .flatMap(s ->
        Flowable.just(s)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribeOn(Schedulers.computation())
            //.subscribeOn(Schedulers.newThread())
    )
    .subscribe(n -> printThread(n + "_subscribe"));

ただし、以下の実行結果のように別スレッドで実行してはいるものの、map と subscribe を同じスレッドで実行するわけではありませんでした。

なお、Schedulers.computation() の場合、スレッド数は実行環境のコア数に依存するようです。(newThread は新しいスレッドを使う)

(C) flatMap + subscribeOn の実行結果1
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C) flatMap + subscribeOn の実行結果2
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]

備考

最後に、今回使用したソースを記載しておきます。

build.gradle
apply plugin: 'application'

mainClassName = 'App'

repositories {
    jcenter()
}

dependencies {
    compile 'io.reactivex.rxjava2:rxjava:2.0.3'
}

run {
    standardInput = System.in

    if (project.hasProperty('args')) {
        args project.args
    }
}
src/main/java/App.java
import io.reactivex.Single;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.lang.invoke.MethodHandle;

import static java.lang.invoke.MethodHandles.publicLookup;
import static java.lang.invoke.MethodType.methodType;

public class App {
    public static void main(String... args) throws Throwable {
        MethodHandle mh = publicLookup().findStatic(
            App.class, 
            "sample" + args[0], 
            methodType(void.class)
        );

        mh.invoke();

        System.in.read();
    }

    // (A)
    public static void sampleA() {
        Single.just("(A)")
            .repeat(3)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (B)
    public static void sampleB() {
        Single.just("(B)")
            .repeat(3)
            .observeOn(Schedulers.computation())
            //.observeOn(Schedulers.single())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (C) 
    public static void sampleC() {
        Single.just("(C)")
            .repeat(3)
            .flatMap(s ->
                Flowable.just(s)
                    .map(n -> {
                        printThread(n + "_map");
                        return n;
                    })
                    .subscribeOn(Schedulers.computation())
                    //.subscribeOn(Schedulers.newThread())

            )
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    private static void printThread(String msg) {
        System.out.println(msg + ", thread: " + Thread.currentThread());
    }
}
実行例
> gradle -q run -Pargs=C

(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]

Reactor で並列処理

Reactor の並列処理を試してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161208/

はじめに

Reactor で以下のようなコードを実行すると、map の処理と subscribe の処理を順番に 3回繰り返します。(repeat メソッドの戻り値は Flux

(A) サンプルコード
Mono.just("(A)")
    .repeat(3)
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]

ここで、今回は map からの処理を別スレッドで並列実行してみます。

publishOn の場合

publishOn を使うと main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。

(B) publishOn を使った場合
Mono.just("(B)")
    .repeat(3)
    .publishOn(Schedulers.parallel())
    // 実質的に以下と同じ
    //.publishOn(Schedulers.single())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(B) publishOn を使った場合の実行結果
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]

publishOn へ Schedulers.parallel() を渡しても複数のスレッドを使うわけでは無いようなので、実質的に Schedulers.single() と変わりは無さそうです。

ParallelFlux を使った並列処理

並列で実行するためには parallel メソッドを使って Flux から ParallelFlux を取得し、runOn メソッドで Schedulers.parallel() ※ を指定します。

 ※ この場合、Schedulers.parallel() の代わりに
    Schedulers.single() を使うと並列にならない点に注意
    (単一スレッドで実行する事になる)
(C) parallel + runOn
Mono.just("(C)")
    .repeat(3)
    .parallel()
    .runOn(Schedulers.parallel())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(C) parallel + runOn の実行結果
(C)_map, thread: Thread[parallel-1,5,main]
(C)_map, thread: Thread[parallel-3,5,main]
(C)_map, thread: Thread[parallel-2,5,main]
(C)_subscribe, thread: Thread[parallel-3,5,main]
(C)_subscribe, thread: Thread[parallel-1,5,main]
(C)_subscribe, thread: Thread[parallel-2,5,main]

3つのスレッドを使って並列に実行できているようです。

なお、runOnコメントアウトして実行すると (A) と同様の結果(main スレッドで順番に実行)になります。

備考

最後に、今回使用したソースを記載しておきます。

build.gradle
apply plugin: 'application'

mainClassName = 'App'

repositories {
    jcenter()
}

dependencies {
    compile 'io.projectreactor:reactor-core:3.0.3.RELEASE'
}

run {
    standardInput = System.in

    if (project.hasProperty('args')) {
        args project.args
    }
}
src/main/java/App.java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.lang.invoke.MethodHandle;

import static java.lang.invoke.MethodHandles.publicLookup;
import static java.lang.invoke.MethodType.methodType;

public class App {
    public static void main(String... args) throws Throwable {
        MethodHandle mh = publicLookup().findStatic(
            App.class, 
            "sample" + args[0], 
            methodType(void.class)
        );

        mh.invoke();

        System.in.read();
    }

    // (A)
    public static void sampleA() {
        Mono.just("(A)")
            .repeat(3)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (B) publishOn を使った場合
    public static void sampleB() {
        Mono.just("(B)")
            .repeat(3)
            .publishOn(Schedulers.parallel())
            //.publishOn(Schedulers.single())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (C) parallel + runOn
    public static void sampleC() {
        Mono.just("(C)")
            .repeat(3)
            .parallel()
            .runOn(Schedulers.parallel())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    private static void printThread(String msg) {
        System.out.println(msg + ", thread: " + Thread.currentThread());
    }
}
実行例
> gradle -q run -Pargs=B

(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]

Spring Web Reactive を試す

Spring 5 で導入される Spring Web Reactive を試してみました。

本来なら Spring Boot で実行する事になると思いますが、今回は Spring Boot を使わずに Undertow で直接実行してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161115/

サンプル作成

ビルド定義

現時点では Spring Web Reactive の正式版はリリースされていないようなのでスナップショット版を使います。

Undertow を実行するために undertow-coreJSON で結果を返す処理を試すために jackson-databind を依存関係へ設定しています。

build.gradle
apply plugin: 'application'

mainClassName = 'sample.App'

repositories {
    jcenter()

    maven {
        url 'http://repo.spring.io/snapshot/'
    }
}

dependencies {
    compile 'org.projectlombok:lombok:1.16.10'

    // Spring Web Reactive
    compile 'org.springframework:spring-web-reactive:5.0.0.BUILD-SNAPSHOT'
    // Undertow
    compile 'io.undertow:undertow-core:2.0.0.Alpha1'

    // JSON 用
    runtime 'com.fasterxml.jackson.core:jackson-databind:2.8.4'
}

設定クラス

@EnableWebReactive を付与すれば Spring Web Reactive を有効にできるようです。

src/main/java/sample/config/AppConfig.java
package sample.config;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.reactive.config.EnableWebReactive;

@EnableWebReactive
@ComponentScan("sample.controller")
public class AppConfig {
}

コントローラークラス

Spring Web Reactive は Spring Web (MVC) のプログラミングスタイルを踏襲しているようです。

Spring Web (MVC) と同じアノテーションでコントローラークラスを定義し、メソッドの戻り値に Reactor の Mono / Flux を使えばよさそうです。

クラス 概要
Mono 単一の結果を返す場合に使用
Flux 複数の結果を返す場合に使用
src/main/java/sample/controller/SampleController.java
package sample.controller;

import lombok.Value;

import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class SampleController {

    @RequestMapping("/")
    public Mono<String> sample() {
        return Mono.just("sample");
    }

    @RequestMapping(value = "/data/{name}", produces = "application/json")
    public Flux<Data> dataList(@PathVariable("name") String name) {
        return Flux.fromArray(new Data[] {
            new Data(name + "-1", 1),
            new Data(name + "-2", 2),
            new Data(name + "-3", 3)
        });
    }

    @Value
    class Data {
        private String name;
        private int value;
    }
}

実行クラス

まずは、Spring Web Reactive を有効化した ApplicationContext を使って HttpHandler を作成します。 (DispatcherHandlerWebHttpHandlerBuilder を使用)

あとは、対象の Web サーバー ※ に合わせて実行します。

 ※ 今のところ Servlet 3.1 対応コンテナ、Netty、Undertow を
    サポートしているようです

Undertow の場合、アダプタークラス UndertowHttpHandlerAdapter で HttpHandler をラッピングして実行するだけです。

src/main/java/sample/App.java
package sample;

import io.undertow.Undertow;

import lombok.val;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.http.server.reactive.UndertowHttpHandlerAdapter;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;

import sample.config.AppConfig;

public class App {
    public static void main(String... args) {

        val ctx = new AnnotationConfigApplicationContext();
        // @EnableWebReactive を付与したクラスを登録
        ctx.register(AppConfig.class);
        ctx.refresh();

        val handler = new DispatcherHandler();
        handler.setApplicationContext(ctx);

        // HttpHandler の作成
        val httpHandler = WebHttpHandlerBuilder.webHandler(handler).build();

        val server = Undertow.builder()
                .addHttpListener(8080, "localhost")
                // Undertow 用アダプターでラッピングして設定
                .setHandler(new UndertowHttpHandlerAdapter(httpHandler))
                .build();

        server.start();
    }
}

実行

Gradle で実行
> gradle -q run

・・・
情報: Mapped "{[/data/{name}],produces=[application/json]}" onto public reactor.core.publisher.Flux<sample.controller.SampleController$Data> sample.controller.SampleController.dataList(java.lang.String)
11 14, 2016 9:32:23 午後 org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping register
情報: Mapped "{[/]}" onto public reactor.core.publisher.Mono<java.lang.String> sample.controller.SampleController.sample()
11 14, 2016 9:32:23 午後 org.xnio.Xnio <clinit>
INFO: XNIO version 3.3.6.Final
11 14, 2016 9:32:23 午後 org.xnio.nio.NioXnio <clinit>
INFO: XNIO NIO Implementation Version 3.3.6.Final
動作確認1
$ curl -s http://localhost:8080/
sample
動作確認2
$ curl -s http://localhost:8080/data/a
[{"name":"a-1","value":1},{"name":"a-2","value":2},{"name":"a-3","value":3}]

Java で行列の演算 - nd4j, commons-math, la4j, ujmp, jblas, colt

Java で以下のような行列の演算を複数のライブラリで試しました。

  • (a) 和
  • (b) 積
  • (c) 転置

とりあえず今回は、更新日が比較的新しめのライブラリを試してみました。

また、あくまでも個人的な印象ですが、手軽に使いたいなら la4j か Commons Math、性能重視なら ND4J か jblas、可視化や DB との連携を考慮するなら UJMP を使えば良さそうです。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161031/

ND4J

Deeplearning4J で使用しているライブラリ。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'org.nd4j:nd4j-native-platform:0.6.0'
    runtime 'org.slf4j:slf4j-nop:1.7.21'
}
src/main/java/SampleApp.java
import org.nd4j.linalg.api.ndarray.INDArray;
import org.nd4j.linalg.factory.Nd4j;

public class SampleApp {
    public static void main(String... args) {
        INDArray x = Nd4j.create(new double[][] {
            {1, 2},
            {3, 4}
        });

        INDArray y = Nd4j.create(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.add(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.mmul(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

[[ 6.00,  8.00],
 [10.00, 12.00]]
-----
[[19.00, 22.00],
 [43.00, 50.00]]
-----
[[1.00, 3.00],
 [2.00, 4.00]]

Commons Math

Apache Commons のライブラリ。

build.gradle
・・・
dependencies {
    compile 'org.apache.commons:commons-math3:3.6.1'
}
src/main/java/SampleApp.java
import org.apache.commons.math3.linear.MatrixUtils;
import org.apache.commons.math3.linear.RealMatrix;

public class SampleApp {
    public static void main(String... args) {
        RealMatrix x = MatrixUtils.createRealMatrix(new double[][] {
            {1, 2},
            {3, 4}
        });

        RealMatrix y = MatrixUtils.createRealMatrix(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.add(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.multiply(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

Array2DRowRealMatrix{{6.0,8.0},{10.0,12.0}}
-----
Array2DRowRealMatrix{{19.0,22.0},{43.0,50.0}}
-----
Array2DRowRealMatrix{{1.0,3.0},{2.0,4.0}}

la4j

Java のみで実装された軽量なライブラリ。

build.gradle
・・・
dependencies {
    compile 'org.la4j:la4j:0.6.0'
}
src/main/java/SampleApp.java
import org.la4j.Matrix;

public class SampleApp {
    public static void main(String... args) {
        Matrix x = Matrix.from2DArray(new double[][] {
            {1, 2},
            {3, 4}
        });

        Matrix y = Matrix.from2DArray(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.add(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.multiply(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

 6.000  8.000
10.000 12.000

-----
19.000 22.000
43.000 50.000

-----
1.000 3.000
2.000 4.000

UJMP

データ可視化や JDBC との連携など、機能豊富そうなライブラリ。 Colt や jblas 等ともプラグインモジュールで連携できる模様。

build.gradle
・・・
dependencies {
    compile 'org.ujmp:ujmp-core:0.3.0'
}
src/main/java/SampleApp.java
import org.ujmp.core.DenseMatrix;
import org.ujmp.core.Matrix;

public class SampleApp {
    public static void main(String... args) {
        Matrix x = DenseMatrix.Factory.linkToArray(
            new double[] {1, 2},
            new double[] {3, 4}
        );

        Matrix y = DenseMatrix.Factory.linkToArray(
            new double[] {5, 6},
            new double[] {7, 8}
        );
        // (a)
        System.out.println( x.plus(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.mtimes(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

    6.0000     8.0000
   10.0000    12.0000

-----
   19.0000    22.0000
   43.0000    50.0000

-----
    1.0000     3.0000
    2.0000     4.0000

jblas

BLAS/LAPACK をベースとしたライブラリ。 ネイティブライブラリを使用する。

build.gradle
・・・
dependencies {
    compile 'org.jblas:jblas:1.2.4'
}
src/main/java/SampleApp.java
import org.jblas.DoubleMatrix;

public class SampleApp {
    public static void main(String... args) {
        DoubleMatrix x = new DoubleMatrix(new double[][] {
            {1, 2},
            {3, 4}
        });

        DoubleMatrix y = new DoubleMatrix(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.add(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.mmul(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

[6.000000, 8.000000; 10.000000, 12.000000]
-----
[19.000000, 22.000000; 43.000000, 50.000000]
-----
[1.000000, 3.000000; 2.000000, 4.000000]
-- org.jblas INFO Starting temp DLL cleanup task.
-- org.jblas INFO Deleted 4 unused temp DLL libraries from ・・・

Colt Blazegraph 版

Colt は長らく更新されていないようなので、今回は Blazegraph による fork 版? を使いました。

build.gradle
・・・
dependencies {
    compile 'com.blazegraph:colt:2.1.4'
}
src/main/java/SampleApp.java
import cern.colt.matrix.DoubleFactory2D;
import cern.colt.matrix.DoubleMatrix2D;
import cern.colt.matrix.linalg.Algebra;
import cern.jet.math.Functions;

public class SampleApp {
    public static void main(String... args) {
        DoubleMatrix2D x = DoubleFactory2D.dense.make(new double[][] {
            {1, 2},
            {3, 4}
        });

        DoubleMatrix2D y = DoubleFactory2D.dense.make(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.copy().assign(y, Functions.plus) );

        System.out.println("-----");

        Algebra algebra = new Algebra();
        // (b)
        System.out.println( algebra.mult(x, y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.viewDice() );
    }
}

assign を使うと自身の値を更新するため copy を使っています。

実行結果
> gradle -q run

2 x 2 matrix
 6  8
10 12
-----
2 x 2 matrix
19 22
43 50
-----
2 x 2 matrix
1 3
2 4