読者です 読者をやめる 読者になる 読者になる

es4j でイベントソーシング

Axon Framework でイベントソーシング」 と同様の処理を es4j (Eventsourcing for Java) で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170107/

はじめに

今回はイベントをインメモリで保持する eventsourcing-inmem を使うため、以下のような Gradle 用ビルド定義を用意しました。 (lombok は便利なので使います)

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly "org.projectlombok:lombok:1.16.12"

    compile 'com.eventsourcing:eventsourcing-inmem:0.4.5'
}

es4j によるイベントソーシング

コマンドの作成

コマンドは StandardCommand<S, R> を継承して作成し、S に状態の型を R に結果の型を指定します。

コマンドを適用することで生じるイベントは events メソッドの戻り値で返し、コマンド適用結果は result メソッドの戻り値で返します。

状態は events メソッドで用意し、result メソッドの引数として渡ってきます。

まずは、在庫作成のコマンドを実装してみます。

状態の型に InventoryItemCreated を、結果の型に InventoryItem を指定しています。

状態を伴う EventStream を返すために EventStream.ofWithState を使用します。

コンストラクタの引数名からプロパティを認識するようなので、-parameters オプションを使ってビルドするか、引数へ @PropertyName を付与します。(今回は @PropertyName を使用)

なお、状態の型は InventoryItemCreated としなければならないわけでもなく、UUID を代わりに使っても動作に支障はなさそうでした。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import com.eventsourcing.EventStream;
import com.eventsourcing.Repository;
import com.eventsourcing.StandardCommand;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.val;

import sample.domain.InventoryItem;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;

// 在庫作成コマンド
@Value
@EqualsAndHashCode(callSuper = false)
public class CreateInventoryItem extends StandardCommand<InventoryItemCreated, InventoryItem> {
    private String name;

    public CreateInventoryItem(@PropertyName("name") String name) {
        this.name = name;
    }
    // イベントの生成
    @Override
    public EventStream<InventoryItemCreated> events() throws Exception {
        // 在庫作成イベント
        val created = new InventoryItemCreated();
        // 名前変更イベント
        val renamed = new InventoryItemRenamed(created.uuid(), name);

        return EventStream.ofWithState(created, created, renamed);
    }
    // 結果の生成
    @Override
    public InventoryItem result(InventoryItemCreated state, Repository repository) {
        return InventoryItem.lookup(repository, state.uuid()).get();
    }
}

次に、在庫数を加えるためのコマンドです。

ドメインモデルの内部状態を更新するだけなので、状態と結果の型を Void としました。

状態を伴わないので EventStream.of で EventStream を返します。 コマンドの適用結果が Void なので result メソッドをオーバーライドする必要もありません。

在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import com.eventsourcing.EventStream;
import com.eventsourcing.StandardCommand;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

import sample.events.ItemsCheckedInToInventory;

// 在庫数追加コマンド
@Value
@EqualsAndHashCode(callSuper = false)
public class CheckInItemsToInventory extends StandardCommand<Void, Void> {
    private UUID reference;
    private int count;

    public CheckInItemsToInventory(
            @PropertyName("reference") UUID reference,
            @PropertyName("count") int count) {

        this.reference = reference;
        this.count = count;
    }

    @Override
    public EventStream<Void> events() throws Exception {
        return EventStream.of(new ItemsCheckedInToInventory(reference, count));
    }
}

イベントの作成

イベントは StandardEvent を継承して作成します。

イベントを検索するための条件を SimpleIndex を使って定義します。

以下では、uuid の値で InventoryItemCreated を取得できるように、SimpleIndex<InventoryItemCreated, UUID> を定義しています。

在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.index.SimpleIndex;

import java.util.UUID;

// 在庫作成イベント
public class InventoryItemCreated extends StandardEvent {
    public static SimpleIndex<InventoryItemCreated, UUID> ID = SimpleIndex.as(InventoryItemCreated::uuid);
}

在庫名の更新イベントの場合、最新のものを取得すれば最終的な名称が決定するので、timestamp で比較する SimpleIndex<InventoryItemRenamed, HybridTimestamp> を定義しています。

なお、名前変更のためのイベントは eventsourcing-cep モジュールに予め用意されていますが(NameChanged)、今回は自前で実装しています。

在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.hlc.HybridTimestamp;
import com.eventsourcing.index.Index;
import com.eventsourcing.index.SimpleIndex;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

import static com.eventsourcing.index.IndexEngine.IndexFeature.*;

// 在庫名の変更イベント
@Value
@EqualsAndHashCode(callSuper = false)
public class InventoryItemRenamed extends StandardEvent {
    private UUID reference;
    private String newName;

    public InventoryItemRenamed(@PropertyName("reference") UUID reference,
                                @PropertyName("newName") String newName) {

        this.reference = reference;
        this.newName = newName;
    }

    public static SimpleIndex<InventoryItemRenamed, UUID> ID =
            SimpleIndex.as(InventoryItemRenamed::uuid);

    public static SimpleIndex<InventoryItemRenamed, UUID> REFERENCE_ID =
            SimpleIndex.as(InventoryItemRenamed::getReference);

    @Index({EQ, LT, GT})
    public static SimpleIndex<InventoryItemRenamed, HybridTimestamp> TIMESTAMP =
            SimpleIndex.as(InventoryItemRenamed::timestamp);
}
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import com.eventsourcing.StandardEvent;
import com.eventsourcing.index.SimpleIndex;
import com.eventsourcing.layout.PropertyName;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.UUID;

// 在庫数の追加イベント
@Value
@EqualsAndHashCode(callSuper = false)
public class ItemsCheckedInToInventory extends StandardEvent {
    private UUID reference;
    private int count;

    public ItemsCheckedInToInventory(
            @PropertyName("reference") UUID reference,
            @PropertyName("count") int count) {

        this.reference = reference;
        this.count = count;
    }

    public static SimpleIndex<ItemsCheckedInToInventory, UUID> ID =
            SimpleIndex.as(ItemsCheckedInToInventory::uuid);

    public static SimpleIndex<ItemsCheckedInToInventory, UUID> REFERENCE_ID =
            SimpleIndex.as(ItemsCheckedInToInventory::getReference);
}

ドメインプロトコルの作成

ドメインプロトコルという機能を使えば、共通的なプロパティ(例えば name など)をドメインモデルから独立して定義できます。(ドメインモデルへ implements して使う)

Protocol を extends したインターフェースとして定義し、プロパティの値を(イベントの内容から)確定する処理をデフォルトメソッドとして定義します。

まずは、在庫名のドメインプロトコルを実装してみます。

在庫名は一番最後の InventoryItemRenamed の newName で確定するため、latestAssociatedEntity を使って最後の InventoryItemRenamed イベントを取得します。

