Axon Framework でイベントソーシング
Axon Fraework のイベントソーシング機能を軽く試してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20160815/
はじめに
今回は以下のような Gradle 用ビルド定義を使います。
lombok は必須ではありませんが、便利なので使っています。
compileOnly
でコンパイル時にのみ使用するモジュールを指定できます。
build.gradle
apply plugin: 'application' repositories { jcenter() } dependencies { compileOnly 'org.projectlombok:lombok:1.16.10' compile 'org.axonframework:axon-core:3.0-M3' runtime 'org.slf4j:slf4j-simple:1.7.21' } mainClassName = 'SampleApp'
Axon Framework によるイベントソーシング
DDD や CQRS におけるイベントソーシングでは、永続化したイベント群を順次適用して集約の状態を復元します。
Axon Framework では以下のように処理すれば、イベントソーシングを実現できるようです。
- (a) CommandHandler でコマンドからイベントを生成し適用
- (b) EventSourcingHandler でイベントの内容からモデルの状態を変更
なお、これらのハンドラはアノテーションで指定できるようになっています。
コマンドの作成
まずは、在庫作成のコマンドを実装します。
処理対象の識別子を設定するフィールドに @TargetAggregateIdentifier
を付けますが、CreateInventoryItem には特に付けなくても問題無さそうでした。(次の CheckInItemsToInventory では必須)
なお、lombok.Value を使っているため、コンストラクタや getter メソッドが自動的に生成されます。
src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands; import org.axonframework.commandhandling.TargetAggregateIdentifier; import lombok.Value; // 在庫作成コマンド @Value public class CreateInventoryItem { @TargetAggregateIdentifier // このアノテーションは必須では無さそう private String id; private String name; }
次に、在庫数を加えるためのコマンドです。
src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands; import org.axonframework.commandhandling.TargetAggregateIdentifier; import lombok.Value; // 在庫数追加コマンド @Value public class CheckInItemsToInventory { @TargetAggregateIdentifier // このアノテーションは必須 private String id; private int count; }
イベントの作成
次は、コマンドによって生じるイベントを作成します。
今回は、CreateInventoryItem コマンドから 2つのイベント (InventoryItemCreated と InventoryItemRenamed) が生じるような仕様で考えてみました。
src/main/java/sample/events/InventoryItemCreated.java
package sample.events; import lombok.Value; // 在庫作成イベント @Value public class InventoryItemCreated { private String id; }
src/main/java/sample/events/InventoryItemRenamed.java
package sample.events; import lombok.Value; // 名前変更イベント @Value public class InventoryItemRenamed { private String newName; }
src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events; import lombok.Value; // 在庫数追加イベント @Value public class ItemsCheckedInToInventory { private int count; }
エンティティの作成
在庫エンティティを作成します。
一意の識別子を設定するフィールドへ @AggregateIdentifier
を付与します。
コマンドを処理するコンストラクタ ※ やメソッドへ @CommandHandler
を付与し、その処理内でイベントを作成して AggregateLifecycle.apply
メソッドへ渡せば、@EventSourcingHandler
を付与した該当メソッドが呼び出されます。
イベントの内容に合わせてエンティティの内部状態を更新する事で、イベントソーシングを実現できます。
※ 新規作成のコマンドを処理する場合に、コンストラクタを使います
なお、引数なしのデフォルトコンストラクタは必須なようです。
src/main/java/sample/models/InventoryItem.java
package sample.models; import org.axonframework.commandhandling.CommandHandler; import org.axonframework.commandhandling.model.AggregateIdentifier; import org.axonframework.commandhandling.model.AggregateLifecycle; import org.axonframework.commandhandling.model.ApplyMore; import org.axonframework.eventsourcing.EventSourcingHandler; import lombok.Getter; import lombok.ToString; import sample.commands.CreateInventoryItem; import sample.commands.CheckInItemsToInventory; import sample.events.InventoryItemCreated; import sample.events.InventoryItemRenamed; import sample.events.ItemsCheckedInToInventory; @ToString public class InventoryItem { @AggregateIdentifier @Getter private String id; @Getter private String name; @Getter private int count; // デフォルトコンストラクタは必須 public InventoryItem() { } // 在庫作成コマンド処理 @CommandHandler public InventoryItem(CreateInventoryItem cmd) { System.out.println("C call new: " + cmd); // 在庫作成イベントの作成と適用 AggregateLifecycle.apply(new InventoryItemCreated(cmd.getId())); // 名前変更イベントの作成と適用 AggregateLifecycle.apply(new InventoryItemRenamed(cmd.getName())); } // 在庫数追加コマンド処理 @CommandHandler private ApplyMore updateCount(CheckInItemsToInventory cmd) { System.out.println("C call updateCount: " + cmd); // 在庫数追加イベントの作成と適用 return AggregateLifecycle.apply(new ItemsCheckedInToInventory(cmd.getCount())); } // 在庫作成イベントの適用処理 @EventSourcingHandler private void applyCreated(InventoryItemCreated event) { System.out.println("E call applyCreated: " + event); this.id = event.getId(); } // 名前変更イベントの適用処理 @EventSourcingHandler private void applyRenamed(InventoryItemRenamed event) { System.out.println("E call applyRenamed: " + event); this.name = event.getNewName(); } // 在庫数追加イベントの適用処理 @EventSourcingHandler private void applyCheckedIn(ItemsCheckedInToInventory event) { System.out.println("E call applyCheckedIn: " + event); this.count += event.getCount(); } }
実行クラスの作成
動作確認のための実行クラスを作成します。
CommandGateway へコマンドを send すれば処理が流れるように CommandBus・Repository・EventStore 等を組み合わせます。
イベントソーシングには EventSourcingRepository
を使用します。
アノテーションを使ったコマンドハンドラを適用するには AggregateAnnotationCommandHandler
を使用します。
また、今回はインメモリでイベントを保持する InMemoryEventStorageEngine
を使っています。
src/main/java/SampleApp.java
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler; import org.axonframework.commandhandling.SimpleCommandBus; import org.axonframework.commandhandling.gateway.DefaultCommandGateway; import org.axonframework.eventsourcing.EventSourcedAggregate; import org.axonframework.eventsourcing.EventSourcingRepository; import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; import lombok.val; import sample.commands.CreateInventoryItem; import sample.commands.CheckInItemsToInventory; import sample.models.InventoryItem; public class SampleApp { public static void main(String... args) { val cmdBus = new SimpleCommandBus(); val gateway = new DefaultCommandGateway(cmdBus); val es = new EmbeddedEventStore(new InMemoryEventStorageEngine()); // イベントソーシング用の Repository val repository = new EventSourcingRepository<>(InventoryItem.class, es); // アノテーションによるコマンドハンドラを適用 new AggregateAnnotationCommandHandler<>(InventoryItem.class, repository).subscribe(cmdBus); String r1 = gateway.sendAndWait(new CreateInventoryItem("s1", "sample1")); System.out.println("id: " + r1); System.out.println("----------"); EventSourcedAggregate<InventoryItem> r2 = gateway.sendAndWait(new CheckInItemsToInventory("s1", 5)); printAggregate(r2); System.out.println("----------"); EventSourcedAggregate<InventoryItem> r3 = gateway.sendAndWait(new CheckInItemsToInventory("s1", 3)); printAggregate(r3); } private static void printAggregate(EventSourcedAggregate<InventoryItem> esag) { System.out.println(esag.getAggregateRoot()); } }
なお、CreateInventoryItem を send した後に、同じ ID (今回は "s1")を使って再度 CreateInventoryItem を send してみたところ、特に重複チェックなどが実施されるわけではなく、普通にイベント (InventoryItemCreated と InventoryItemRenamed) が追加されました。
実行
実行結果は以下の通りです。
コマンドを送信する度に、これまでのイベントを適用してエンティティの状態を復元した後、新しいコマンドを処理している動作を確認できました。
> gradle run ・・・ :run C call new: CreateInventoryItem(id=s1, name=sample1) E call applyCreated: InventoryItemCreated(id=s1) E call applyRenamed: InventoryItemRenamed(newName=sample1) id: s1 ---------- E call applyCreated: InventoryItemCreated(id=s1) E call applyRenamed: InventoryItemRenamed(newName=sample1) C call updateCount: CheckInItemsToInventory(id=s1, count=5) E call applyCheckedIn: ItemsCheckedInToInventory(count=5) InventoryItem(id=s1, name=sample1, count=5) ---------- E call applyCreated: InventoryItemCreated(id=s1) E call applyRenamed: InventoryItemRenamed(newName=sample1) E call applyCheckedIn: ItemsCheckedInToInventory(count=5) C call updateCount: CheckInItemsToInventory(id=s1, count=3) E call applyCheckedIn: ItemsCheckedInToInventory(count=3) InventoryItem(id=s1, name=sample1, count=8)
備考 - スナップショット
イベント数が多くなると、イベントを毎回初めから適用して状態を復元するのはパフォーマンス的に厳しくなるため、スナップショットを使います。
Axon Framework にもスナップショットの機能が用意されており、EventCountSnapshotterTrigger
等を EventSourcingRepository へ設定すれば使えるようです。
ただし、今回のサンプル (SampleApp.java) では EventCountSnapshotterTrigger を機能させられませんでした。
というのも、EventCountSnapshotterTrigger では decorateForAppend
メソッドの実行時にスナップショット化を行うようですが ※、今回のサンプルでは decorateForAppend は一度も実行せず decorateForRead
メソッドのみが実行されるようでした。
※ ソースをざっと見た限りでは、カウンターのカウントアップ等も decorateForAppend の中でのみ実施しているようだった
Groovy の @Grab で Spark Framework を実行
Spark Framework - A tiny Java web framework を Groovy の @Grab
を使って実行してみました。
- Spark Framework 2.5
- Groovy 2.4.7
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20160801/
はじめに
以下のように @Grab を使った Spark の Groovy スクリプトを groovy コマンドで実行し、Web クライアントでアクセスしてみると、java.lang.NoSuchMethodError: javax.servlet.http.HttpServletResponse.getHeaders
エラーが発生してしまいました。
now.groovy
@Grab('com.sparkjava:spark-core:2.5') @Grab('org.slf4j:slf4j-simple:1.7.21') import static spark.Spark.* get('/now') { req, res -> new Date().format('yyyy/MM/dd HH:mm:ss') }
groovy コマンドで上記スクリプトを実行。
実行例
> groovy now.groovy ・・・ [Thread-1] INFO org.eclipse.jetty.server.Server - Started @3213ms
/now
へアクセスすると、以下のようなエラーが発生。
エラー例(クライアント側)
$ curl http://localhost:4567/now <html> <head> <meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/> <title>Error 500 </title> </head> <body> <h2>HTTP ERROR: 500</h2> <p>Problem accessing /now. Reason: <pre> java.lang.NoSuchMethodError: javax.servlet.http.HttpServletResponse.getHeaders(Ljava/lang/String;)Ljava/util/Collection;</pre></p> <hr /><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.3.6.v20151106</a><hr/> </body> </html>
エラー例(サーバー側)
> groovy now.groovy ・・・ java.lang.NoSuchMethodError: javax.servlet.http.HttpServletResponse.getHeaders(Ljava/lang/String;)Ljava/util/Collection; at spark.utils.GzipUtils.checkAndWrap(GzipUtils.java:67) at spark.http.matching.Body.serializeTo(Body.java:69) at spark.http.matching.MatcherFilter.doFilter(MatcherFilter.java:158) at spark.embeddedserver.jetty.JettyHandler.doHandle(JettyHandler.java:50) at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:189) ・・・
このエラーは、groovy コマンドの実行時に $GROOVY_HOME/lib/servlet-api-2.4.jar (Servlet 2.4) を先にロードする事が原因で発生しているようです。
HttpServletResponse.getHeaders
は Servlet 3.0 から追加されたメソッドのため、Servlet 2.4 の API が適用されていると該当メソッドが存在せず NoSuchMethodError
になります。
回避策
エラー原因は、groovy コマンドが $GROOVY_HOME/lib/servlet-api-2.4.jar をロードする事なので、Gradle 等で実行すれば上記のようなエラーは発生しません。
しかし、今回は groovy コマンドで実行する場合の回避策をいくつか検討してみました。
- (a) -cp オプションを使用
- (b) Groovy 設定ファイル (groovy-starter.conf) を編集
- (c) $GROOVY_HOME/lib/servlet-api-2.4.jar を削除
(a) と (b) は Servlet 2.4 より先に Servlet 3.1 を適用させる方法で、(c) は servlet-api-2.4.jar をロードさせない方法です。
(a) -cp オプションを使用
Servlet 3.1 の JAR (下記では javax.servlet-api-3.1.0.jar)を入手し、groovy コマンドの -cp オプションでその JAR を指定して実行します。
こうする事でエラーは出なくなりました。
実行例(サーバー)
> groovy -cp lib_a/javax.servlet-api-3.1.0.jar now.groovy ・・・
実行例(クライアント)
$ curl http://localhost:4567/now 2016/07/31 20:46:42
(b) Groovy 設定ファイル (groovy-starter.conf) を編集
$GROOVY_HOME/lib/servlet-api-2.4.jar は groovy-starter.conf の設定 (load !{groovy.home}/lib/*.jar
) によりロードされています。
つまり、$GROOVY_HOME/lib/*.jar よりも先に、別のディレクトリ内の JAR をロードするように groovy-starter.conf を書き換え、そのディレクトリへ Servlet 3.1 の JAR を配置すれば、(a) と同様に回避できるはずです。
groovy-starter.conf 変更例
# 以下を追加 load lib_a/*.jar # load required libraries load !{groovy.home}/lib/*.jar ・・・
lib_a/javax.servlet-api-3.1.0.jar を配置して実行すると、エラーは出なくなりました。
実行例(サーバー)
> groovy now.groovy ・・・
実行例(クライアント)
$ curl http://localhost:4567/now 2016/07/31 20:48:05
備考. Groovy 設定ファイル (groovy-starter.conf) の指定方法
groovy-starter.conf を直接書き換えるのはイマイチなので、任意の Groovy 設定ファイルを使いたいところです。
startGroovy
スクリプトの内容を見ると GROOVY_CONF
環境変数で指定できそうです。
ただし、startGroovy.bat
の方は今のところ GROOVY_CONF
環境変数を考慮しておらず、Windows 環境 (groovy.bat を使う場合) では使えません。
そこで今回は、下記 postinit.bat を用意し、Windows 環境で GROOVY_CONF
に対応してみました。
%USERPROFILE%/.groovy/postinit.bat の例
if not "%GROOVY_CONF%" == "" ( set GROOVY_OPTS=%GROOVY_OPTS% -Dgroovy.starter.conf="%GROOVY_CONF%" set STARTER_CONF=%GROOVY_CONF% )
上記を配置した後、以下のように実行します。
GROOVY_CONF 環境変数の利用例 (Windows)
> set GROOVY_CONF=groovy-starter_custom.conf > groovy now.groovy ・・・
(c) $GROOVY_HOME/lib/servlet-api-2.4.jar を削除
$GROOVY_HOME/lib/servlet-api-2.4.jar を一時的に削除(拡張子を変える等)して実行します。
備考. Gradle で実行する場合
Gradle で実行する場合は、src/main/groovy/now.groovy を配置して(@Grab の箇所は削除しておく)、以下のような build.gradle を使います。
build.gradle 例
apply plugin: 'groovy' apply plugin: 'application' repositories { jcenter() } dependencies { compile 'com.sparkjava:spark-core:2.5' compile 'org.codehaus.groovy:groovy:2.4.7' runtime 'org.slf4j:slf4j-simple:1.7.21' } mainClassName = 'now'
実行例
> gradle run ・・・ :run ・・・ [Thread-1] INFO org.eclipse.jetty.server.Server - Started @1035ms
node-ffi で OpenCL を使う2 - 演算の実行
「node-ffi で OpenCL を使う」 に続き、Node.js を使って OpenCL の演算を実施してみます。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20160725/
はじめに
演算の実行には ref-array
モジュールを使った方が便利なため、node-ffi をインストールした環境へ追加でインストールしておきます。
ref-array インストール例
> npm install ref-array
OpenCL の演算実行サンプル
今回は配列の要素を 3乗する OpenCL のコード(以下)を Node.js から実行する事にします。
cube.cl
__kernel void cube( __global float* input, __global float* output, const unsigned int count) { int i = get_global_id(0); if (i < count) { output[i] = input[i] * input[i] * input[i]; } }
サンプルコード概要
上記 cube.cl を実行する Node.js サンプルコードの全体像です。(OpenCL の API は try-finally 内で呼び出しています)
OpenCL 演算の入力値として data
変数の値を使用します。OpenCL のコードはファイルから読み込んで code
変数へ設定しています。
OpenCL API の clCreateXXX
で作成したリソースは clReleaseXXX
で解放するようなので、解放処理を都度 releaseList
へ追加しておき、finally で実行するようにしています。
なお、OpenCL API のエラーコード取得には以下の 2通りがあります。(使用する関数による)
- 関数の戻り値でエラーコードを取得
- 関数の引数(ポインタ)でエラーコードを取得
calc.js (全体)
'use strict'; const fs = require('fs'); const ref = require('ref'); const ArrayType = require('ref-array'); const ffi = require('ffi'); const CL_DEVICE_TYPE_DEFAULT = 1; const CL_MEM_READ_WRITE = (1 << 0); const CL_MEM_WRITE_ONLY = (1 << 1); const CL_MEM_READ_ONLY = (1 << 2); const CL_MEM_USE_HOST_PTR = (1 << 3); const CL_MEM_ALLOC_HOST_PTR = (1 << 4); const CL_MEM_COPY_HOST_PTR = (1 << 5); const intPtr = ref.refType(ref.types.int32); const uintPtr = ref.refType(ref.types.uint32); const sizeTPtr = ref.refType('size_t'); const StringArray = ArrayType('string'); const clLib = (process.platform == 'win32') ? 'OpenCL' : 'libOpenCL'; // 使用する OpenCL の関数定義 const openCl = ffi.Library(clLib, { 'clGetPlatformIDs': ['int', ['uint', sizeTPtr, uintPtr]], 'clGetDeviceIDs': ['int', ['size_t', 'ulong', 'uint', sizeTPtr, uintPtr]], 'clCreateContext': ['pointer', ['pointer', 'uint', sizeTPtr, 'pointer', 'pointer', intPtr]], 'clReleaseContext': ['int', ['pointer']], 'clCreateProgramWithSource': ['pointer', ['pointer', 'uint', StringArray, sizeTPtr, intPtr]], 'clBuildProgram': ['int', ['pointer', 'uint', sizeTPtr, 'string', 'pointer', 'pointer']], 'clReleaseProgram': ['int', ['pointer']], 'clCreateKernel': ['pointer', ['pointer', 'string', intPtr]], 'clReleaseKernel': ['int', ['pointer']], 'clCreateBuffer': ['pointer', ['pointer', 'ulong', 'size_t', 'pointer', intPtr]], 'clReleaseMemObject': ['int', ['pointer']], 'clSetKernelArg': ['int', ['pointer', 'uint', 'size_t', 'pointer']], 'clCreateCommandQueue': ['pointer', ['pointer', 'size_t', 'ulong', intPtr]], 'clReleaseCommandQueue': ['int', ['pointer']], 'clEnqueueReadBuffer': ['int', ['pointer', 'pointer', 'bool', 'size_t', 'size_t', 'pointer', 'uint', 'pointer', 'pointer']], 'clEnqueueNDRangeKernel': ['int', ['pointer', 'pointer', 'uint', sizeTPtr, sizeTPtr, sizeTPtr, 'uint', 'pointer', 'pointer']] }); // エラーチェック const checkError = (err, title = '') => { if (err instanceof Buffer) { // ポインタの場合はエラーコードを取り出す err = intPtr.get(err); } if (err != 0) { throw new Error(`${title} Error: ${err}`); } }; // 演算対象データ const data = [1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]; const functionName = process.argv[2] // OpenCL コードの読み込み(ファイルから) const code = fs.readFileSync(process.argv[3]); const releaseList = []; try { ・・・ OpenCL API の呼び出し処理 ・・・ } finally { // リソースの解放 releaseList.reverse().forEach( f => f() ); }
clCreateProgramWithSource
へ OpenCL のコードを渡す際に、ref-array
で作成した String の配列 StringArray
を使っています。
OpenCL 処理部分
番号 | 概要 | OpenCL 関数名 |
---|---|---|
(1) | プラットフォーム取得 | clGetPlatformIDs |
(2) | デバイス取得 | clGetDeviceIDs |
(3) | コンテキスト作成 | clCreateContext |
(4) | コマンドキュー作成 | clCreateCommandQueue |
(5) | プログラム作成 | clCreateProgramWithSource |
(6) | プログラムのビルド | clBuildProgram |
(7) | カーネル作成 | clCreateKernel |
(8) | 引数用のバッファ作成 | clCreateBuffer |
(9) | 引数の設定 | clSetKernelArg |
(10) | 処理の実行 | clEnqueueNDRangeKernel |
(11) | 結果の取得 | clEnqueueReadBuffer |
OpenCL のコードを実行するには (6) のように API を使ってビルドする必要があります。
Node.js と OpenCL 間で配列データ等をやりとりするには (8) で作ったバッファを使います。(入力値をバッファへ書き込んで、出力値をバッファから読み出す)
また、今回は clEnqueueNDRangeKernel
を使って実行しましたが、clEnqueueTask
を使って実行する方法もあります。
calc.js (OpenCL 処理部分)
・・・ try { const platformIdsPtr = ref.alloc(sizeTPtr); // (1) プラットフォーム取得 let res = openCl.clGetPlatformIDs(1, platformIdsPtr, null); checkError(res, 'clGetPlatformIDs'); const platformId = sizeTPtr.get(platformIdsPtr); const deviceIdsPtr = ref.alloc(sizeTPtr); // (2) デバイス取得 (デフォルトをとりあえず使用) res = openCl.clGetDeviceIDs(platformId, CL_DEVICE_TYPE_DEFAULT, 1, deviceIdsPtr, null); checkError(res, 'clGetDeviceIDs'); const deviceId = sizeTPtr.get(deviceIdsPtr); const errPtr = ref.alloc(intPtr); // (3) コンテキスト作成 const ctx = openCl.clCreateContext(null, 1, deviceIdsPtr, null, null, errPtr); checkError(errPtr, 'clCreateContext'); releaseList.push( () => openCl.clReleaseContext(ctx) ); // (4) コマンドキュー作成 const queue = openCl.clCreateCommandQueue(ctx, deviceId, 0, errPtr); checkError(errPtr, 'clCreateCommandQueue'); releaseList.push( () => openCl.clReleaseCommandQueue(queue) ); const codeArray = new StringArray([code.toString()]); // (5) プログラム作成 const program = openCl.clCreateProgramWithSource(ctx, 1, codeArray, null, errPtr); checkError(errPtr, 'clCreateProgramWithSource'); releaseList.push( () => openCl.clReleaseProgram(program) ); // (6) プログラムのビルド res = openCl.clBuildProgram(program, 1, deviceIdsPtr, null, null, null) checkError(res, 'clBuildProgram'); // (7) カーネル作成 const kernel = openCl.clCreateKernel(program, functionName, errPtr); checkError(errPtr, 'clCreateKernel'); releaseList.push( () => openCl.clReleaseKernel(kernel) ); const FixedFloatArray = ArrayType('float', data.length); // 入力データ const inputData = new FixedFloatArray(data); const bufSize = inputData.buffer.length; // (8) 引数用のバッファ作成(入力用)し inputData の内容を書き込む const inClBuf = openCl.clCreateBuffer(ctx, CL_MEM_READ_ONLY | CL_MEM_COPY_HOST_PTR, bufSize, inputData.buffer, errPtr); checkError(errPtr, 'clCreateBuffer In'); releaseList.push( () => openCl.clReleaseMemObject(inClBuf) ); // (8) 引数用のバッファ作成(出力用) const outClBuf = openCl.clCreateBuffer(ctx, CL_MEM_WRITE_ONLY, bufSize, null, errPtr); checkError(errPtr, 'clCreateBuffer Out'); releaseList.push( () => openCl.clReleaseMemObject(outClBuf) ); const inClBufRef = inClBuf.ref(); // (9) 引数の設定 res = openCl.clSetKernelArg(kernel, 0, inClBufRef.length, inClBufRef); checkError(res, 'clSetKernelArg 0'); const outClBufRef = outClBuf.ref(); // (9) 引数の設定 res = openCl.clSetKernelArg(kernel, 1, outClBufRef.length, outClBufRef); checkError(res, 'clSetKernelArg 1'); const ct = ref.alloc(ref.types.uint32, data.length); // (9) 引数の設定 res = openCl.clSetKernelArg(kernel, 2, ct.length, ct); checkError(res, 'clSetKernelArg 2'); const globalPtr = ref.alloc(sizeTPtr); sizeTPtr.set(globalPtr, 0, data.length); // (10) 処理の実行 res = openCl.clEnqueueNDRangeKernel(queue, kernel, 1, null, globalPtr, null, 0, null, null); checkError(res, 'clEnqueueNDRangeKernel'); const resData = new FixedFloatArray(); // (11) 結果の取得 (outClBuf の内容を resData へ) res = openCl.clEnqueueReadBuffer(queue, outClBuf, true, 0, resData.buffer.length, resData.buffer, 0, null, null); checkError(res, 'clEnqueueReadBuffer'); // 結果出力 for (let i = 0; i < resData.length; i++) { console.log(resData[i]); } } finally { // リソースの解放 releaseList.reverse().forEach( f => f() ); }
動作確認
今回は以下の Node.js を使って Windows と Linux の両方で動作確認します。
- Node.js v6.3.1
(a) Windows で実行
「node-ffi で OpenCL を使う」 で構築した環境へ ref-array
をインストールして実行しました。
実行結果 (Windows)
> node calc.js cube cube.cl 1.3310000896453857 10.648000717163086 35.93699645996094 85.18400573730469 166.375 287.4959716796875 456.532958984375 681.4720458984375 970.2988891601562
(b) Linux で実行
前回 の Docker イメージを使って実行します。
calc.js と cube.cl を /vagrant/work へ配置し、Docker コンテナからは /work でアクセスできるようにマッピングしました。
Docker コンテナ実行
$ docker run --rm -it -v /vagrant/work:/work sample/opencl:0.1 bash
Node.js と必要なモジュールのインストール
# curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.31.2/install.sh | bash ・・・ # source ~/.bashrc # nvm install v6.3.1 ・・・ # npm install -g node-gyp ・・・ # cd /work # npm install ffi ref-array ・・・
/work 内で実行します。
実行結果 (Linux)
# node calc.js cube cube.cl 1.3310000896453857 10.648000717163086 35.93699645996094 85.18400573730469 166.375 287.4959716796875 456.532958984375 681.4720458984375 970.2988891601562
Docker で OpenCL の実行環境を構築 - pocl
前回の 「node-ffi で OpenCL を使う」 では Windows 上で Node.js から OpenCL の API を呼び出してみましたが、これを Linux 上で行うために OpenCL の実行環境を Docker で構築してみました。
OpenCL の実行環境として pocl を使い、Linux は Fedora を使います。(Fedora にしたのは pocl のインストールが簡単だったから)
サンプルのソースは http://github.com/fits/try_samples/tree/master/blog/20160711/
Docker イメージ作成
Docker の公式 fedora イメージを使って OpenCL の Docker イメージを作成します。
最近の Fedora ではパッケージ管理に DNF (yum の後継) を使うようなので、dnf コマンドを使った Dockerfile を作成しました。
Dockerfile
FROM fedora RUN dnf update -y RUN dnf install -y clinfo pocl pocl-devel RUN ln -s /lib64/libpoclu.so /lib64/libOpenCL.so RUN dnf install -y tar findutils make python2 RUN dnf clean all
OpenCL を実行するだけなら pocl のインストールだけで良さそうですが、ついでに clinfo と pocl-devel もインストールしています。 (clinfo で OpenCL の環境情報を参照できます)
なお、OpenCL の標準的なライブラリ名は libOpenCL.so
のようですが、pocl をインストールしても libOpenCL.so を作ってくれなかったので、libpoclu.so
のシンボリックリンクとして作成するようにしています。※
※ node-ffi で指定するライブラリ名を libpoclu とするなら、 libOpenCL.so は無くてもよい
また、tar と findutils は nvm
で、make と python2 は node-gyp
で必要となるため、ついでにインストールしています。
Docker イメージは docker build コマンドで作成します。
ビルド例 (Docker イメージ作成)
$ docker build --no-cache -t sample/opencl:0.1 .
OpenCL 動作確認
まずは、今回の環境で動作するように 前回 のサンプルを少し書き換えます。 ライブラリ名をプラットフォームに合わせて切り替えるようにします。
/vagrant/work/platform_info.js (変更点)
・・・ const clLib = (process.platform == 'win32') ? 'OpenCL' : 'libOpenCL'; const openCl = ffi.Library(clLib, { ・・・ }); ・・・
先程作成した Docker イメージを使ってコンテナを起動します。
Docker コンテナ実行
$ docker run --rm -it -v /vagrant/work:/work sample/opencl:0.1 bash
ここからは Docker コンテナ内での操作となります。
dnf (もしくは yum) だと古いバージョンの Node.js をインストールしてしまうので、nvm を使って Node.js v6.2.2 をインストールします。
Node.js のインストール後、node-gyp と node-ffi を npm でそれぞれインストールします。
Node.js と必要なモジュールのインストール
# curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.31.2/install.sh | bash ・・・ # source ~/.bashrc # nvm install v6.2.2 ・・・ # npm install -g node-gyp ・・・ # cd /work # npm install ffi ・・・
最後に platform_info.js を実行すると問題なく動作しました。
platform_info.js 実行
# node platform_info.js FULL_PROFILE OpenCL 2.0 pocl 0.13, LLVM 3.8.0 Portable Computing Language
node-ffi で OpenCL を使う
Windows 環境で node-ffi (Node.js Foreign Function Interface) を使って OpenCL の API を呼び出してみました。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20160627/
なお、OpenCL 上での演算は今回扱いませんが、単純な演算のサンプルは ここ に置いてます。
はじめに
node-ffi のインストール
まずは、node-gyp をインストールしておきます。 node-gyp を Windows 環境で使うには VC++ や Python 2.7 が必要です。
node-gyp インストール例
> npm install -g node-gyp
node-ffi をインストールします。(モジュール名は node-ffi ではなく ffi
です)
node-ffi インストール例
> npm install ffi
node-ffi の使い方
node-ffi では Library
関数を使ってネイティブライブラリの関数をマッピングします。
ffi.Library(<ライブラリ名>, { <関数名>: [<戻り値の型>, [<第1引数の型>, <第2引数の型>, ・・・]], ・・・ })
引数の型などはライブラリのヘッダーファイルなどを参考にして設定します。
例えば、OpenCL.dll (Windows 環境の場合) の clGetPlatformIDs 関数を Node.js から openCl.clGetPlatformIDs(・・・)
で呼び出すには以下のようにします。
Library の使用例
const openCl = ffi.Library('OpenCL', { 'clGetPlatformIDs': ['int', ['uint', sizeTPtr, uintPtr]], ・・・ });
ref モジュールの refType
でポインタ用の型を定義する事が可能です。
refType の使用例
const uintPtr = ref.refType(ref.types.uint32); const sizeTPtr = ref.refType('size_t');
OpenCL の利用
それでは、下記 OpenCL ランタイムをインストールした Windows 環境で、OpenCL の API を 3つほど呼び出してみます。
1. OpenCL のデバイスID取得
まずは、以下を実施してみます。
- (1)
clGetPlatformIDs
を使ってプラットフォームIDを取得 - (2)
clGetDeviceIDs
を使ってデバイスIDを取得
OpenCL (v1.2) のヘッダーファイルを見てみると、プラットフォームIDの型 cl_platform_id
やデバイスIDの型 cl_device_id
はこれ自体がポインタのようなので ※、これらに該当する型は size_t
としました。
※ そのため、プラットフォームID や デバイスID という表現は 適切ではないかもしれません
node-ffi ではポインタを扱うために Buffer
を使います。
そのための補助関数が ref モジュールに用意されており、下記サンプルでは以下を使っています。
- ref モジュールの
alloc
を使って指定した型に応じた Buffer を作成 - 定義した型の
get
を使って Buffer から値を取得
get
を使えば、型のサイズやエンディアンに応じた値を Buffer から取り出してくれます。 (例えば、int32 なら Buffer の readInt32LE や readInt32BE を使って値を取得する)
なお、エラーの有無は clGetPlatformIDs・clGetDeviceIDs の戻り値が 0 かどうかで判定します。(0: 成功、0以外: エラー)
get_device_id.js
'use strict'; const ffi = require('ffi'); const ref = require('ref'); // 定数の定義 const CL_DEVICE_TYPE_DEFAULT = 1; // ポインタ用の型定義 const uintPtr = ref.refType(ref.types.uint32); const sizeTPtr = ref.refType('size_t'); // OpenCL の関数定義 const openCl = ffi.Library('OpenCL', { 'clGetPlatformIDs': ['int', ['uint', sizeTPtr, uintPtr]], 'clGetDeviceIDs': ['int', ['size_t', 'ulong', 'uint', sizeTPtr, uintPtr]] }); // エラーチェック処理 const checkError = (errCode, title = '') => { if (errCode != 0) { throw new Error(`${title} Error: ${errCode}`); } }; const platformIdsPtr = ref.alloc(sizeTPtr); // (1) プラットフォームIDを(1つ)取得 let res = openCl.clGetPlatformIDs(1, platformIdsPtr, null); checkError(res, 'clGetPlatformIDs'); // プラットフォームID(get を使って platformIdsPtr の先頭の値を取得) const platformId = sizeTPtr.get(platformIdsPtr); console.log(`platformId: ${platformId}`); const deviceIdsPtr = ref.alloc(sizeTPtr); // (2) デバイスIDを(1つ)取得 res = openCl.clGetDeviceIDs(platformId, CL_DEVICE_TYPE_DEFAULT, 1, deviceIdsPtr, null); checkError(res, 'clGetDeviceIDs'); // デバイスID(get を使って deviceIdsPtr の先頭の値を取得) const deviceId = sizeTPtr.get(deviceIdsPtr); console.log(`deviceId: ${deviceId}`);
実行結果
> node get_device_id.js platformId: 47812336 deviceId: 4404320
2. OpenCL のプラットフォーム情報取得
次は OpenCL のプラットフォーム情報を取得してみます。
プラットフォーム情報は clGetPlatformInfo
を使って取得します。
- (1)
clGetPlatformInfo
でデータサイズを取得 - (2) バッファを確保
- (3)
clGetPlatformInfo
でデータを取得
platform_info.js
'use strict'; const ffi = require('ffi'); const ref = require('ref'); // 定数の定義 const CL_PLATFORM_PROFILE = 0x0900; const CL_PLATFORM_VERSION = 0x0901; const CL_PLATFORM_NAME = 0x0902; const CL_PLATFORM_VENDOR = 0x0903; const CL_PLATFORM_EXTENSIONS = 0x0904; const CL_PLATFORM_HOST_TIMER_RESOLUTION = 0x0905; const uintPtr = ref.refType(ref.types.uint32); const sizeTPtr = ref.refType('size_t'); const openCl = ffi.Library('OpenCL', { 'clGetPlatformIDs': ['int', ['uint', sizeTPtr, uintPtr]], 'clGetPlatformInfo': ['int', ['size_t', 'uint', 'size_t', 'pointer', sizeTPtr]] }); const checkError = (errCode, title = '') => { if (errCode != 0) { throw new Error(`${title} Error: ${errCode}`); } }; // プラットフォーム情報の出力 const printPlatformInfo = (pid, paramName) => { const sPtr = ref.alloc(sizeTPtr); // (1) データサイズを取得 let res = openCl.clGetPlatformInfo(pid, paramName, 0, null, sPtr); checkError(res, 'clGetPlatformInfo size'); // データサイズの値を取り出す const size = sizeTPtr.get(sPtr); // (2) バッファを確保 const buf = Buffer.alloc(size); // (3) データを取得 res = openCl.clGetPlatformInfo(pid, paramName, size, buf, null); checkError(res, 'clGetPlatformInfo data'); // 出力 console.log(buf.toString()); }; const platformIdsPtr = ref.alloc(sizeTPtr); const res = openCl.clGetPlatformIDs(1, platformIdsPtr, null); checkError(res, 'clGetPlatformIDs'); const platformId = sizeTPtr.get(platformIdsPtr); [ CL_PLATFORM_PROFILE, CL_PLATFORM_VERSION, CL_PLATFORM_NAME ].forEach( p => printPlatformInfo(platformId, p) );
実行結果
> node platform_info.js FULL_PROFILE OpenCL 1.2 Intel(R) OpenCL
Keras で iris を分類
Theano・TensorFlow 用のディープラーニングライブラリ Keras を使って、階層型ニューラルネットによる iris の分類を試してみました。
- Keras 1.0.3
- Python 3.5.1
ソースは http://github.com/fits/try_samples/tree/master/blog/20160531/
準備
今回は Docker コンテナで Keras を実行するため、Docker の公式イメージ python をベースに Keras をインストールした Docker イメージを用意します。
Docker イメージ作成
/vagrant/work/keras/Dockerfile
FROM python RUN apt-get update && apt-get upgrade -y RUN pip install --upgrade pip RUN pip install keras RUN pip install scikit-learn RUN apt-get clean
sklearn の iris データセットを使うために scikit-learn もインストールしていますが、Keras を使うだけなら不要です。
また、Theano はデフォルトでインストールされるようですが、TensorFlow を使う場合は別途インストールする必要がありそうです。(今回は Theano を使います)
Dockerfile に対して docker build を実行して Docker イメージを作成します。
Docker ビルド
$ cd /vagrant/work/keras $ docker build --no-cache -t sample/py-keras:0.1 .
(1) 学習
まずは、iris のデータセットを全て使って学習してみます。
「ConvnetJS で iris を分類」 で実施したように、出力層の活性化関数はソフトマックス、損失関数(誤差関数)に交差エントロピーを使います。
sklearn の iris データセットのように、ラベルデータ (target
の値) が数値 (0 ~ 2) の場合 ※ に交差エントロピーを実施するには compile
の引数で loss = 'sparse_categorical_crossentropy'
と指定すれば良さそうです。
※ iris.target の内容 [0 0 0 0 0 0 0 ・・・ 0 0 0 0 0 0 0 ・・・ 1 1 1 1 1 1 1 ・・・ 2 2 2 2 2 2 2 ・・・ 2 2]
とりあえず、入力層 - 隠れ層 - 出力層
(隠れ層のニューロン数 8)というレイヤー構成を使い、学習処理(fit
)はミニバッチサイズを 1 として 50 回繰り返すように指定しました。
/vagrant/work/iris_sample1.py
from keras.models import Sequential from keras.layers.core import Dense, Activation from sklearn import datasets # モデルの定義 model = Sequential() # 隠れ層の定義 model.add(Dense(input_dim = 4, output_dim = 8)) # 隠れ層の活性化関数 model.add(Activation('relu')) # 出力層の定義 model.add(Dense(output_dim = 3)) # 出力層の活性化関数 model.add(Activation('softmax')) model.compile(loss = 'sparse_categorical_crossentropy', optimizer = 'sgd', metrics = ['accuracy']) iris = datasets.load_iris() # 学習 model.fit(iris.data, iris.target, nb_epoch = 50, batch_size = 1)
docker run で Keras 用の Docker コンテナを起動した後、コンテナ内で上記を実行します。
実行例
$ docker run --rm -it -v /vagrant/work:/work sample/py-keras:0.1 bash # cd /work # python iris_sample1.py Using Theano backend. Epoch 1/50 1/150 [..............................] - ETA: 0s - loss: 3.4213 - acc: 0.0000e 2/150 [..............................] - ETA: 0s - loss: 2.2539 - acc: 0.0000e・・・ Epoch 49/50 150/150 [==============================] - 0s - loss: 0.1225 - acc: 0.9533 Epoch 50/50 150/150 [==============================] - 0s - loss: 0.1525 - acc: 0.9333
誤差(loss)と正解率(acc)が出力されました。
(2) 学習と評価
次は、iris データセットを学習用と評価用に分割して学習と評価をそれぞれ実行してみます。
データセットを直接シャッフルする代わりに、0 ~ 149 の数値をランダムに配置した配列を numpy の random.permutation
で作成し、学習・評価用のデータ分割に利用しました。
/vagrant/work/iris_sample2.py
import sys from keras.models import Sequential from keras.layers.core import Dense, Activation from sklearn import datasets import numpy as np # 学習・評価用のデータ分割率 trainEvalRate = 0.7 # 学習の繰り返し回数 epoch = int(sys.argv[1]) # 隠れ層のニューロン数 neuNum = int(sys.argv[2]) # 隠れ層の活性化関数 act = sys.argv[3] optm = sys.argv[4] model = Sequential() model.add(Dense(input_dim = 4, output_dim = neuNum)) model.add(Activation(act)) model.add(Dense(output_dim = 3)) model.add(Activation('softmax')) model.compile( loss = 'sparse_categorical_crossentropy', optimizer = optm, metrics = ['accuracy'] ) iris = datasets.load_iris() data_size = len(iris.data) train_size = int(data_size * trainEvalRate) perm = np.random.permutation(data_size) # 学習用データ x_train = iris.data[ perm[0:train_size] ] y_train = iris.target[ perm[0:train_size] ] # 学習 model.fit(x_train, y_train, nb_epoch = epoch, batch_size = 1) print('-----') # 評価用データ x_test = iris.data[ perm[train_size:] ] y_test = iris.target[ perm[train_size:] ] # 評価 res = model.evaluate(x_test, y_test, batch_size = 1) print(res)
実行例
# python iris_sample2.py 50 6 sigmoid adam Using Theano backend. Epoch 1/50 105/105 [==============================] - 0s - loss: 1.0751 - acc: 0.3524 Epoch 2/50 105/105 [==============================] - 0s - loss: 1.0417 - acc: 0.3524 ・・・ Epoch 49/50 105/105 [==============================] - 0s - loss: 0.3503 - acc: 0.9714 Epoch 50/50 105/105 [==============================] - 0s - loss: 0.3458 - acc: 0.9714 ----- 45/45 [==============================] - 0s [0.35189295262098313, 0.97777777777777775]
JMX で Java Flight Recorder (JFR) を実行する
Java Flight Recorder (JFR) は Java Mission Control (jmc) や jcmd コマンドから実行できますが、今回は以下の MBean を使って JMX から実行してみます。
- com.sun.management:type=DiagnosticCommand
この MBean は以下のような操作を備えており(戻り値は全て String)、jcmd コマンドと同じ事ができるようです。
- jfrCheck
- jfrDump
- jfrStop
- jfrStart
- vmCheckCommercialFeatures
- vmCommandLine
- vmFlags
- vmSystemProperties
- vmUnlockCommercialFeatures
- vmUptime
- vmVersion
- vmNativeMemory
- gcRotateLog
- gcRun
- gcRunFinalization
- gcClassHistogram
- gcClassStats
- threadPrint
(a) JFR の実行
JMX を使う方法はいくつかありますが、今回は Attach API でローカルの VM へアタッチし、startLocalManagementAgent
メソッドで JMX エージェントを適用する方法を用いました。
DiagnosticCommand には java.lang.management.ThreadMXBean のようなラッパーが用意されていないようなので GroovyMBean
を使う事にします。
jfrStart
の引数は jcmd コマンドと同じものを String 配列にして渡すだけのようです。(jfrStart 以外も基本的に同じ)
また、JFR の実行には Commercial Features のアンロックが必要です。
jfr_run.groovy
import com.sun.tools.attach.VirtualMachine import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL def pid = args[0] def duration = args[1] def fileName = args[2] // 指定の JVM プロセスへアタッチ def vm = VirtualMachine.attach(pid) try { // JMX エージェントを適用 def jmxuri = vm.startLocalManagementAgent() JMXConnectorFactory.connect(new JMXServiceURL(jmxuri)).withCloseable { def server = it.getMBeanServerConnection() // MBean の取得 def bean = new GroovyMBean(server, 'com.sun.management:type=DiagnosticCommand') // Commercial Features のアンロック (JFR の実行に必要) println bean.vmUnlockCommercialFeatures() // JFR の開始 println bean.jfrStart([ "duration=${duration}", "filename=${fileName}", 'delay=10s' ] as String[]) } } finally { vm.detach() }
実行例
apache-tomcat-9.0.0.M4 へ適用してみます。
Tomcat 実行
> startup
以下の環境で実行しました。
- Groovy 2.4.6
- Java SE 8u92 64bit版
JFR 実行
> jps 4576 Jps 2924 Bootstrap > groovy jfr_run.groovy 2924 1m sample1.jfr Commercial Features now unlocked. Recording 1 scheduled to start in 10 s. The result will be written to: C:\・・・\apache-tomcat-9.0.0.M4\apache-tomcat-9.0.0.M4\bin\sample1.jfr
jfrStart は JFR の完了を待たずに戻り値を返すため、JFR の実行状況は別途確認する事になります。
出力結果 Recording 1 scheduled
の 1
が recoding の番号で、この番号を使って JFR の状態を確認できます。
ファイル名を相対パスで指定すると対象プロセスのカレントディレクトリへ出力されるようです。 (今回は Tomcat の bin ディレクトリへ出力されました)
(b) JFR の状態確認
JFR の実行状況を確認するには jfrCheck
を使います。
下記では recording の番号を指定し、該当する JFR の実行状況を出力しています。
jfrCheck の引数が null の場合は全ての JFR 実行状態を取得するようです。
jfr_check.groovy
import com.sun.tools.attach.VirtualMachine import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL def pid = args[0] String[] params = (args.length > 1)? ["recording=${args[1]}"]: null def vm = VirtualMachine.attach(pid) try { def jmxuri = vm.startLocalManagementAgent() JMXConnectorFactory.connect(new JMXServiceURL(jmxuri)).withCloseable { def server = it.getMBeanServerConnection() def bean = new GroovyMBean(server, 'com.sun.management:type=DiagnosticCommand') println bean.jfrCheck(params) } } finally { vm.detach() }
実行例
recording 番号(下記では 1
)を指定して実行します。
実行例1 (JFR 実行中)
> groovy jfr_check.groovy 2924 1 Recording: recording=1 name="sample1.jfr" duration=1m filename="sample1.jfr" compress=false (running)
実行例2 (JFR 完了後)
> groovy jfr_check.groovy 2924 1 Recording: recording=1 name="sample1.jfr" duration=1m filename="sample1.jfr" compress=false (stopped)
今回作成したサンプルのソースは http://github.com/fits/try_samples/tree/master/blog/20160519/