es4j でイベントソーシング
「Axon Framework でイベントソーシング」 と同様の処理を es4j (Eventsourcing for Java) で実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170107/
はじめに
今回はイベントをインメモリで保持する eventsourcing-inmem
を使うため、以下のような Gradle 用ビルド定義を用意しました。 (lombok は便利なので使います)
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compileOnly "org.projectlombok:lombok:1.16.12" compile 'com.eventsourcing:eventsourcing-inmem:0.4.5' }
es4j によるイベントソーシング
コマンドの作成
コマンドは StandardCommand<S, R>
を継承して作成し、S に状態の型を R に結果の型を指定します。
コマンドを適用することで生じるイベントは events
メソッドの戻り値で返し、コマンド適用結果は result
メソッドの戻り値で返します。
状態は events
メソッドで用意し、result
メソッドの引数として渡ってきます。
まずは、在庫作成のコマンドを実装してみます。
状態の型に InventoryItemCreated
を、結果の型に InventoryItem
を指定しています。
状態を伴う EventStream
を返すために EventStream.ofWithState
を使用します。
コンストラクタの引数名からプロパティを認識するようなので、-parameters
オプションを使ってビルドするか、引数へ @PropertyName
を付与します。(今回は @PropertyName を使用)
なお、状態の型は InventoryItemCreated としなければならないわけでもなく、UUID
を代わりに使っても動作に支障はなさそうでした。
在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import com.eventsourcing.EventStream; import com.eventsourcing.Repository; import com.eventsourcing.StandardCommand; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import lombok.val; import sample.domain.InventoryItem; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; // 在庫作成コマンド @Value @EqualsAndHashCode(callSuper = false) public class CreateInventoryItem extends StandardCommand<InventoryItemCreated, InventoryItem> { private String name; public CreateInventoryItem(@PropertyName("name") String name) { this.name = name; } // イベントの生成 @Override public EventStream<InventoryItemCreated> events() throws Exception { // 在庫作成イベント val created = new InventoryItemCreated(); // 名前変更イベント val renamed = new InventoryItemRenamed(created.uuid(), name); return EventStream.ofWithState(created, created, renamed); } // 結果の生成 @Override public InventoryItem result(InventoryItemCreated state, Repository repository) { return InventoryItem.lookup(repository, state.uuid()).get(); } }
次に、在庫数を加えるためのコマンドです。
ドメインモデルの内部状態を更新するだけなので、状態と結果の型を Void
としました。
状態を伴わないので EventStream.of
で EventStream を返します。
コマンドの適用結果が Void なので result メソッドをオーバーライドする必要もありません。
在庫数追加コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import com.eventsourcing.EventStream; import com.eventsourcing.StandardCommand; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; import sample.events.ItemsCheckedInToInventory; // 在庫数追加コマンド @Value @EqualsAndHashCode(callSuper = false) public class CheckInItemsToInventory extends StandardCommand<Void, Void> { private UUID reference; private int count; public CheckInItemsToInventory( @PropertyName("reference") UUID reference, @PropertyName("count") int count) { this.reference = reference; this.count = count; } @Override public EventStream<Void> events() throws Exception { return EventStream.of(new ItemsCheckedInToInventory(reference, count)); } }
イベントの作成
イベントは StandardEvent
を継承して作成します。
イベントを検索するための条件を SimpleIndex
を使って定義します。
以下では、uuid の値で InventoryItemCreated を取得できるように、SimpleIndex<InventoryItemCreated, UUID> を定義しています。
在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.index.SimpleIndex; import java.util.UUID; // 在庫作成イベント public class InventoryItemCreated extends StandardEvent { public static SimpleIndex<InventoryItemCreated, UUID> ID = SimpleIndex.as(InventoryItemCreated::uuid); }
在庫名の更新イベントの場合、最新のものを取得すれば最終的な名称が決定するので、timestamp で比較する SimpleIndex<InventoryItemRenamed, HybridTimestamp>
を定義しています。
なお、名前変更のためのイベントは eventsourcing-cep モジュールに予め用意されていますが(NameChanged
)、今回は自前で実装しています。
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.hlc.HybridTimestamp; import com.eventsourcing.index.Index; import com.eventsourcing.index.SimpleIndex; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; import static com.eventsourcing.index.IndexEngine.IndexFeature.*; // 在庫名の変更イベント @Value @EqualsAndHashCode(callSuper = false) public class InventoryItemRenamed extends StandardEvent { private UUID reference; private String newName; public InventoryItemRenamed(@PropertyName("reference") UUID reference, @PropertyName("newName") String newName) { this.reference = reference; this.newName = newName; } public static SimpleIndex<InventoryItemRenamed, UUID> ID = SimpleIndex.as(InventoryItemRenamed::uuid); public static SimpleIndex<InventoryItemRenamed, UUID> REFERENCE_ID = SimpleIndex.as(InventoryItemRenamed::getReference); @Index({EQ, LT, GT}) public static SimpleIndex<InventoryItemRenamed, HybridTimestamp> TIMESTAMP = SimpleIndex.as(InventoryItemRenamed::timestamp); }
在庫数の追加イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import com.eventsourcing.StandardEvent; import com.eventsourcing.index.SimpleIndex; import com.eventsourcing.layout.PropertyName; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.UUID; // 在庫数の追加イベント @Value @EqualsAndHashCode(callSuper = false) public class ItemsCheckedInToInventory extends StandardEvent { private UUID reference; private int count; public ItemsCheckedInToInventory( @PropertyName("reference") UUID reference, @PropertyName("count") int count) { this.reference = reference; this.count = count; } public static SimpleIndex<ItemsCheckedInToInventory, UUID> ID = SimpleIndex.as(ItemsCheckedInToInventory::uuid); public static SimpleIndex<ItemsCheckedInToInventory, UUID> REFERENCE_ID = SimpleIndex.as(ItemsCheckedInToInventory::getReference); }
ドメインプロトコルの作成
ドメインプロトコルという機能を使えば、共通的なプロパティ(例えば name など)をドメインモデルから独立して定義できます。(ドメインモデルへ implements して使う)
Protocol
を extends したインターフェースとして定義し、プロパティの値を(イベントの内容から)確定する処理をデフォルトメソッドとして定義します。
在庫名は一番最後の InventoryItemRenamed の newName で確定するため、latestAssociatedEntity
を使って最後の InventoryItemRenamed イベントを取得します。
なお、名前に関するドメインプロトコルも eventsourcing-cep モジュールに予め用意されています(NameProtocol
)。
在庫名 src/main/java/sample/protocols/InventoryItemNameProtocol.java
package sample.protocols; import com.eventsourcing.Protocol; import com.eventsourcing.queries.ModelQueries; import sample.events.InventoryItemRenamed; // 在庫名 public interface InventoryItemNameProtocol extends Protocol, ModelQueries { default String name() { // reference が id の値に等しい最後の InventoryItemRenamed を取得し // newName の値を返す return latestAssociatedEntity( InventoryItemRenamed.class, InventoryItemRenamed.REFERENCE_ID, InventoryItemRenamed.TIMESTAMP ).map(InventoryItemRenamed::getNewName).orElse(""); } }
次に、在庫数の場合は、ItemsCheckedInToInventory を集計するように Repository
の query
メソッドで該当する全ての ItemsCheckedInToInventory を取得し、count の合計値を算出しています。
在庫数 src/main/java/sample/protocols/InventoryItemCountProtocol.java
package sample.protocols; import com.eventsourcing.EntityHandle; import com.eventsourcing.Protocol; import lombok.val; import java.util.stream.StreamSupport; import sample.events.ItemsCheckedInToInventory; import static com.eventsourcing.index.EntityQueryFactory.equal; // 在庫数 public interface InventoryItemCountProtocol extends Protocol { default int count() { // reference が id の値に等しい ItemsCheckedInToInventory を全て取得 val res = getRepository().query( ItemsCheckedInToInventory.class, equal(ItemsCheckedInToInventory.REFERENCE_ID, getId()) ); return StreamSupport.stream(res.spliterator(), false) .map(EntityHandle::get) .mapToInt(ItemsCheckedInToInventory::getCount) .sum(); } }
ドメインモデルの作成
ドメインモデルは Model
と必要なドメインプロトコルを implements して作成します。
同一インスタンスの判定に id の値だけを使うように equals と hashCode をオーバーライドします。(今回は @EqualsAndHashCode(of = "id")
で実施)
CreateInventoryItem コマンドの result メソッドで使った lookup メソッドの実装も行います。
src/main/java/sample/domain/InventoryItem.java
package sample.domain; import com.eventsourcing.Model; import com.eventsourcing.Repository; import com.eventsourcing.queries.ModelQueries; import lombok.EqualsAndHashCode; import lombok.Value; import java.util.Optional; import java.util.UUID; import sample.events.InventoryItemCreated; import sample.protocols.InventoryItemCountProtocol; import sample.protocols.InventoryItemNameProtocol; @Value @EqualsAndHashCode(of = "id") public class InventoryItem implements Model, InventoryItemNameProtocol, InventoryItemCountProtocol { private final Repository repository; private final UUID id; protected InventoryItem(Repository repository, UUID id) { this.repository = repository; this.id = id; } public static Optional<InventoryItem> lookup(Repository repository, UUID id) { // InventoryItemCreated.ID インデックス(SimpleIndex)を使って // id に合致する InventoryItemCreated を取得 Optional<InventoryItemCreated> res = ModelQueries.lookup( repository, InventoryItemCreated.class, InventoryItemCreated.ID, id ); return res.map(ev -> new InventoryItem(repository, id)); } }
実行クラスの作成
es4j では Repository
を使ってイベントソーシングの処理を行います。
Repository は StandardRepository
へ以下のような設定を行う事で構築できます。
- (1) Journal と IndexEngine の設定
- (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider)
StandardRepository のデフォルト設定では localhost の NTP サーバーを使うようになっているようですが (NTP サーバーが無いとエラーになる)、今回はデフォルト設定の代わりに PhysicalTimeProvider 実装クラスを用意しました。
(2) で PackageXXXSetProvider を使うと指定のパッケージへ属するコマンドやイベントを一括で設定できます。
コマンドを適用する際は publish
メソッドを使います。(publish の戻り値は CompletableFuture
)
src/main/java/SampleApp.java
import com.eventsourcing.*; import com.eventsourcing.hlc.PhysicalTimeProvider; import com.eventsourcing.index.MemoryIndexEngine; import com.eventsourcing.inmem.MemoryJournal; import com.eventsourcing.repository.StandardRepository; import com.google.common.util.concurrent.AbstractService; import java.util.concurrent.*; import lombok.val; import sample.commands.CheckInItemsToInventory; import sample.commands.CreateInventoryItem; import sample.domain.InventoryItem; import sample.events.InventoryItemCreated; public class SampleApp { public static void main(String... args) { // (1) Journal と IndexEngine の設定 val repository = StandardRepository.builder() .journal(new MemoryJournal()) .indexEngine(new MemoryIndexEngine()) .physicalTimeProvider(new SampleTimeProvider()) .build(); // (2) コマンドとイベントの設定(CommandSetProvider と EventSetProvider) repository.addCommandSetProvider( new PackageCommandSetProvider(new Package[] {CreateInventoryItem.class.getPackage()})); repository.addEventSetProvider( new PackageEventSetProvider(new Package[] {InventoryItemCreated.class.getPackage()})); // 開始 repository.startAsync().awaitRunning(); try { // CreateInventoryItem コマンド適用 val d = repository.publish(new CreateInventoryItem("sample1")).get(); dumpInventoryItem(d); // CheckInItemsToInventory コマンド適用 repository.publish(new CheckInItemsToInventory(d.getId(), 5)).get(); dumpInventoryItem(d); // CheckInItemsToInventory コマンド適用 repository.publish(new CheckInItemsToInventory(d.getId(), 3)).get(); dumpInventoryItem(d); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { // 停止 stopRepository(repository); } } private static InventoryItem dumpInventoryItem(InventoryItem item) { System.out.println("----- InventoryItem -----"); System.out.println("id: " + item.getId()); System.out.println("name: " + item.name()); System.out.println("count: " + item.count()); return item; } private static void stopRepository(Repository repository) { System.out.println("stop..."); try { repository.stopAsync().awaitTerminated(10, TimeUnit.SECONDS); } catch (TimeoutException ex) { ex.printStackTrace(); } } // PhysicalTimeProvider の実装 static class SampleTimeProvider extends AbstractService implements PhysicalTimeProvider { @Override public long getPhysicalTime() { return System.currentTimeMillis(); } @Override protected void doStart() { System.out.println("timeprovider start..."); notifyStarted(); } @Override protected void doStop() { System.out.println("timeprovider stop..."); notifyStopped(); } } }
publish の呼び出し箇所は CompletableFuture で処理を繋げれば以下のように実装できます。
src/main/java/SampleApp.java(CompletableFuture 活用版)
public class SampleApp { public static void main(String... args) { ・・・ repository.publish(new CreateInventoryItem("sample1")) .thenApply(SampleApp::dumpInventoryItem) .thenCompose(d -> repository.publish(new CheckInItemsToInventory(d.getId(), 5)) .thenApply(v -> d) ) .thenApply(SampleApp::dumpInventoryItem) .thenCompose(d -> repository.publish(new CheckInItemsToInventory(d.getId(), 3)) .thenApply(v -> d) ) .thenApply(SampleApp::dumpInventoryItem) .whenComplete((d, e) -> stopRepository(repository)); } ・・・ }
実行結果
> gradle -q run [main] INFO org.reflections.Reflections - Reflections took 64 ms to scan 1 urls, producing 2 keys and 4 values [main] INFO org.reflections.Reflections - Reflections took 17 ms to scan 1 urls, producing 2 keys and 6 values timeprovider start... ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 0 ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 5 ----- InventoryItem ----- id: d7636355-850d-4a15-9e85-ce0f1de514e7 name: sample1 count: 8 stop... timeprovider stop...
備考
実は、今回のサンプルを実行すると処理が終わってもプロセスは終了しません。
スレッドダンプを出力してみたところ、StandardEntity に原因がありそうです。
スレッドダンプ出力例
> jcmd 8904 Thread.print ・・・ "Thread-1" #11 prio=5 os_prio=0 ・・・ waiting on condition [・・・] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <・・・> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396) at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649) at com.eventsourcing.StandardEntity.lambda$static$0(StandardEntity.java:27) at com.eventsourcing.StandardEntity$$Lambda$14/540159270.run(Unknown Source) at java.lang.Thread.run(Thread.java:745) ・・・
StandardEntity のソース(27行目付近)を見てみると、以下のようにスレッドを開始して放置していました。
com.eventsourcing.StandardEntity のソース
・・・ public abstract class StandardEntity<E extends Entity<E>> implements Entity<E> { private static LinkedBlockingDeque<UUID> uuids = new LinkedBlockingDeque<>(10_000); static { new Thread(() -> { while (true) { try { uuids.put(UUID.randomUUID()); // 27行目 } catch (InterruptedException e) { } } }).start(); } ・・・
RxJava2 で並列処理
前回 と同じような並列処理を RxJava 2.0 で試してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20161226/
はじめに
まずは、map の処理と subscribe の処理を順番に 3回繰り返す処理です。
前回の Reactor との違いは Mono
が Single
に変わっただけです。
(A) サンプルコード
Single.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main]
observeOn の場合
RxJava 2.0 では Reactor の publishOn
の代わりに observeOn
が使えます。
main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。
(B) observeOn を使った場合
Single.just("(B)") .repeat(3) .observeOn(Schedulers.computation()) // 実質的には以下でも同じ //.observeOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(B) observeOn を使った場合の実行結果
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main] (B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (B)_map, thread: Thread[RxComputationThreadPool-1,5,main] (B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (B)_map, thread: Thread[RxComputationThreadPool-1,5,main] (B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
flatMap と subscribeOn を使った並列処理
RxJava 2.0 では Reactor の ParallelFlux
相当の処理は用意されていないようです。
そこで、flatMap
と subscribeOn
を使ってみました。
(C) flatMap + subscribeOn
Single.just("(C)") .repeat(3) .flatMap(s -> Flowable.just(s) .map(n -> { printThread(n + "_map"); return n; }) .subscribeOn(Schedulers.computation()) //.subscribeOn(Schedulers.newThread()) ) .subscribe(n -> printThread(n + "_subscribe"));
ただし、以下の実行結果のように別スレッドで実行してはいるものの、map と subscribe を同じスレッドで実行するわけではありませんでした。
なお、Schedulers.computation()
の場合、スレッド数は実行環境のコア数に依存するようです。(newThread は新しいスレッドを使う)
(C) flatMap + subscribeOn の実行結果1
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main] (C)_map, thread: Thread[RxComputationThreadPool-3,5,main] (C)_map, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C) flatMap + subscribeOn の実行結果2
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main] (C)_map, thread: Thread[RxComputationThreadPool-3,5,main] (C)_map, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
備考
最後に、今回使用したソースを記載しておきます。
build.gradle
apply plugin: 'application' mainClassName = 'App' repositories { jcenter() } dependencies { compile 'io.reactivex.rxjava2:rxjava:2.0.3' } run { standardInput = System.in if (project.hasProperty('args')) { args project.args } }
src/main/java/App.java
import io.reactivex.Single; import io.reactivex.Flowable; import io.reactivex.schedulers.Schedulers; import java.lang.invoke.MethodHandle; import static java.lang.invoke.MethodHandles.publicLookup; import static java.lang.invoke.MethodType.methodType; public class App { public static void main(String... args) throws Throwable { MethodHandle mh = publicLookup().findStatic( App.class, "sample" + args[0], methodType(void.class) ); mh.invoke(); System.in.read(); } // (A) public static void sampleA() { Single.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (B) public static void sampleB() { Single.just("(B)") .repeat(3) .observeOn(Schedulers.computation()) //.observeOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (C) public static void sampleC() { Single.just("(C)") .repeat(3) .flatMap(s -> Flowable.just(s) .map(n -> { printThread(n + "_map"); return n; }) .subscribeOn(Schedulers.computation()) //.subscribeOn(Schedulers.newThread()) ) .subscribe(n -> printThread(n + "_subscribe")); } private static void printThread(String msg) { System.out.println(msg + ", thread: " + Thread.currentThread()); } }
実行例
> gradle -q run -Pargs=C (C)_map, thread: Thread[RxComputationThreadPool-2,5,main] (C)_map, thread: Thread[RxComputationThreadPool-3,5,main] (C)_map, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]
Reactor で並列処理
Reactor の並列処理を試してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20161208/
はじめに
Reactor で以下のようなコードを実行すると、map の処理と subscribe の処理を順番に 3回繰り返します。(repeat
メソッドの戻り値は Flux
)
(A) サンプルコード
Mono.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main]
ここで、今回は map からの処理を別スレッドで並列実行してみます。
publishOn の場合
publishOn
を使うと main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。
(B) publishOn を使った場合
Mono.just("(B)") .repeat(3) .publishOn(Schedulers.parallel()) // 実質的に以下と同じ //.publishOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(B) publishOn を使った場合の実行結果
(B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main]
publishOn へ Schedulers.parallel()
を渡しても複数のスレッドを使うわけでは無いようなので、実質的に Schedulers.single()
と変わりは無さそうです。
ParallelFlux を使った並列処理
並列で実行するためには parallel
メソッドを使って Flux
から ParallelFlux
を取得し、runOn
メソッドで Schedulers.parallel()
※ を指定します。
※ この場合、Schedulers.parallel() の代わりに Schedulers.single() を使うと並列にならない点に注意 (単一スレッドで実行する事になる)
(C) parallel + runOn
Mono.just("(C)") .repeat(3) .parallel() .runOn(Schedulers.parallel()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(C) parallel + runOn の実行結果
(C)_map, thread: Thread[parallel-1,5,main] (C)_map, thread: Thread[parallel-3,5,main] (C)_map, thread: Thread[parallel-2,5,main] (C)_subscribe, thread: Thread[parallel-3,5,main] (C)_subscribe, thread: Thread[parallel-1,5,main] (C)_subscribe, thread: Thread[parallel-2,5,main]
3つのスレッドを使って並列に実行できているようです。
なお、runOn
をコメントアウトして実行すると (A) と同様の結果(main スレッドで順番に実行)になります。
備考
最後に、今回使用したソースを記載しておきます。
build.gradle
apply plugin: 'application' mainClassName = 'App' repositories { jcenter() } dependencies { compile 'io.projectreactor:reactor-core:3.0.3.RELEASE' } run { standardInput = System.in if (project.hasProperty('args')) { args project.args } }
src/main/java/App.java
import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.lang.invoke.MethodHandle; import static java.lang.invoke.MethodHandles.publicLookup; import static java.lang.invoke.MethodType.methodType; public class App { public static void main(String... args) throws Throwable { MethodHandle mh = publicLookup().findStatic( App.class, "sample" + args[0], methodType(void.class) ); mh.invoke(); System.in.read(); } // (A) public static void sampleA() { Mono.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (B) publishOn を使った場合 public static void sampleB() { Mono.just("(B)") .repeat(3) .publishOn(Schedulers.parallel()) //.publishOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (C) parallel + runOn public static void sampleC() { Mono.just("(C)") .repeat(3) .parallel() .runOn(Schedulers.parallel()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } private static void printThread(String msg) { System.out.println(msg + ", thread: " + Thread.currentThread()); } }
実行例
> gradle -q run -Pargs=B (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main]
Spring Web Reactive を試す
Spring 5 で導入される Spring Web Reactive を試してみました。
本来なら Spring Boot で実行する事になると思いますが、今回は Spring Boot を使わずに Undertow で直接実行してみます。
ソースは http://github.com/fits/try_samples/tree/master/blog/20161115/
サンプル作成
ビルド定義
現時点では Spring Web Reactive の正式版はリリースされていないようなのでスナップショット版を使います。
Undertow を実行するために undertow-core
、JSON で結果を返す処理を試すために jackson-databind
を依存関係へ設定しています。
build.gradle
apply plugin: 'application' mainClassName = 'sample.App' repositories { jcenter() maven { url 'http://repo.spring.io/snapshot/' } } dependencies { compile 'org.projectlombok:lombok:1.16.10' // Spring Web Reactive compile 'org.springframework:spring-web-reactive:5.0.0.BUILD-SNAPSHOT' // Undertow compile 'io.undertow:undertow-core:2.0.0.Alpha1' // JSON 用 runtime 'com.fasterxml.jackson.core:jackson-databind:2.8.4' }
設定クラス
@EnableWebReactive
を付与すれば Spring Web Reactive を有効にできるようです。
src/main/java/sample/config/AppConfig.java
package sample.config; import org.springframework.context.annotation.ComponentScan; import org.springframework.web.reactive.config.EnableWebReactive; @EnableWebReactive @ComponentScan("sample.controller") public class AppConfig { }
コントローラークラス
Spring Web Reactive は Spring Web (MVC) のプログラミングスタイルを踏襲しているようです。
Spring Web (MVC) と同じアノテーションでコントローラークラスを定義し、メソッドの戻り値に Reactor の Mono / Flux を使えばよさそうです。
クラス | 概要 |
---|---|
Mono | 単一の結果を返す場合に使用 |
Flux | 複数の結果を返す場合に使用 |
src/main/java/sample/controller/SampleController.java
package sample.controller; import lombok.Value; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @RestController public class SampleController { @RequestMapping("/") public Mono<String> sample() { return Mono.just("sample"); } @RequestMapping(value = "/data/{name}", produces = "application/json") public Flux<Data> dataList(@PathVariable("name") String name) { return Flux.fromArray(new Data[] { new Data(name + "-1", 1), new Data(name + "-2", 2), new Data(name + "-3", 3) }); } @Value class Data { private String name; private int value; } }
実行クラス
まずは、Spring Web Reactive を有効化した ApplicationContext を使って HttpHandler を作成します。 (DispatcherHandler
と WebHttpHandlerBuilder
を使用)
あとは、対象の Web サーバー ※ に合わせて実行します。
※ 今のところ Servlet 3.1 対応コンテナ、Netty、Undertow を サポートしているようです
Undertow の場合、アダプタークラス UndertowHttpHandlerAdapter
で HttpHandler をラッピングして実行するだけです。
src/main/java/sample/App.java
package sample; import io.undertow.Undertow; import lombok.val; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.http.server.reactive.UndertowHttpHandlerAdapter; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import sample.config.AppConfig; public class App { public static void main(String... args) { val ctx = new AnnotationConfigApplicationContext(); // @EnableWebReactive を付与したクラスを登録 ctx.register(AppConfig.class); ctx.refresh(); val handler = new DispatcherHandler(); handler.setApplicationContext(ctx); // HttpHandler の作成 val httpHandler = WebHttpHandlerBuilder.webHandler(handler).build(); val server = Undertow.builder() .addHttpListener(8080, "localhost") // Undertow 用アダプターでラッピングして設定 .setHandler(new UndertowHttpHandlerAdapter(httpHandler)) .build(); server.start(); } }
実行
Gradle で実行
> gradle -q run ・・・ 情報: Mapped "{[/data/{name}],produces=[application/json]}" onto public reactor.core.publisher.Flux<sample.controller.SampleController$Data> sample.controller.SampleController.dataList(java.lang.String) 11 14, 2016 9:32:23 午後 org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping register 情報: Mapped "{[/]}" onto public reactor.core.publisher.Mono<java.lang.String> sample.controller.SampleController.sample() 11 14, 2016 9:32:23 午後 org.xnio.Xnio <clinit> INFO: XNIO version 3.3.6.Final 11 14, 2016 9:32:23 午後 org.xnio.nio.NioXnio <clinit> INFO: XNIO NIO Implementation Version 3.3.6.Final
動作確認1
$ curl -s http://localhost:8080/ sample
動作確認2
$ curl -s http://localhost:8080/data/a [{"name":"a-1","value":1},{"name":"a-2","value":2},{"name":"a-3","value":3}]
Java で行列の演算 - nd4j, commons-math, la4j, ujmp, jblas, colt
Java で以下のような行列の演算を複数のライブラリで試しました。
- (a) 和
- (b) 積
- (c) 転置
とりあえず今回は、更新日が比較的新しめのライブラリを試してみました。
- ND4J 0.6.0
- Commons Math 3.6.1
- la4j 0.6.0
- UJMP 0.3.0
- jblas 1.2.4
- Colt Blazegraph 版 2.1.4
また、あくまでも個人的な印象ですが、手軽に使いたいなら la4j か Commons Math、性能重視なら ND4J か jblas、可視化や DB との連携を考慮するなら UJMP を使えば良さそうです。
ソースは http://github.com/fits/try_samples/tree/master/blog/20161031/
ND4J
Deeplearning4J で使用しているライブラリ。
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compile 'org.nd4j:nd4j-native-platform:0.6.0' runtime 'org.slf4j:slf4j-nop:1.7.21' }
src/main/java/SampleApp.java
import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.factory.Nd4j; public class SampleApp { public static void main(String... args) { INDArray x = Nd4j.create(new double[][] { {1, 2}, {3, 4} }); INDArray y = Nd4j.create(new double[][] { {5, 6}, {7, 8} }); // (a) System.out.println( x.add(y) ); System.out.println("-----"); // (b) System.out.println( x.mmul(y) ); System.out.println("-----"); // (c) System.out.println( x.transpose() ); } }
実行結果
> gradle -q run [[ 6.00, 8.00], [10.00, 12.00]] ----- [[19.00, 22.00], [43.00, 50.00]] ----- [[1.00, 3.00], [2.00, 4.00]]
Commons Math
Apache Commons のライブラリ。
build.gradle
・・・
dependencies {
compile 'org.apache.commons:commons-math3:3.6.1'
}
src/main/java/SampleApp.java
import org.apache.commons.math3.linear.MatrixUtils; import org.apache.commons.math3.linear.RealMatrix; public class SampleApp { public static void main(String... args) { RealMatrix x = MatrixUtils.createRealMatrix(new double[][] { {1, 2}, {3, 4} }); RealMatrix y = MatrixUtils.createRealMatrix(new double[][] { {5, 6}, {7, 8} }); // (a) System.out.println( x.add(y) ); System.out.println("-----"); // (b) System.out.println( x.multiply(y) ); System.out.println("-----"); // (c) System.out.println( x.transpose() ); } }
実行結果
> gradle -q run Array2DRowRealMatrix{{6.0,8.0},{10.0,12.0}} ----- Array2DRowRealMatrix{{19.0,22.0},{43.0,50.0}} ----- Array2DRowRealMatrix{{1.0,3.0},{2.0,4.0}}
la4j
Java のみで実装された軽量なライブラリ。
build.gradle
・・・
dependencies {
compile 'org.la4j:la4j:0.6.0'
}
src/main/java/SampleApp.java
import org.la4j.Matrix; public class SampleApp { public static void main(String... args) { Matrix x = Matrix.from2DArray(new double[][] { {1, 2}, {3, 4} }); Matrix y = Matrix.from2DArray(new double[][] { {5, 6}, {7, 8} }); // (a) System.out.println( x.add(y) ); System.out.println("-----"); // (b) System.out.println( x.multiply(y) ); System.out.println("-----"); // (c) System.out.println( x.transpose() ); } }
実行結果
> gradle -q run 6.000 8.000 10.000 12.000 ----- 19.000 22.000 43.000 50.000 ----- 1.000 3.000 2.000 4.000
UJMP
データ可視化や JDBC との連携など、機能豊富そうなライブラリ。 Colt や jblas 等ともプラグインモジュールで連携できる模様。
build.gradle
・・・
dependencies {
compile 'org.ujmp:ujmp-core:0.3.0'
}
src/main/java/SampleApp.java
import org.ujmp.core.DenseMatrix; import org.ujmp.core.Matrix; public class SampleApp { public static void main(String... args) { Matrix x = DenseMatrix.Factory.linkToArray( new double[] {1, 2}, new double[] {3, 4} ); Matrix y = DenseMatrix.Factory.linkToArray( new double[] {5, 6}, new double[] {7, 8} ); // (a) System.out.println( x.plus(y) ); System.out.println("-----"); // (b) System.out.println( x.mtimes(y) ); System.out.println("-----"); // (c) System.out.println( x.transpose() ); } }
実行結果
> gradle -q run 6.0000 8.0000 10.0000 12.0000 ----- 19.0000 22.0000 43.0000 50.0000 ----- 1.0000 3.0000 2.0000 4.0000
jblas
BLAS/LAPACK をベースとしたライブラリ。 ネイティブライブラリを使用する。
build.gradle
・・・
dependencies {
compile 'org.jblas:jblas:1.2.4'
}
src/main/java/SampleApp.java
import org.jblas.DoubleMatrix; public class SampleApp { public static void main(String... args) { DoubleMatrix x = new DoubleMatrix(new double[][] { {1, 2}, {3, 4} }); DoubleMatrix y = new DoubleMatrix(new double[][] { {5, 6}, {7, 8} }); // (a) System.out.println( x.add(y) ); System.out.println("-----"); // (b) System.out.println( x.mmul(y) ); System.out.println("-----"); // (c) System.out.println( x.transpose() ); } }
実行結果
> gradle -q run [6.000000, 8.000000; 10.000000, 12.000000] ----- [19.000000, 22.000000; 43.000000, 50.000000] ----- [1.000000, 3.000000; 2.000000, 4.000000] -- org.jblas INFO Starting temp DLL cleanup task. -- org.jblas INFO Deleted 4 unused temp DLL libraries from ・・・
Colt Blazegraph 版
Colt は長らく更新されていないようなので、今回は Blazegraph による fork 版? を使いました。
build.gradle
・・・
dependencies {
compile 'com.blazegraph:colt:2.1.4'
}
src/main/java/SampleApp.java
import cern.colt.matrix.DoubleFactory2D; import cern.colt.matrix.DoubleMatrix2D; import cern.colt.matrix.linalg.Algebra; import cern.jet.math.Functions; public class SampleApp { public static void main(String... args) { DoubleMatrix2D x = DoubleFactory2D.dense.make(new double[][] { {1, 2}, {3, 4} }); DoubleMatrix2D y = DoubleFactory2D.dense.make(new double[][] { {5, 6}, {7, 8} }); // (a) System.out.println( x.copy().assign(y, Functions.plus) ); System.out.println("-----"); Algebra algebra = new Algebra(); // (b) System.out.println( algebra.mult(x, y) ); System.out.println("-----"); // (c) System.out.println( x.viewDice() ); } }
assign
を使うと自身の値を更新するため copy
を使っています。
実行結果
> gradle -q run 2 x 2 matrix 6 8 10 12 ----- 2 x 2 matrix 19 22 43 50 ----- 2 x 2 matrix 1 3 2 4
Lucene API で Solr と Elasticsearch のインデックスを確認
Groovy で Lucene の API を使用して Solr や Elasticsearch のインデックスの内容を確認してみました。(Lucene 6.2.1 の API を使用)
ソースは http://github.com/fits/try_samples/tree/master/blog/20161024/
(a) ドキュメントの内容を出力
まずは、ドキュメントに属するフィールドの内容を出力する処理です。
DirectoryReader
から Document
を取得し、フィールド IndexableField
の内容を出力しています。
dump_docs.groovy
@Grab('org.apache.lucene:lucene-core:6.2.1') import org.apache.lucene.index.DirectoryReader import org.apache.lucene.store.FSDirectory import java.nio.file.Paths def dir = FSDirectory.open(Paths.get(args[0])) DirectoryReader.open(dir).withCloseable { reader -> println "numDocs = ${reader.numDocs()}" (0..<reader.numDocs()).each { // ドキュメントの取得 def doc = reader.document(it) println "---------- doc: ${it} ----------" // ドキュメント内のフィールドを出力 doc.fields.each { f -> def value = f.binaryValue()? f.binaryValue().utf8ToString(): f.stringValue() println "<field> name=${f.name}, value=${value}, class=${f.class}" } } }
(b) Term の内容を出力
インデックス内のフィールド情報と Term を出力する処理です。 Term は基本的な検索の単位となっており、Term の内容を見れば単語の分割状況を確認できます。
これらの情報を取得するには LeafReader
を使います。
Term の内容 BytesRef
は TermsEnum
から取得できます。
LeafReader から terms
メソッドで該当フィールドの Terms
を取得し、iterator
メソッドで TermsEnum を取得します。
dump_terms.groovy
@Grab('org.apache.lucene:lucene-core:6.2.1') import org.apache.lucene.index.DirectoryReader import org.apache.lucene.store.FSDirectory import java.nio.file.Paths def dir = FSDirectory.open(Paths.get(args[0])) DirectoryReader.open(dir).withCloseable { reader -> reader.leaves().each { ctx -> // LeafReader の取得 def leafReader = ctx.reader() println "---------- leaf: ${leafReader} ----------" // フィールド情報の出力 leafReader.getFieldInfos().each { fi -> println "<fieldInfo> name: ${fi.name}, valueType: ${fi.docValuesType}, indexOptions: ${fi.indexOptions}" } leafReader.fields().each { name -> // 指定のフィールド名に対する TermsEnum を取得 def termsEnum = leafReader.terms(name).iterator() println '' println "===== <term> name=${name} =====" try { while(termsEnum.next() != null) { // Term の内容を出力 println "term=${termsEnum.term().utf8ToString()}, freq=${termsEnum.docFreq()}" } } catch(e) { } } } }
動作確認
Lucene のバージョンが以下のようになっていたので、今回は Solr 6.2.1 と Elasticsearch 5.0.0 RC1 のインデックス内容を確認してみます。
プロダクト | 使用している Lucene のバージョン |
---|---|
Solr 6.2.1 | Lucene 6.2.1 |
Elasticsearch 2.4.1 | Lucene 5.5.2 |
Elasticsearch 5.0.0 RC1 | Lucene 6.2.0 |
(1) Solr 6.2.1
Solr 6.2.1 のインデックスから確認してみます。
準備
インデックスを作成してドキュメントを登録しておきます。
1. Solr 起動とインデックスの作成
> solr start ・・・ > solr create -c sample { "responseHeader":{ "status":0, "QTime":9197}, "core":"sample"}
2. スキーマの登録
schema.json
{ "add-field": { "name": "title", "type": "string" }, "add-field": { "name": "num", "type": "int" }, "add-field": { "name": "rdate", "type": "date" } }
スキーマ登録
$ curl -s http://localhost:8983/solr/sample/schema --data-binary @schema.json { "responseHeader":{ "status":0, "QTime":554}}
3. ドキュメントの登録
data1.json
{ "title": "item1", "num": 11, "rdate": "2016-10-20T13:45:00Z" }
ドキュメント登録
$ curl -s http://localhost:8983/solr/sample/update/json/docs --data-binary @data1.json {"responseHeader":{"status":0,"QTime":199}}
ちなみに、コミットしなくてもインデックスファイルには反映されるようです。
インデックスの内容確認
それでは、インデックスの内容を確認します。
該当するインデックスのディレクトリ(例. C:\solr-6.2.1\server\solr\sample\data\index
)を引数に指定して実行します。
(a) ドキュメントの内容
> groovy dump_docs.groovy C:\solr-6.2.1\server\solr\sample\data\index numDocs = 1 ---------- doc: 0 ---------- <field> name=title, value=item1, class=class org.apache.lucene.document.StoredField <field> name=num, value=11, class=class org.apache.lucene.document.StoredField <field> name=rdate, value=1476971100000, class=class org.apache.lucene.document.StoredField <field> name=id, value=2b1080dd-0cd3-43c6-a3ff-ab618ad00113, class=class org.apache.lucene.document.StoredField
(b) Term の内容
> groovy dump_terms.groovy C:\solr-6.2.1\server\solr\sample\data\index ---------- leaf: _0(6.2.1):C1 ---------- <fieldInfo> name: title, valueType: SORTED, indexOptions: DOCS <fieldInfo> name: _text_, valueType: NONE, indexOptions: DOCS_AND_FREQS_AND_POSITIONS <fieldInfo> name: num, valueType: NUMERIC, indexOptions: DOCS <fieldInfo> name: rdate, valueType: NUMERIC, indexOptions: DOCS <fieldInfo> name: id, valueType: SORTED, indexOptions: DOCS <fieldInfo> name: _version_, valueType: NUMERIC, indexOptions: DOCS ===== <term> name=_text_ ===== term=00, freq=1 term=0cd3, freq=1 term=11, freq=1 term=13, freq=1 term=1548710288432300032, freq=1 term=20, freq=1 term=2016, freq=1 term=2b1080dd, freq=1 term=43c6, freq=1 term=45, freq=1 term=a3ff, freq=1 term=ab618ad00113, freq=1 term=item1, freq=1 term=oct, freq=1 term=thu, freq=1 term=utc, freq=1 ===== <term> name=_version_ ===== term= ?yTP , freq=1 ===== <term> name=id ===== term=2b1080dd-0cd3-43c6-a3ff-ab618ad00113, freq=1 ===== <term> name=num ===== term= , freq=1 ===== <term> name=rdate ===== term= *~Yn`, freq=1 ===== <term> name=title ===== term=item1, freq=1
version・num・rdate の値が文字化けしているように見えますが、これは org.apache.lucene.util.LegacyNumericUtils.intToPrefixCoded()
メソッド等で処理されてバイナリデータとなっているためです。
実際の値を復元するには LegacyNumericUtils.prefixCodedToInt()
等を適用する必要があるようです。
なお、LegacyNumericUtils クラスは Lucene 6.2.1 API で deprecated となっていますが、Solr は未だ使っているようです。
(2) Elasticsearch 5.0.0 RC1
次は Elasticsearch です。
準備
インデックスを作成してドキュメントを登録しておきます。
1. Elasticsearch 起動
> elasticsearch ・・・
2. インデックスの作成とスキーマ登録
schema.json
{ "mappings": { "data": { "properties": { "title": { "type": "string", "index": "not_analyzed" }, "num": { "type": "integer" }, "rdate": { "type": "date" } } } } }
インデックス作成とスキーマ登録
$ curl -s -XPUT http://localhost:9200/sample --data-binary @schema.json {"acknowledged":true,"shards_acknowledged":true}
3. ドキュメントの登録
data1.json
{ "title": "item1", "num": 11, "rdate": "2016-10-20T13:45:00Z" }
ドキュメント登録
$ curl -s http://localhost:9200/sample/data --data-binary @data1.json {"_index":"sample","_type":"data","_id":"AVfwTjEQFnFWQdd5V9p5","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"created":true}
なお、すぐにインデックスファイルへ反映されない場合は flush を実施します。
flush 例
$ curl -s http://localhost:9200/sample/_flush
インデックスの内容確認
インデックスの内容を確認します。
Elasticsearch の場合はデフォルトで複数の shard に分かれているため、ドキュメントを登録した shard を確認しておきます。
shard の確認
$ curl -s http://localhost:9200/_cat/shards/sample?v index shard prirep state docs store ip node sample 1 p STARTED 0 130b 127.0.0.1 iUp_FE_ ・・・ sample 4 p STARTED 1 3.8kb 127.0.0.1 iUp_FE_ sample 4 r UNASSIGNED sample 0 p STARTED 0 130b 127.0.0.1 iUp_FE_ sample 0 r UNASSIGNED
shard 4 にドキュメントが登録されています。
Elasticsearch のインデックスディレクトリは data/nodes/<ノード番号>/indices/<インデックスのuuid>/<shard番号>/index
となっているようで、今回は data\nodes\0\indices\QBXMjcCFSWy26Gow1Y9ItQ\4\index
でした。(インデックスの uuid は QBXMjcCFSWy26Gow1Y9ItQ)
(a) ドキュメントの内容
> groovy dump_docs.groovy C:\elasticsearch-5.0.0-rc1\data\nodes\0\indices\QBXMjcCFSWy26Gow1Y9ItQ\4\index numDocs = 1 ---------- doc: 0 ---------- <field> name=_source, value={ "title": "item1", "num": 11, "rdate": "2016-10-20T13:45:00Z" } , class=class org.apache.lucene.document.StoredField <field> name=_uid, value=data#AVfwTjEQFnFWQdd5V9p5, class=class org.apache.lucene.document.StoredField
(b) Term の内容
> groovy dump_terms.groovy C:\elasticsearch-5.0.0-rc1\data\nodes\0\indices\QBXMjcCFSWy26Gow1Y9ItQ\4\index ---------- leaf: _0(6.2.0):c1 ---------- <fieldInfo> name: _source, valueType: NONE, indexOptions: NONE <fieldInfo> name: _type, valueType: SORTED_SET, indexOptions: DOCS <fieldInfo> name: _uid, valueType: NONE, indexOptions: DOCS <fieldInfo> name: _version, valueType: NUMERIC, indexOptions: NONE <fieldInfo> name: title, valueType: SORTED_SET, indexOptions: DOCS <fieldInfo> name: num, valueType: SORTED_NUMERIC, indexOptions: NONE <fieldInfo> name: rdate, valueType: SORTED_NUMERIC, indexOptions: NONE <fieldInfo> name: _all, valueType: NONE, indexOptions: DOCS_AND_FREQS_AND_POSITIONS <fieldInfo> name: _field_names, valueType: NONE, indexOptions: DOCS ===== <term> name=_all ===== term=00z, freq=1 term=10, freq=1 term=11, freq=1 term=2016, freq=1 term=20t13, freq=1 term=45, freq=1 term=item1, freq=1 ===== <term> name=_field_names ===== term=_all, freq=1 term=_source, freq=1 term=_type, freq=1 term=_uid, freq=1 term=_version, freq=1 term=num, freq=1 term=rdate, freq=1 term=title, freq=1 ===== <term> name=_type ===== term=data, freq=1 ===== <term> name=_uid ===== term=data#AVfwTjEQFnFWQdd5V9p5, freq=1 ===== <term> name=title ===== term=item1, freq=1
Solr とは、かなり違った結果になっています。
Deeplearning4J で MNIST を分類
「Deeplearning4J で iris を分類」 に続いて、畳み込みニューラルネットを使った MNIST の分類を試します。
Deeplearning4J のバージョンが上がって、@Grab を使った Groovy 上での実行が上手くいかなかったので、今回は Kotlin で実装し Gradle で実行します。
- Gradle 3.1
- Kotlin 1.0.4
ソースは http://github.com/fits/try_samples/tree/master/blog/20161010/
はじめに
今回は、Gradle のマルチプロジェクトを使って以下の処理をサブプロジェクトとしました。
Gradle のビルド定義は以下の通り。
build.gradle
def slf4jVer = '1.7.21' buildscript { repositories { jcenter() } dependencies { classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:1.0.4' } } // サブプロジェクトの共通設定 subprojects { apply plugin: 'kotlin' apply plugin: 'application' repositories { jcenter() } dependencies { compile 'org.jetbrains.kotlin:kotlin-stdlib:1.0.4' compile('org.deeplearning4j:deeplearning4j-core:0.6.0') { // エラーの回避策 (Could not find javacpp-presets-${os.name}-${os.arch}.jar) exclude group: 'org.bytedeco', module: 'javacpp-presets' } runtime 'org.nd4j:nd4j-native-platform:0.6.0' } run { // 実行時引数 if (project.hasProperty('args')) { args project.args.split(' ') } } } // (1) 畳み込みニューラルネットのモデル作成(JSON で出力) project(':conv_model') { mainClassName = 'ConvModelKt' dependencies { runtime "org.slf4j:slf4j-nop:${slf4jVer}" } } // (2) 学習処理 project(':learn_mnist') { mainClassName = 'LearnMnistKt' dependencies { runtime "org.slf4j:slf4j-simple:${slf4jVer}" } } // (3) 評価処理 project(':eval_mnist') { mainClassName = 'EvalMnistKt' dependencies { runtime "org.slf4j:slf4j-nop:${slf4jVer}" } }
settings.gradle
include 'conv_model', 'learn_mnist', 'eval_mnist'
ファイル構成は以下の通りです。
ファイル構成
- build.gradle
- settings.gradle
- conv_model/src/main/kotlin/convModel.kt
- learn_mnist/src/main/kotlin/learnMnist.kt
- eval_mnist/src/main/kotlin/evalMnist.kt
(1) 畳み込みニューラルネットのモデル作成(JSON で出力)
畳み込みニューラルネットの構成情報を JSON 化して標準出力します。
MnistDataSetIterator
の MNIST データセットに対して畳み込みニューラルネットを行うには InputType.convolutionalFlat(28, 28, 1)
を setInputType
します。
conv_model/src/main/kotlin/convModel.kt
import org.deeplearning4j.nn.conf.NeuralNetConfiguration import org.deeplearning4j.nn.conf.inputs.InputType import org.deeplearning4j.nn.conf.layers.ConvolutionLayer import org.deeplearning4j.nn.conf.layers.OutputLayer import org.deeplearning4j.nn.conf.layers.SubsamplingLayer import org.nd4j.linalg.lossfunctions.LossFunctions fun main(args: Array<String>) { val builder = NeuralNetConfiguration.Builder() .iterations(3) // 3回繰り返し .list( // 畳み込み層 ConvolutionLayer.Builder(5, 5) .nIn(1) .nOut(8) .padding(2, 2) .activation("relu") .build() , // プーリング層(最大プーリング) SubsamplingLayer.Builder( SubsamplingLayer.PoolingType.MAX, intArrayOf(2, 2)) .stride(2, 2) .build() , // 畳み込み層 ConvolutionLayer.Builder(5, 5) .nOut(16) .padding(1, 1) .activation("relu") .build() , // プーリング層(最大プーリング) SubsamplingLayer.Builder( SubsamplingLayer.PoolingType.MAX, intArrayOf(3, 3)) .stride(3, 3) .build() , OutputLayer.Builder(LossFunctions.LossFunction.MCXENT) .nOut(10) .activation("softmax") .build() ) // MNIST データセットに対する畳み込みニューラルネット用の入力タイプ設定 .setInputType(InputType.convolutionalFlat(28, 28, 1)) // JSON 化して出力 println(builder.build().toJson()) }
(2) 学習処理
(1) で出力した JSON ファイルを使って学習を実施し、学習後のパラメータ(重み)を JSON ファイルへ出力します。
MNIST データセットには MnistDataSetIterator
を使いました ※。
※ train 引数(今回使用したコンストラクタの第2引数)が true の場合に学習用、 false の場合に評価用のデータセットとなります
learn_mnist/src/main/kotlin/learnMnist.kt
import org.deeplearning4j.datasets.iterator.impl.MnistDataSetIterator import org.deeplearning4j.nn.conf.MultiLayerConfiguration import org.deeplearning4j.nn.multilayer.MultiLayerNetwork import org.deeplearning4j.optimize.listeners.ScoreIterationListener import org.nd4j.linalg.api.ndarray.NdArrayJSONWriter import java.io.File fun main(args: Array<String>) { // ニューラルネットのモデルファイル(JSON) val confJson = File(args[0]).readText() // パラメータ(重み)の出力ファイル名 val destFile = args[1] val conf = MultiLayerConfiguration.fromJson(confJson) val network = MultiLayerNetwork(conf) network.init() // スコア(誤差)の出力 network.setListeners(ScoreIterationListener()) // MNIST 学習用データ(バッチサイズ 100) val trainData = MnistDataSetIterator(100, true, 0) // 学習 network.fit(trainData) // 学習後のパラメータ(重み)を JSON ファイルへ出力 NdArrayJSONWriter.write(network.params(), destFile) }
(3) 評価処理
(1) と (2) の結果を使って評価します。
eval_mnist/src/main/kotlin/evalMnist.kt
import org.deeplearning4j.datasets.iterator.impl.MnistDataSetIterator import org.deeplearning4j.nn.conf.MultiLayerConfiguration import org.deeplearning4j.nn.multilayer.MultiLayerNetwork import org.nd4j.linalg.api.ndarray.NdArrayJSONReader import java.io.File fun main(args: Array<String>) { // ニューラルネットのモデルファイル(JSON) val confJson = File(args[0]).readText() // パラメータ(重み)のロード val params = NdArrayJSONReader().read(File(args[1])) val conf = MultiLayerConfiguration.fromJson(confJson) val network = MultiLayerNetwork(conf, params) // MNIST 評価用データ val testData = MnistDataSetIterator(1, false, 0) // 評価 val res = network.evaluate(testData) println(res.stats()) }
実行
まず、畳み込みニューラルネットの構成を JSON ファイルへ出力します。
(1) 畳み込みニューラルネットのモデル作成
> gradle -q :conv_model:run > conv_model.json
次に、学習を実施します。
org.reflections.Reflections - could not create Vfs.Dir from url
という警告ログが出力されましたが、特に支障は無さそうです。
(2) 学習
> gradle -q :learn_mnist:run -Pargs="../conv_model.json ../conv_params.json" ・・・ [main] INFO org.deeplearning4j.optimize.listeners.ScoreIterationListener - Score at iteration 1790 is 0.07888245582580566 [main] INFO org.deeplearning4j.nn.multilayer.MultiLayerNetwork - Finetune phase [main] INFO org.deeplearning4j.nn.multilayer.MultiLayerNetwork - Finetune phase [main] INFO org.deeplearning4j.nn.multilayer.MultiLayerNetwork - Finetune phase
最後に、評価を実施します。
(3) 評価
> gradle -q :eval_mnist:run -Pargs="../conv_model.json ../conv_params.json" Examples labeled as 0 classified by model as 0: 944 times Examples labeled as 0 classified by model as 2: 7 times Examples labeled as 0 classified by model as 5: 4 times ・・・ Examples labeled as 9 classified by model as 7: 24 times Examples labeled as 9 classified by model as 8: 22 times Examples labeled as 9 classified by model as 9: 937 times ==========================Scores======================================== Accuracy: 0.9432 Precision: 0.9463 Recall: 0.9427 F1 Score: 0.9445 ========================================================================