なお、名前に関するドメインプロトコルも eventsourcing-cep モジュールに予め用意されています(NameProtocol)。

在庫名 src/main/java/sample/protocols/InventoryItemNameProtocol.java
package sample.protocols;

import com.eventsourcing.Protocol;
import com.eventsourcing.queries.ModelQueries;

import sample.events.InventoryItemRenamed;

// 在庫名
public interface InventoryItemNameProtocol extends Protocol, ModelQueries {

    default String name() {
        // reference が id の値に等しい最後の InventoryItemRenamed を取得し
        // newName の値を返す
        return latestAssociatedEntity(
            InventoryItemRenamed.class,
            InventoryItemRenamed.REFERENCE_ID,
            InventoryItemRenamed.TIMESTAMP
        ).map(InventoryItemRenamed::getNewName).orElse("");
    }
}

次に、在庫数の場合は、ItemsCheckedInToInventory を集計するように Repositoryquery メソッドで該当する全ての ItemsCheckedInToInventory を取得し、count の合計値を算出しています。

在庫数 src/main/java/sample/protocols/InventoryItemCountProtocol.java
package sample.protocols;

import com.eventsourcing.EntityHandle;
import com.eventsourcing.Protocol;

import lombok.val;

import java.util.stream.StreamSupport;

import sample.events.ItemsCheckedInToInventory;

import static com.eventsourcing.index.EntityQueryFactory.equal;

// 在庫数
public interface InventoryItemCountProtocol extends Protocol {

    default int count() {
        // reference が id の値に等しい ItemsCheckedInToInventory を全て取得
        val res = getRepository().query(
            ItemsCheckedInToInventory.class,
            equal(ItemsCheckedInToInventory.REFERENCE_ID, getId())
        );

        return StreamSupport.stream(res.spliterator(), false)
                .map(EntityHandle::get)
                .mapToInt(ItemsCheckedInToInventory::getCount)
                .sum();
    }
}

ドメインモデルの作成

ドメインモデルは Model と必要なドメインプロトコルを implements して作成します。

同一インスタンスの判定に id の値だけを使うように equals と hashCode をオーバーライドします。(今回は @EqualsAndHashCode(of = "id") で実施)

CreateInventoryItem コマンドの result メソッドで使った lookup メソッドの実装も行います。

src/main/java/sample/domain/InventoryItem.java
package sample.domain;

import com.eventsourcing.Model;
import com.eventsourcing.Repository;
import com.eventsourcing.queries.ModelQueries;

import lombok.EqualsAndHashCode;
import lombok.Value;

import java.util.Optional;
import java.util.UUID;

import sample.events.InventoryItemCreated;
import sample.protocols.InventoryItemCountProtocol;
import sample.protocols.InventoryItemNameProtocol;

@Value
@EqualsAndHashCode(of = "id")
public class InventoryItem implements Model, InventoryItemNameProtocol, InventoryItemCountProtocol {

    private final Repository repository;
    private final UUID id;

    protected InventoryItem(Repository repository, UUID id) {
        this.repository = repository;
        this.id = id;
    }

    public static Optional<InventoryItem> lookup(Repository repository, UUID id) {
        // InventoryItemCreated.ID インデックス(SimpleIndex)を使って
        // id に合致する InventoryItemCreated を取得
        Optional<InventoryItemCreated> res = ModelQueries.lookup(
            repository,
            InventoryItemCreated.class,
            InventoryItemCreated.ID,
            id
        );

        return res.map(ev -> new InventoryItem(repository, id));
    }
}

実行クラスの作成

es4j では Repository を使ってイベントソーシングの処理を行います。

Repository は StandardRepository へ以下のような設定を行う事で構築できます。

  • (1) Journal と IndexEngine の設定
  • (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)

StandardRepository のデフォルト設定では localhost の NTP サーバーを使うようになっているようですが (NTP サーバーが無いとエラーになる)、今回はデフォルト設定の代わりに PhysicalTimeProvider 実装クラスを用意しました。

(2) で PackageXXXSetProvider を使うと指定のパッケージへ属するコマンドやイベントを一括で設定できます。

コマンドを適用する際は publish メソッドを使います。(publish の戻り値は CompletableFuture

src/main/java/SampleApp.java
import com.eventsourcing.*;
import com.eventsourcing.hlc.PhysicalTimeProvider;
import com.eventsourcing.index.MemoryIndexEngine;
import com.eventsourcing.inmem.MemoryJournal;
import com.eventsourcing.repository.StandardRepository;
import com.google.common.util.concurrent.AbstractService;

import java.util.concurrent.*;

import lombok.val;

import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.domain.InventoryItem;
import sample.events.InventoryItemCreated;

public class SampleApp {
    public static void main(String... args) {
        // (1) Journal と IndexEngine の設定
        val repository = StandardRepository.builder()
                .journal(new MemoryJournal())
                .indexEngine(new MemoryIndexEngine())
                .physicalTimeProvider(new SampleTimeProvider())
                .build();

        // (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)
        repository.addCommandSetProvider(
            new PackageCommandSetProvider(new Package[] {CreateInventoryItem.class.getPackage()}));

        repository.addEventSetProvider(
            new PackageEventSetProvider(new Package[] {InventoryItemCreated.class.getPackage()}));

        // 開始
        repository.startAsync().awaitRunning();

        try {
            // CreateInventoryItem コマンド適用
            val d = repository.publish(new CreateInventoryItem("sample1")).get();

            dumpInventoryItem(d);

            // CheckInItemsToInventory コマンド適用
            repository.publish(new CheckInItemsToInventory(d.getId(), 5)).get();

            dumpInventoryItem(d);

            // CheckInItemsToInventory コマンド適用
            repository.publish(new CheckInItemsToInventory(d.getId(), 3)).get();

            dumpInventoryItem(d);

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 停止
            stopRepository(repository);
        }
    }

    private static InventoryItem dumpInventoryItem(InventoryItem item) {
        System.out.println("----- InventoryItem -----");

        System.out.println("id: " + item.getId());
        System.out.println("name: " + item.name());
        System.out.println("count: " + item.count());

        return item;
    }

    private static void stopRepository(Repository repository) {
        System.out.println("stop...");

        try {
            repository.stopAsync().awaitTerminated(10, TimeUnit.SECONDS);
        } catch (TimeoutException ex) {
            ex.printStackTrace();
        }
    }
    // PhysicalTimeProvider の実装
    static class SampleTimeProvider extends AbstractService implements PhysicalTimeProvider {

        @Override
        public long getPhysicalTime() {
            return System.currentTimeMillis();
        }

        @Override
        protected void doStart() {
            System.out.println("timeprovider start...");
            notifyStarted();
        }

        @Override
        protected void doStop() {
            System.out.println("timeprovider stop...");
            notifyStopped();
        }
    }
}

publish の呼び出し箇所は CompletableFuture で処理を繋げれば以下のように実装できます。

src/main/java/SampleApp.java(CompletableFuture 活用版)
public class SampleApp {
    public static void main(String... args) {
        ・・・

        repository.publish(new CreateInventoryItem("sample1"))
            .thenApply(SampleApp::dumpInventoryItem)
            .thenCompose(d ->
                repository.publish(new CheckInItemsToInventory(d.getId(), 5))
                    .thenApply(v -> d)
            )
            .thenApply(SampleApp::dumpInventoryItem)
            .thenCompose(d ->
                repository.publish(new CheckInItemsToInventory(d.getId(), 3))
                    .thenApply(v -> d)
            )
            .thenApply(SampleApp::dumpInventoryItem)
            .whenComplete((d, e) -> stopRepository(repository));
    }
    ・・・
}
実行結果
> gradle -q run

[main] INFO org.reflections.Reflections - Reflections took 64 ms to scan 1 urls, producing 2 keys and 4 values
[main] INFO org.reflections.Reflections - Reflections took 17 ms to scan 1 urls, producing 2 keys and 6 values
timeprovider start...
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 0
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 5
----- InventoryItem -----
id: d7636355-850d-4a15-9e85-ce0f1de514e7
name: sample1
count: 8
stop...
timeprovider stop...

備考

実は、今回のサンプルを実行すると処理が終わってもプロセスは終了しません。

スレッドダンプを出力してみたところ、StandardEntity に原因がありそうです。

スレッドダンプ出力例
> jcmd 8904 Thread.print

・・・
"Thread-1" #11 prio=5 os_prio=0 ・・・ waiting on condition [・・・]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <・・・> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396)
        at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649)
        at com.eventsourcing.StandardEntity.lambda$static$0(StandardEntity.java:27)
        at com.eventsourcing.StandardEntity$$Lambda$14/540159270.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)
・・・

StandardEntity のソース(27行目付近)を見てみると、以下のようにスレッドを開始して放置していました。

com.eventsourcing.StandardEntity のソース
・・・
public abstract class StandardEntity<E extends Entity<E>> implements Entity<E> {

    private static LinkedBlockingDeque<UUID> uuids = new LinkedBlockingDeque<>(10_000);

    static {
        new Thread(() -> {
            while (true) {
                try {
                    uuids.put(UUID.randomUUID()); // 27行目
                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
    ・・・

RxJava2 で並列処理

前回 と同じような並列処理を RxJava 2.0 で試してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161226/

はじめに

まずは、map の処理と subscribe の処理を順番に 3回繰り返す処理です。 前回の Reactor との違いは MonoSingle に変わっただけです。

(A) サンプルコード
Single.just("(A)")
    .repeat(3)
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]

observeOn の場合

RxJava 2.0 では Reactor の publishOn の代わりに observeOn が使えます。 main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。

(B) observeOn を使った場合
Single.just("(B)")
    .repeat(3)
    .observeOn(Schedulers.computation())
    // 実質的には以下でも同じ
    //.observeOn(Schedulers.single())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(B) observeOn を使った場合の実行結果
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]

flatMap と subscribeOn を使った並列処理

RxJava 2.0 では Reactor の ParallelFlux 相当の処理は用意されていないようです。

そこで、flatMapsubscribeOn を使ってみました。

(C) flatMap + subscribeOn
Single.just("(C)")
    .repeat(3)
    .flatMap(s ->
        Flowable.just(s)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribeOn(Schedulers.computation())
            //.subscribeOn(Schedulers.newThread())
    )
    .subscribe(n -> printThread(n + "_subscribe"));

ただし、以下の実行結果のように別スレッドで実行してはいるものの、map と subscribe を同じスレッドで実行するわけではありませんでした。

なお、Schedulers.computation() の場合、スレッド数は実行環境のコア数に依存するようです。(newThread は新しいスレッドを使う)

(C) flatMap + subscribeOn の実行結果1
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C) flatMap + subscribeOn の実行結果2
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]

備考

最後に、今回使用したソースを記載しておきます。

build.gradle
apply plugin: 'application'

mainClassName = 'App'

repositories {
    jcenter()
}

dependencies {
    compile 'io.reactivex.rxjava2:rxjava:2.0.3'
}

run {
    standardInput = System.in

    if (project.hasProperty('args')) {
        args project.args
    }
}
src/main/java/App.java
import io.reactivex.Single;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.lang.invoke.MethodHandle;

import static java.lang.invoke.MethodHandles.publicLookup;
import static java.lang.invoke.MethodType.methodType;

public class App {
    public static void main(String... args) throws Throwable {
        MethodHandle mh = publicLookup().findStatic(
            App.class, 
            "sample" + args[0], 
            methodType(void.class)
        );

        mh.invoke();

        System.in.read();
    }

    // (A)
    public static void sampleA() {
        Single.just("(A)")
            .repeat(3)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (B)
    public static void sampleB() {
        Single.just("(B)")
            .repeat(3)
            .observeOn(Schedulers.computation())
            //.observeOn(Schedulers.single())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (C) 
    public static void sampleC() {
        Single.just("(C)")
            .repeat(3)
            .flatMap(s ->
                Flowable.just(s)
                    .map(n -> {
                        printThread(n + "_map");
                        return n;
                    })
                    .subscribeOn(Schedulers.computation())
                    //.subscribeOn(Schedulers.newThread())

            )
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    private static void printThread(String msg) {
        System.out.println(msg + ", thread: " + Thread.currentThread());
    }
}
実行例
> gradle -q run -Pargs=C

(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]

Reactor で並列処理

Reactor の並列処理を試してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161208/

はじめに

Reactor で以下のようなコードを実行すると、map の処理と subscribe の処理を順番に 3回繰り返します。(repeat メソッドの戻り値は Flux

(A) サンプルコード
Mono.just("(A)")
    .repeat(3)
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]

ここで、今回は map からの処理を別スレッドで並列実行してみます。

publishOn の場合

publishOn を使うと main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。

(B) publishOn を使った場合
Mono.just("(B)")
    .repeat(3)
    .publishOn(Schedulers.parallel())
    // 実質的に以下と同じ
    //.publishOn(Schedulers.single())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(B) publishOn を使った場合の実行結果
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]

publishOn へ Schedulers.parallel() を渡しても複数のスレッドを使うわけでは無いようなので、実質的に Schedulers.single() と変わりは無さそうです。

ParallelFlux を使った並列処理

並列で実行するためには parallel メソッドを使って Flux から ParallelFlux を取得し、runOn メソッドで Schedulers.parallel() ※ を指定します。

 ※ この場合、Schedulers.parallel() の代わりに
    Schedulers.single() を使うと並列にならない点に注意
    (単一スレッドで実行する事になる)
(C) parallel + runOn
Mono.just("(C)")
    .repeat(3)
    .parallel()
    .runOn(Schedulers.parallel())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(C) parallel + runOn の実行結果
(C)_map, thread: Thread[parallel-1,5,main]
(C)_map, thread: Thread[parallel-3,5,main]
(C)_map, thread: Thread[parallel-2,5,main]
(C)_subscribe, thread: Thread[parallel-3,5,main]
(C)_subscribe, thread: Thread[parallel-1,5,main]
(C)_subscribe, thread: Thread[parallel-2,5,main]

3つのスレッドを使って並列に実行できているようです。

なお、runOnコメントアウトして実行すると (A) と同様の結果(main スレッドで順番に実行)になります。

備考

最後に、今回使用したソースを記載しておきます。

build.gradle
apply plugin: 'application'

mainClassName = 'App'

repositories {
    jcenter()
}

dependencies {
    compile 'io.projectreactor:reactor-core:3.0.3.RELEASE'
}

run {
    standardInput = System.in

    if (project.hasProperty('args')) {
        args project.args
    }
}
src/main/java/App.java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.lang.invoke.MethodHandle;

import static java.lang.invoke.MethodHandles.publicLookup;
import static java.lang.invoke.MethodType.methodType;

public class App {
    public static void main(String... args) throws Throwable {
        MethodHandle mh = publicLookup().findStatic(
            App.class, 
            "sample" + args[0], 
            methodType(void.class)
        );

        mh.invoke();

        System.in.read();
    }

    // (A)
    public static void sampleA() {
        Mono.just("(A)")
            .repeat(3)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (B) publishOn を使った場合
    public static void sampleB() {
        Mono.just("(B)")
            .repeat(3)
            .publishOn(Schedulers.parallel())
            //.publishOn(Schedulers.single())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (C) parallel + runOn
    public static void sampleC() {
        Mono.just("(C)")
            .repeat(3)
            .parallel()
            .runOn(Schedulers.parallel())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    private static void printThread(String msg) {
        System.out.println(msg + ", thread: " + Thread.currentThread());
    }
}
実行例
> gradle -q run -Pargs=B

(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]

Spring Web Reactive を試す

Spring 5 で導入される Spring Web Reactive を試してみました。

本来なら Spring Boot で実行する事になると思いますが、今回は Spring Boot を使わずに Undertow で直接実行してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161115/

サンプル作成

ビルド定義

現時点では Spring Web Reactive の正式版はリリースされていないようなのでスナップショット版を使います。

Undertow を実行するために undertow-coreJSON で結果を返す処理を試すために jackson-databind を依存関係へ設定しています。

build.gradle
apply plugin: 'application'

mainClassName = 'sample.App'

repositories {
    jcenter()

    maven {
        url 'http://repo.spring.io/snapshot/'
    }
}

dependencies {
    compile 'org.projectlombok:lombok:1.16.10'

    // Spring Web Reactive
    compile 'org.springframework:spring-web-reactive:5.0.0.BUILD-SNAPSHOT'
    // Undertow
    compile 'io.undertow:undertow-core:2.0.0.Alpha1'

    // JSON 用
    runtime 'com.fasterxml.jackson.core:jackson-databind:2.8.4'
}

設定クラス

@EnableWebReactive を付与すれば Spring Web Reactive を有効にできるようです。

src/main/java/sample/config/AppConfig.java
package sample.config;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.reactive.config.EnableWebReactive;

@EnableWebReactive
@ComponentScan("sample.controller")
public class AppConfig {
}

コントローラークラス

Spring Web Reactive は Spring Web (MVC) のプログラミングスタイルを踏襲しているようです。

Spring Web (MVC) と同じアノテーションでコントローラークラスを定義し、メソッドの戻り値に Reactor の Mono / Flux を使えばよさそうです。

クラス 概要
Mono 単一の結果を返す場合に使用
Flux 複数の結果を返す場合に使用
src/main/java/sample/controller/SampleController.java
package sample.controller;

import lombok.Value;

import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class SampleController {

    @RequestMapping("/")
    public Mono<String> sample() {
        return Mono.just("sample");
    }

    @RequestMapping(value = "/data/{name}", produces = "application/json")
    public Flux<Data> dataList(@PathVariable("name") String name) {
        return Flux.fromArray(new Data[] {
            new Data(name + "-1", 1),
            new Data(name + "-2", 2),
            new Data(name + "-3", 3)
        });
    }

    @Value
    class Data {
        private String name;
        private int value;
    }
}

実行クラス

まずは、Spring Web Reactive を有効化した ApplicationContext を使って HttpHandler を作成します。 (DispatcherHandlerWebHttpHandlerBuilder を使用)

あとは、対象の Web サーバー ※ に合わせて実行します。

 ※ 今のところ Servlet 3.1 対応コンテナ、Netty、Undertow を
    サポートしているようです

Undertow の場合、アダプタークラス UndertowHttpHandlerAdapter で HttpHandler をラッピングして実行するだけです。

src/main/java/sample/App.java
package sample;

import io.undertow.Undertow;

import lombok.val;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.http.server.reactive.UndertowHttpHandlerAdapter;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;

import sample.config.AppConfig;

public class App {
    public static void main(String... args) {

        val ctx = new AnnotationConfigApplicationContext();
        // @EnableWebReactive を付与したクラスを登録
        ctx.register(AppConfig.class);
        ctx.refresh();

        val handler = new DispatcherHandler();
        handler.setApplicationContext(ctx);

        // HttpHandler の作成
        val httpHandler = WebHttpHandlerBuilder.webHandler(handler).build();

        val server = Undertow.builder()
                .addHttpListener(8080, "localhost")
                // Undertow 用アダプターでラッピングして設定
                .setHandler(new UndertowHttpHandlerAdapter(httpHandler))
                .build();

        server.start();
    }
}

実行

Gradle で実行
> gradle -q run

・・・
情報: Mapped "{[/data/{name}],produces=[application/json]}" onto public reactor.core.publisher.Flux<sample.controller.SampleController$Data> sample.controller.SampleController.dataList(java.lang.String)
11 14, 2016 9:32:23 午後 org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping register
情報: Mapped "{[/]}" onto public reactor.core.publisher.Mono<java.lang.String> sample.controller.SampleController.sample()
11 14, 2016 9:32:23 午後 org.xnio.Xnio <clinit>
INFO: XNIO version 3.3.6.Final
11 14, 2016 9:32:23 午後 org.xnio.nio.NioXnio <clinit>
INFO: XNIO NIO Implementation Version 3.3.6.Final
動作確認1
$ curl -s http://localhost:8080/
sample
動作確認2
$ curl -s http://localhost:8080/data/a
[{"name":"a-1","value":1},{"name":"a-2","value":2},{"name":"a-3","value":3}]

Java で行列の演算 - nd4j, commons-math, la4j, ujmp, jblas, colt

Java で以下のような行列の演算を複数のライブラリで試しました。

  • (a) 和
  • (b) 積
  • (c) 転置

とりあえず今回は、更新日が比較的新しめのライブラリを試してみました。

また、あくまでも個人的な印象ですが、手軽に使いたいなら la4j か Commons Math、性能重視なら ND4J か jblas、可視化や DB との連携を考慮するなら UJMP を使えば良さそうです。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161031/

ND4J

Deeplearning4J で使用しているライブラリ。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'org.nd4j:nd4j-native-platform:0.6.0'
    runtime 'org.slf4j:slf4j-nop:1.7.21'
}
src/main/java/SampleApp.java
import org.nd4j.linalg.api.ndarray.INDArray;
import org.nd4j.linalg.factory.Nd4j;

public class SampleApp {
    public static void main(String... args) {
        INDArray x = Nd4j.create(new double[][] {
            {1, 2},
            {3, 4}
        });

        INDArray y = Nd4j.create(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.add(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.mmul(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

[[ 6.00,  8.00],
 [10.00, 12.00]]
-----
[[19.00, 22.00],
 [43.00, 50.00]]
-----
[[1.00, 3.00],
 [2.00, 4.00]]

Commons Math

Apache Commons のライブラリ。

build.gradle
・・・
dependencies {
    compile 'org.apache.commons:commons-math3:3.6.1'
}
src/main/java/SampleApp.java
import org.apache.commons.math3.linear.MatrixUtils;
import org.apache.commons.math3.linear.RealMatrix;

public class SampleApp {
    public static void main(String... args) {
        RealMatrix x = MatrixUtils.createRealMatrix(new double[][] {
            {1, 2},
            {3, 4}
        });

        RealMatrix y = MatrixUtils.createRealMatrix(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.add(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.multiply(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

Array2DRowRealMatrix{{6.0,8.0},{10.0,12.0}}
-----
Array2DRowRealMatrix{{19.0,22.0},{43.0,50.0}}
-----
Array2DRowRealMatrix{{1.0,3.0},{2.0,4.0}}

la4j

Java のみで実装された軽量なライブラリ。

build.gradle
・・・
dependencies {
    compile 'org.la4j:la4j:0.6.0'
}
src/main/java/SampleApp.java
import org.la4j.Matrix;

public class SampleApp {
    public static void main(String... args) {
        Matrix x = Matrix.from2DArray(new double[][] {
            {1, 2},
            {3, 4}
        });

        Matrix y = Matrix.from2DArray(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.add(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.multiply(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

 6.000  8.000
10.000 12.000

-----
19.000 22.000
43.000 50.000

-----
1.000 3.000
2.000 4.000

UJMP

データ可視化や JDBC との連携など、機能豊富そうなライブラリ。 Colt や jblas 等ともプラグインモジュールで連携できる模様。

build.gradle
・・・
dependencies {
    compile 'org.ujmp:ujmp-core:0.3.0'
}
src/main/java/SampleApp.java
import org.ujmp.core.DenseMatrix;
import org.ujmp.core.Matrix;

public class SampleApp {
    public static void main(String... args) {
        Matrix x = DenseMatrix.Factory.linkToArray(
            new double[] {1, 2},
            new double[] {3, 4}
        );

        Matrix y = DenseMatrix.Factory.linkToArray(
            new double[] {5, 6},
            new double[] {7, 8}
        );
        // (a)
        System.out.println( x.plus(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.mtimes(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

    6.0000     8.0000
   10.0000    12.0000

-----
   19.0000    22.0000
   43.0000    50.0000

-----
    1.0000     3.0000
    2.0000     4.0000

jblas

BLAS/LAPACK をベースとしたライブラリ。 ネイティブライブラリを使用する。

build.gradle
・・・
dependencies {
    compile 'org.jblas:jblas:1.2.4'
}
src/main/java/SampleApp.java
import org.jblas.DoubleMatrix;

public class SampleApp {
    public static void main(String... args) {
        DoubleMatrix x = new DoubleMatrix(new double[][] {
            {1, 2},
            {3, 4}
        });

        DoubleMatrix y = new DoubleMatrix(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.add(y) );

        System.out.println("-----");
        // (b)
        System.out.println( x.mmul(y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.transpose() );
    }
}
実行結果
> gradle -q run

[6.000000, 8.000000; 10.000000, 12.000000]
-----
[19.000000, 22.000000; 43.000000, 50.000000]
-----
[1.000000, 3.000000; 2.000000, 4.000000]
-- org.jblas INFO Starting temp DLL cleanup task.
-- org.jblas INFO Deleted 4 unused temp DLL libraries from ・・・

Colt Blazegraph 版

Colt は長らく更新されていないようなので、今回は Blazegraph による fork 版? を使いました。

build.gradle
・・・
dependencies {
    compile 'com.blazegraph:colt:2.1.4'
}
src/main/java/SampleApp.java
import cern.colt.matrix.DoubleFactory2D;
import cern.colt.matrix.DoubleMatrix2D;
import cern.colt.matrix.linalg.Algebra;
import cern.jet.math.Functions;

public class SampleApp {
    public static void main(String... args) {
        DoubleMatrix2D x = DoubleFactory2D.dense.make(new double[][] {
            {1, 2},
            {3, 4}
        });

        DoubleMatrix2D y = DoubleFactory2D.dense.make(new double[][] {
            {5, 6},
            {7, 8}
        });
        // (a)
        System.out.println( x.copy().assign(y, Functions.plus) );

        System.out.println("-----");

        Algebra algebra = new Algebra();
        // (b)
        System.out.println( algebra.mult(x, y) );

        System.out.println("-----");
        // (c)
        System.out.println( x.viewDice() );
    }
}

assign を使うと自身の値を更新するため copy を使っています。

実行結果
> gradle -q run

2 x 2 matrix
 6  8
10 12
-----
2 x 2 matrix
19 22
43 50
-----
2 x 2 matrix
1 3
2 4

Lucene API で Solr と Elasticsearch のインデックスを確認

Groovy で LuceneAPI を使用して Solr や Elasticsearch のインデックスの内容を確認してみました。(Lucene 6.2.1 の API を使用)

ソースは http://github.com/fits/try_samples/tree/master/blog/20161024/

(a) ドキュメントの内容を出力

まずは、ドキュメントに属するフィールドの内容を出力する処理です。

DirectoryReader から Document を取得し、フィールド IndexableField の内容を出力しています。

dump_docs.groovy
@Grab('org.apache.lucene:lucene-core:6.2.1')
import org.apache.lucene.index.DirectoryReader
import org.apache.lucene.store.FSDirectory
import java.nio.file.Paths

def dir = FSDirectory.open(Paths.get(args[0]))

DirectoryReader.open(dir).withCloseable { reader ->
    println "numDocs = ${reader.numDocs()}"

    (0..<reader.numDocs()).each {
        // ドキュメントの取得
        def doc = reader.document(it)

        println "---------- doc: ${it} ----------"

        // ドキュメント内のフィールドを出力
        doc.fields.each { f -> 
            def value = f.binaryValue()? f.binaryValue().utf8ToString(): f.stringValue()

            println "<field> name=${f.name}, value=${value}, class=${f.class}"
        }
    }
}

(b) Term の内容を出力

インデックス内のフィールド情報と Term を出力する処理です。 Term は基本的な検索の単位となっており、Term の内容を見れば単語の分割状況を確認できます。

これらの情報を取得するには LeafReader を使います。

Term の内容 BytesRefTermsEnum から取得できます。

LeafReader から terms メソッドで該当フィールドの Terms を取得し、iterator メソッドで TermsEnum を取得します。

dump_terms.groovy
@Grab('org.apache.lucene:lucene-core:6.2.1')
import org.apache.lucene.index.DirectoryReader
import org.apache.lucene.store.FSDirectory
import java.nio.file.Paths

def dir = FSDirectory.open(Paths.get(args[0]))

DirectoryReader.open(dir).withCloseable { reader ->

    reader.leaves().each { ctx ->
        // LeafReader の取得
        def leafReader = ctx.reader()

        println "---------- leaf: ${leafReader} ----------"

        // フィールド情報の出力
        leafReader.getFieldInfos().each { fi ->
            println "<fieldInfo> name: ${fi.name}, valueType: ${fi.docValuesType}, indexOptions: ${fi.indexOptions}"
        }

        leafReader.fields().each { name ->
            // 指定のフィールド名に対する TermsEnum を取得
            def termsEnum = leafReader.terms(name).iterator()

            println ''
            println "===== <term> name=${name} ====="

            try {
                while(termsEnum.next() != null) {
                    // Term の内容を出力
                    println "term=${termsEnum.term().utf8ToString()}, freq=${termsEnum.docFreq()}"
                }
            } catch(e) {
            }
        }
    }
}

動作確認

Lucene のバージョンが以下のようになっていたので、今回は Solr 6.2.1 と Elasticsearch 5.0.0 RC1 のインデックス内容を確認してみます。

プロダクト 使用している Lucene のバージョン
Solr 6.2.1 Lucene 6.2.1
Elasticsearch 2.4.1 Lucene 5.5.2
Elasticsearch 5.0.0 RC1 Lucene 6.2.0

(1) Solr 6.2.1

Solr 6.2.1 のインデックスから確認してみます。

準備

インデックスを作成してドキュメントを登録しておきます。

1. Solr 起動とインデックスの作成

> solr start

・・・

> solr create -c sample

{
  "responseHeader":{
    "status":0,
    "QTime":9197},
  "core":"sample"}

2. スキーマの登録

schema.json
{
    "add-field": {
        "name": "title",
        "type": "string"
    },
    "add-field": {
        "name": "num",
        "type": "int"
    },
    "add-field": {
        "name": "rdate",
        "type": "date"
    }
}
スキーマ登録
$ curl -s http://localhost:8983/solr/sample/schema --data-binary @schema.json

{
  "responseHeader":{
    "status":0,
    "QTime":554}}

3. ドキュメントの登録

data1.json
{
    "title": "item1",
    "num": 11,
    "rdate": "2016-10-20T13:45:00Z"
}
ドキュメント登録
$ curl -s http://localhost:8983/solr/sample/update/json/docs --data-binary @data1.json

{"responseHeader":{"status":0,"QTime":199}}

ちなみに、コミットしなくてもインデックスファイルには反映されるようです。

インデックスの内容確認

それでは、インデックスの内容を確認します。

該当するインデックスのディレクトリ(例. C:\solr-6.2.1\server\solr\sample\data\index)を引数に指定して実行します。

(a) ドキュメントの内容
> groovy dump_docs.groovy C:\solr-6.2.1\server\solr\sample\data\index

numDocs = 1
---------- doc: 0 ----------
<field> name=title, value=item1, class=class org.apache.lucene.document.StoredField
<field> name=num, value=11, class=class org.apache.lucene.document.StoredField
<field> name=rdate, value=1476971100000, class=class org.apache.lucene.document.StoredField
<field> name=id, value=2b1080dd-0cd3-43c6-a3ff-ab618ad00113, class=class org.apache.lucene.document.StoredField
(b) Term の内容
> groovy dump_terms.groovy C:\solr-6.2.1\server\solr\sample\data\index

---------- leaf: _0(6.2.1):C1 ----------
<fieldInfo> name: title, valueType: SORTED, indexOptions: DOCS
<fieldInfo> name: _text_, valueType: NONE, indexOptions: DOCS_AND_FREQS_AND_POSITIONS
<fieldInfo> name: num, valueType: NUMERIC, indexOptions: DOCS
<fieldInfo> name: rdate, valueType: NUMERIC, indexOptions: DOCS
<fieldInfo> name: id, valueType: SORTED, indexOptions: DOCS
<fieldInfo> name: _version_, valueType: NUMERIC, indexOptions: DOCS

===== <term> name=_text_ =====
term=00, freq=1
term=0cd3, freq=1
term=11, freq=1
term=13, freq=1
term=1548710288432300032, freq=1
term=20, freq=1
term=2016, freq=1
term=2b1080dd, freq=1
term=43c6, freq=1
term=45, freq=1
term=a3ff, freq=1
term=ab618ad00113, freq=1
term=item1, freq=1
term=oct, freq=1
term=thu, freq=1
term=utc, freq=1

===== <term> name=_version_ =====
term= ?yTP   , freq=1

===== <term> name=id =====
term=2b1080dd-0cd3-43c6-a3ff-ab618ad00113, freq=1

===== <term> name=num =====
term=   , freq=1

===== <term> name=rdate =====
term=    *~Yn`, freq=1

===== <term> name=title =====
term=item1, freq=1

version・num・rdate の値が文字化けしているように見えますが、これは org.apache.lucene.util.LegacyNumericUtils.intToPrefixCoded() メソッド等で処理されてバイナリデータとなっているためです。

実際の値を復元するには LegacyNumericUtils.prefixCodedToInt() 等を適用する必要があるようです。

なお、LegacyNumericUtils クラスは Lucene 6.2.1 API で deprecated となっていますが、Solr は未だ使っているようです。

(2) Elasticsearch 5.0.0 RC1

次は Elasticsearch です。

準備

インデックスを作成してドキュメントを登録しておきます。

1. Elasticsearch 起動

> elasticsearch

・・・

2. インデックスの作成とスキーマ登録

schema.json
{
    "mappings": {
        "data": {
            "properties": {
                "title": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "num": { "type": "integer" },
                "rdate": { "type": "date" }
            }
        }
    }
}
インデックス作成とスキーマ登録
$ curl -s -XPUT http://localhost:9200/sample --data-binary @schema.json

{"acknowledged":true,"shards_acknowledged":true}

3. ドキュメントの登録

data1.json
{
    "title": "item1",
    "num": 11,
    "rdate": "2016-10-20T13:45:00Z"
}
ドキュメント登録
$ curl -s http://localhost:9200/sample/data --data-binary @data1.json

{"_index":"sample","_type":"data","_id":"AVfwTjEQFnFWQdd5V9p5","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"created":true}

なお、すぐにインデックスファイルへ反映されない場合は flush を実施します。

flush 例
$ curl -s http://localhost:9200/sample/_flush

インデックスの内容確認

インデックスの内容を確認します。

Elasticsearch の場合はデフォルトで複数の shard に分かれているため、ドキュメントを登録した shard を確認しておきます。

shard の確認
$ curl -s http://localhost:9200/_cat/shards/sample?v

index  shard prirep state      docs store ip        node
sample 1     p      STARTED       0  130b 127.0.0.1 iUp_FE_
・・・
sample 4     p      STARTED       1 3.8kb 127.0.0.1 iUp_FE_
sample 4     r      UNASSIGNED
sample 0     p      STARTED       0  130b 127.0.0.1 iUp_FE_
sample 0     r      UNASSIGNED

shard 4 にドキュメントが登録されています。

Elasticsearch のインデックスディレクトリは data/nodes/<ノード番号>/indices/<インデックスのuuid>/<shard番号>/index となっているようで、今回は data\nodes\0\indices\QBXMjcCFSWy26Gow1Y9ItQ\4\index でした。(インデックスの uuid は QBXMjcCFSWy26Gow1Y9ItQ)

(a) ドキュメントの内容
> groovy dump_docs.groovy C:\elasticsearch-5.0.0-rc1\data\nodes\0\indices\QBXMjcCFSWy26Gow1Y9ItQ\4\index

numDocs = 1
---------- doc: 0 ----------
<field> name=_source, value={
        "title": "item1",
        "num": 11,
        "rdate": "2016-10-20T13:45:00Z"
}
, class=class org.apache.lucene.document.StoredField
<field> name=_uid, value=data#AVfwTjEQFnFWQdd5V9p5, class=class org.apache.lucene.document.StoredField
(b) Term の内容
> groovy dump_terms.groovy C:\elasticsearch-5.0.0-rc1\data\nodes\0\indices\QBXMjcCFSWy26Gow1Y9ItQ\4\index

---------- leaf: _0(6.2.0):c1 ----------
<fieldInfo> name: _source, valueType: NONE, indexOptions: NONE
<fieldInfo> name: _type, valueType: SORTED_SET, indexOptions: DOCS
<fieldInfo> name: _uid, valueType: NONE, indexOptions: DOCS
<fieldInfo> name: _version, valueType: NUMERIC, indexOptions: NONE
<fieldInfo> name: title, valueType: SORTED_SET, indexOptions: DOCS
<fieldInfo> name: num, valueType: SORTED_NUMERIC, indexOptions: NONE
<fieldInfo> name: rdate, valueType: SORTED_NUMERIC, indexOptions: NONE
<fieldInfo> name: _all, valueType: NONE, indexOptions: DOCS_AND_FREQS_AND_POSITIONS
<fieldInfo> name: _field_names, valueType: NONE, indexOptions: DOCS

===== <term> name=_all =====
term=00z, freq=1
term=10, freq=1
term=11, freq=1
term=2016, freq=1
term=20t13, freq=1
term=45, freq=1
term=item1, freq=1

===== <term> name=_field_names =====
term=_all, freq=1
term=_source, freq=1
term=_type, freq=1
term=_uid, freq=1
term=_version, freq=1
term=num, freq=1
term=rdate, freq=1
term=title, freq=1

===== <term> name=_type =====
term=data, freq=1

===== <term> name=_uid =====
term=data#AVfwTjEQFnFWQdd5V9p5, freq=1

===== <term> name=title =====
term=item1, freq=1

Solr とは、かなり違った結果になっています。

Deeplearning4J で MNIST を分類

Deeplearning4J で iris を分類」 に続いて、畳み込みニューラルネットを使った MNIST の分類を試します。

Deeplearning4J のバージョンが上がって、@Grab を使った Groovy 上での実行が上手くいかなかったので、今回は Kotlin で実装し Gradle で実行します。

  • Gradle 3.1
  • Kotlin 1.0.4

ソースは http://github.com/fits/try_samples/tree/master/blog/20161010/

はじめに

今回は、Gradle のマルチプロジェクトを使って以下の処理をサブプロジェクトとしました。

Gradle のビルド定義は以下の通り。

build.gradle
def slf4jVer = '1.7.21'

buildscript {
    repositories {
        jcenter()
    }

    dependencies {
        classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:1.0.4'
    }
}
// サブプロジェクトの共通設定
subprojects {
    apply plugin: 'kotlin'
    apply plugin: 'application'

    repositories {
        jcenter()
    }

    dependencies {
        compile 'org.jetbrains.kotlin:kotlin-stdlib:1.0.4'

        compile('org.deeplearning4j:deeplearning4j-core:0.6.0') {
            // エラーの回避策 (Could not find javacpp-presets-${os.name}-${os.arch}.jar)
            exclude group: 'org.bytedeco', module: 'javacpp-presets'
        }

        runtime 'org.nd4j:nd4j-native-platform:0.6.0'
    }

    run {
        // 実行時引数
        if (project.hasProperty('args')) {
            args project.args.split(' ')
        }
    }
}
// (1) 畳み込みニューラルネットのモデル作成(JSON で出力)
project(':conv_model') {
    mainClassName = 'ConvModelKt'

    dependencies {
        runtime "org.slf4j:slf4j-nop:${slf4jVer}"
    }
}
// (2) 学習処理
project(':learn_mnist') {
    mainClassName = 'LearnMnistKt'

    dependencies {
        runtime "org.slf4j:slf4j-simple:${slf4jVer}"
    }
}
// (3) 評価処理
project(':eval_mnist') {
    mainClassName = 'EvalMnistKt'

    dependencies {
        runtime "org.slf4j:slf4j-nop:${slf4jVer}"
    }
}
settings.gradle
include 'conv_model', 'learn_mnist', 'eval_mnist'

ファイル構成は以下の通りです。

ファイル構成
  • build.gradle
  • settings.gradle
  • conv_model/src/main/kotlin/convModel.kt
  • learn_mnist/src/main/kotlin/learnMnist.kt
  • eval_mnist/src/main/kotlin/evalMnist.kt

(1) 畳み込みニューラルネットのモデル作成(JSON で出力)

畳み込みニューラルネットの構成情報を JSON 化して標準出力します。

MnistDataSetIterator の MNIST データセットに対して畳み込みニューラルネットを行うには InputType.convolutionalFlat(28, 28, 1)setInputType します。

conv_model/src/main/kotlin/convModel.kt
import org.deeplearning4j.nn.conf.NeuralNetConfiguration
import org.deeplearning4j.nn.conf.inputs.InputType
import org.deeplearning4j.nn.conf.layers.ConvolutionLayer
import org.deeplearning4j.nn.conf.layers.OutputLayer
import org.deeplearning4j.nn.conf.layers.SubsamplingLayer
import org.nd4j.linalg.lossfunctions.LossFunctions

fun main(args: Array<String>) {
    val builder = NeuralNetConfiguration.Builder()
        .iterations(3) // 3回繰り返し
        .list(
            // 畳み込み層
            ConvolutionLayer.Builder(5, 5)
                .nIn(1)
                .nOut(8)
                .padding(2, 2)
                .activation("relu")
                .build()
            ,
            // プーリング層(最大プーリング)
            SubsamplingLayer.Builder(
                SubsamplingLayer.PoolingType.MAX, intArrayOf(2, 2))
                .stride(2, 2)
                .build()
            ,
            // 畳み込み層
            ConvolutionLayer.Builder(5, 5)
                .nOut(16)
                .padding(1, 1)
                .activation("relu")
                .build()
            ,
            // プーリング層(最大プーリング)
            SubsamplingLayer.Builder(
                SubsamplingLayer.PoolingType.MAX, intArrayOf(3, 3))
                .stride(3, 3)
                .build()
            ,
            OutputLayer.Builder(LossFunctions.LossFunction.MCXENT)
                .nOut(10)
                .activation("softmax")
                .build()
        )
        // MNIST データセットに対する畳み込みニューラルネット用の入力タイプ設定
        .setInputType(InputType.convolutionalFlat(28, 28, 1))

    // JSON 化して出力
    println(builder.build().toJson())
}

(2) 学習処理

(1) で出力した JSON ファイルを使って学習を実施し、学習後のパラメータ(重み)を JSON ファイルへ出力します。

MNIST データセットには MnistDataSetIterator を使いました ※。

 ※ train 引数(今回使用したコンストラクタの第2引数)が true の場合に学習用、
    false の場合に評価用のデータセットとなります
learn_mnist/src/main/kotlin/learnMnist.kt
import org.deeplearning4j.datasets.iterator.impl.MnistDataSetIterator
import org.deeplearning4j.nn.conf.MultiLayerConfiguration
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork
import org.deeplearning4j.optimize.listeners.ScoreIterationListener

import org.nd4j.linalg.api.ndarray.NdArrayJSONWriter

import java.io.File

fun main(args: Array<String>) {
    // ニューラルネットのモデルファイル(JSON)
    val confJson = File(args[0]).readText()
    // パラメータ(重み)の出力ファイル名
    val destFile = args[1]

    val conf = MultiLayerConfiguration.fromJson(confJson)
    val network = MultiLayerNetwork(conf)

    network.init()
    // スコア(誤差)の出力
    network.setListeners(ScoreIterationListener())

    // MNIST 学習用データ(バッチサイズ 100)
    val trainData = MnistDataSetIterator(100, true, 0)
    // 学習
    network.fit(trainData)
    // 学習後のパラメータ(重み)を JSON ファイルへ出力
    NdArrayJSONWriter.write(network.params(), destFile)
}

(3) 評価処理

(1) と (2) の結果を使って評価します。

eval_mnist/src/main/kotlin/evalMnist.kt
import org.deeplearning4j.datasets.iterator.impl.MnistDataSetIterator
import org.deeplearning4j.nn.conf.MultiLayerConfiguration
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork

import org.nd4j.linalg.api.ndarray.NdArrayJSONReader

import java.io.File

fun main(args: Array<String>) {
    // ニューラルネットのモデルファイル(JSON)
    val confJson = File(args[0]).readText()
    // パラメータ(重み)のロード
    val params = NdArrayJSONReader().read(File(args[1]))

    val conf = MultiLayerConfiguration.fromJson(confJson)
    val network = MultiLayerNetwork(conf, params)
    // MNIST 評価用データ
    val testData = MnistDataSetIterator(1, false, 0)
    // 評価
    val res = network.evaluate(testData)

    println(res.stats())
}

実行

まず、畳み込みニューラルネットの構成を JSON ファイルへ出力します。

(1) 畳み込みニューラルネットのモデル作成
> gradle -q :conv_model:run > conv_model.json

次に、学習を実施します。

org.reflections.Reflections - could not create Vfs.Dir from url という警告ログが出力されましたが、特に支障は無さそうです。

(2) 学習
> gradle -q :learn_mnist:run -Pargs="../conv_model.json ../conv_params.json"

・・・
[main] INFO org.deeplearning4j.optimize.listeners.ScoreIterationListener - Score at iteration 1790 is 0.07888245582580566
[main] INFO org.deeplearning4j.nn.multilayer.MultiLayerNetwork - Finetune phase
[main] INFO org.deeplearning4j.nn.multilayer.MultiLayerNetwork - Finetune phase
[main] INFO org.deeplearning4j.nn.multilayer.MultiLayerNetwork - Finetune phase

最後に、評価を実施します。

(3) 評価
> gradle -q :eval_mnist:run -Pargs="../conv_model.json ../conv_params.json"

Examples labeled as 0 classified by model as 0: 944 times
Examples labeled as 0 classified by model as 2: 7 times
Examples labeled as 0 classified by model as 5: 4 times
・・・
Examples labeled as 9 classified by model as 7: 24 times
Examples labeled as 9 classified by model as 8: 22 times
Examples labeled as 9 classified by model as 9: 937 times


==========================Scores========================================
 Accuracy:  0.9432
 Precision: 0.9463
 Recall:    0.9427
 F1 Score:  0.9445
========================================================================