Dagger で Node.js アプリをビルドする
CI/CD のパイプラインを定義するためのツール Dagger を使って Node.js アプリのビルドを試してみました。
- Dagger v0.2.7
今回使用したソースは こちら
sample1. echo の実施
まずは以下の処理を Dagger で実施してみます。
- (1) ローカルの
input.txt
ファイルの内容を取得 - (2) alpine イメージでコンテナを実行し、(1) に
_echo
を付けた文字列を echo して/tmp/output.txt
へ出力 - (3) コンテナの
/tmp/output.txt
の内容を取得し、ローカルのoutput.txt
ファイルへ出力
Dagger プロジェクト作成
下記コマンドを実行して Dagger の実行に必要なファイルを作成しておきます。
$ dagger project init $ dagger project update
project init で cue.mod
が作成され、project update で cue.mod/pkg
内へ dagger.io
と universe.dagger.io
のコアモジュールがインストールされます。
パイプライン定義
Dagger では CUE で CI/CD パイプラインを定義する事になっており、dagger.#Plan & { パイプラインの内容 }
という風に記述します。※
CUE は一部 Go 言語風なので紛らわしいのですが、パワーアップした JSON のようなものだと捉えておくと理解し易いかもしれません。
※ 個人的に、dagger.#Plan の内容(cue.mod/pkg/dagger.io/dagger/plan.cue 参照)に対して { ・・・ } の内容をマージしているのだと解釈しています
パイプラインの処理は dagger.#Plan の actions
で定義する事になっており、アクション名(下記の sample
)は任意の名称を付けられるようです。(dagger do 時にアクション名を指定)
actions の処理はコンテナ内で実行する事になるので、ローカルのファイルや環境変数をコンテナとやりとりするために client
が用意されています。
前述の (1) ~ (3) を定義すると以下のようになりました。
sample.cue
package sample1 import ( "dagger.io/dagger" "dagger.io/dagger/core" ) dagger.#Plan & { actions: { sample: { // (2) alpine イメージの pull _alpine: core.#Pull & { source: "alpine:3" } // (1) ローカルの input.txt ファイルの内容 msg: client.filesystem."input.txt".read.contents // (2) コンテナで echo を実行し /tmp/output.txt へ出力 echo: core.#Exec & { input: _alpine.output args: [ "sh", "-c", "echo -n \(msg)_echo > /tmp/output.txt", ] always: true } // (3) コンテナの /tmp/output.txt の内容を取得 result: core.#ReadFile & { input: echo.output path: "/tmp/output.txt" } } } client: { filesystem: { // (1) ローカルの input.txt ファイルの内容を取得(ファイルの内容を取得するために string を設定) "input.txt": read: contents: string // (3) ローカルの output.txt ファイルへ出力 "output.txt": write: contents: actions.sample.result.contents } } }
コンテナの実行に core.#Exec
、コンテナで出力したファイルの内容を取得するために core.#ReadFile
を使っています。
client.filesystem.<パス>.read.contents
の値を string
とする事で ※、パスで指定したローカルファイルの内容を文字列として参照できます。
※ dagger.#FS とするとファイルシステムを参照できる
なお、前処理の output
を次の処理の input
に繋げていく事でパイプライン(処理の順序)を実現しているようです。
実行
それでは、実行してみます。
input.txt の内容を以下のようにしました。
input.txt
sample1
dagger do で任意のアクションを実行します。
実行例
$ dagger do sample [✔] actions.sample ・・・ [✔] client.filesystem."input.txt".read ・・・ [✔] actions.sample.echo ・・・ [✔] actions.sample.result ・・・ [✔] client.filesystem."output.txt".write ・・・
正常に終了し、ローカルファイル output.txt が作成されたので内容を確認すると以下のようになりました。
output.txt の内容確認
$ cat output.txt sample1_echo
sample2. Node.js アプリのビルド
それでは、本題の Node.js アプリのビルドです。
Node.js アプリ
ここでは、以下の TypeScript ファイルを esbuild
を使って Node.js 用にビルドする事にします。
src/app.ts
import * as E from 'fp-ts/Either' console.log(E.right('sample2'))
今回、esbuild による処理は package.json の scripts で定義しました。
package.json
{ "scripts": { "build": "npx esbuild ./src/app.ts --bundle --platform=node --outfile=build/bundle.js" }, "devDependencies": { "esbuild": "^0.14.37" }, "dependencies": { "fp-ts": "^2.11.10" } }
パイプライン定義
ここでは、以下の処理を実施します。
- (1) ローカルの
package.json
とsrc
ディレクトリをコンテナの/app
へコピー - (2)
npm install
を実行 - (3)
npm run build
を実行 - (4) コンテナの
/app/build
の内容をローカルの_build
へ出力
sample1 ではファイル単位でローカル側とやりとりしましたが、こちらはディレクトリ単位で実施するようにしています。
client.filesystem.".".read.contents
を dagger.#FS
とする事でローカルのカレントディレクトリを参照し、include
を使って package.json
と src
以外を除外しています。
ローカルのディレクトリとファイルをコンテナ環境へコピーするために core.#Copy
、コンテナの /app/build
の内容を参照するために core.#Subdir
を使っています。
node_build.cue
package sample2 import ( "dagger.io/dagger" "dagger.io/dagger/core" ) dagger.#Plan & { actions: { build: { _node: core.#Pull & { source: "node:18-alpine" } // (1) ローカルの package.json と src ディレクトリをコンテナの /app へコピー src: core.#Copy & { input: _node.output contents: client.filesystem.".".read.contents dest: "/app" } // (2) npm install を実行(/app をカレントディレクトリに指定) deps: core.#Exec & { input: src.output workdir: "/app" args: ["npm", "install"] always: true } // (3) npm run build を実行(/app をカレントディレクトリに指定) runBuild: core.#Exec & { input: deps.output workdir: "/app" args: ["npm", "run", "build"] always: true } // (4) /app/build の内容を参照 result: core.#Subdir & { input: runBuild.output path: "/app/build" } } } client: { filesystem: { // (1) ローカルの package.json と src を参照 ".": read: { contents: dagger.#FS include: ["package.json", "src"] } // (4) ローカルの _build へ出力 "_build": write: contents: actions.build.result.output } } }
実行
dagger do で実行します。
実行例
$ dagger do build ・・・ [✔] actions.build ・・・ [✔] client.filesystem.".".read ・・・ [✔] actions.build.src ・・・ [✔] actions.build.deps ・・・ [✔] actions.build.runBuild ・・・ [✔] actions.build.result ・・・ [✔] client.filesystem."_build".write ・・・
これにより、ローカルへ下記のファイルが出力されました。
_build/bundle.js
var __create = Object.create; ・・・ // src/app.ts var E = __toESM(require_Either()); console.log(E.right("sample2"));
問題なく実行できました。
_build/bundle.js の動作確認例
$ cd _build $ node bundle.js { _tag: 'Right', right: 'sample2' }
Amplify AppSync Simulator を直接使ってマッピングテンプレートを検証
Amplify AppSync Simulator は、AWS Amplify CLI に含まれているモジュールで、AppSync をローカル環境で動作確認するためのものです。(AppSync の GraphQL を処理する Web サーバーが起動するようになっている)
ソースコードを見てみたところ、AppSync 用の処理を適用した GraphQL.js の GraphQLSchema
を作り、これを graphql
関数で実行するようになっていました。
そこで試しに、Amplify AppSync Simulator の API を直接使う事で、Amplify CLI を使わず Web サーバーも起動せずに、リクエストマッピングテンプレートの処理結果を出力する Node.js スクリプトを作ってみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210315/
準備
amplify-appsync-simulator
モジュールをインストールしておきます。
インストール例
> npm install amplify-appsync-simulator
Lambda 用マッピングテンプレートの処理
今回は、Lambda 用のマッピングテンプレートを処理してみます。
AppSync では下記を設定する事で GraphQL の処理を定義するようになっています。
※ リゾルバーは、GraphQL API とデータソースとの紐づけ、 入出力データの加工をマッピングテンプレートとして設定するようになっています
マッピングテンプレートは、VTL(Apache Velocity Template Language)の形式でリクエストやレスポンスの内容を加工して JSON 文字列を作ります。
Amplify AppSync Simulator では AmplifyAppSyncSimulator
クラスの init
メソッドに対して、上記 (1) ~ (3) と defaultAuthenticationType
を設定した AmplifyAppSyncSimulatorConfig
を渡す事で、AppSync 用の処理を適用した GraphQLSchema
を内部的に作るようになっており、それを schema
で取得する事が可能です。
defaultAuthenticationType
の設定は、今回のように GraphQLSchema を直接利用するケースでは使われませんが、設定自体は必須のようです。
また、Amplify AppSync Simulator が対応しているデータソースは今のところ下記 3タイプのようで、タイプ毎に用意されている DataLoader
が処理を担うようになっています。
各 DataLoader の load
メソッドの第一引数にリクエストマッピングテンプレートの処理結果がそのまま渡ってくるようになっています。
NONE データソースの利用
NONE タイプ用 NoneDataLoader
の load メソッドは、単純に payload
の値を処理結果として返すような実装になっています。
そこで、この load メソッドを上書きする事でリクエストマッピングテンプレートの結果を出力するようにしてみました。
なお、AmplifyAppSyncSimulator が作成した GraphQLSchema を graphql
関数で処理する際のコンテキストに requestAuthorizationMode
と appsyncErrors
の項目が最低限必要でした。
defaultAuthenticationType は必須のようなので API_KEY
を設定していますが、実際には使われないので API_KEY の値を設定したりする必要はありませんでした。
sample1.js
const { AmplifyAppSyncSimulator } = require('amplify-appsync-simulator') const { graphql } = require('graphql') // GraphQL スキーマ定義 const schema = ` type Item { id: ID! value: Int! } input FindInput { id: ID! } type Query { find(input: FindInput!): Item } ` // データソースの設定 const dataSources = [ { type: 'NONE', name: 'ItemFunc' } ] // リゾルバーの設定 const resolvers = [ { kind: 'UNIT', // GraphQL の Query で定義した find フィールドと ItemFunc データソースとのマッピング typeName: 'Query', fieldName: 'find', dataSourceName: 'ItemFunc' // リクエストマッピングテンプレートの設定(GraphQL クエリの input をそのまま payload に設定) requestMappingTemplate: ` { "version": "2018-05-29", "operation": "Invoke", "payload": $utils.toJson($ctx.args.input) } `, // レスポンスマッピングテンプレートの設定 responseMappingTemplate: '$utils.toJson($context.result)' } ] // 設定 const config = { schema: { content: schema }, resolvers, dataSources, appSync: { // 下記の設定は必須 defaultAuthenticationType: { authenticationType: 'API_KEY' } } } const simulator = new AmplifyAppSyncSimulator() simulator.init(config) // ItemFunc(NoneDataLoader)の load メソッド上書き simulator.getDataLoader('ItemFunc').load = (req) => { console.log(`*** load: ${JSON.stringify(req)}`) return { id: req.payload.id, value: 123 } } // AmplifyAppSyncSimulator用 GraphQL の実行コンテキスト const ctx = { requestAuthorizationMode: 'API_KEY', appsyncErrors: [] } // GraphQL クエリ const q = ` query FindItem($id: ID!) { find(input: {id: $id}) { id value } } ` const run = async () => { // GraphQL クエリ実行 const r = await graphql(simulator.schema, q, null, ctx, {id: 'id1'}) console.log(JSON.stringify(r)) } run().catch(err => console.error(err))
実行結果は以下の通り。 リクエストマッピングテンプレートの処理結果が出力されています。
実行結果
> node sample1.js *** load: {"version":"2018-05-29","operation":"Invoke","payload":{"id":"id1"}} {"data":{"find":{"id":"id1","value":123}}}
AWS_LAMBDA データソースの利用
AWS_LAMBDA タイプの場合は、データソース設定の invoke へ設定した関数が呼び出されるようになっているので ※、これを使って Lambda の関数ハンドラーを直接実行する事もできます。
※ リクエストマッピングテンプレート処理結果の payload の値が引数として与えられるようになっています
lambda_func.js(関数ハンドラー)
exports.handler = async (event) => { console.log(`*** handler: ${JSON.stringify(event)}`) return { id: event.id, value: 234 } }
sample2.js
const { AmplifyAppSyncSimulator } = require('amplify-appsync-simulator') const { graphql } = require('graphql') const schema = ` ・・・ ` const dataSources = [ // invoke へ lambda_func.js の handler を設定 { type: 'AWS_LAMBDA', name: 'ItemFunc', invoke: require('./lambda_func.js').handler } ] const resolvers = [ ・・・ ] const config = { schema: { content: schema }, resolvers, dataSources, appSync: { defaultAuthenticationType: { authenticationType: 'API_KEY' } } } const simulator = new AmplifyAppSyncSimulator() simulator.init(config) const ctx = { requestAuthorizationMode: 'API_KEY', appsyncErrors: [] } const q = ` query FindItem($id: ID!) { find(input: {id: $id}) { id value } } ` const run = async () => { const r = await graphql(simulator.schema, q, null, ctx, {id: 'id2'}) console.log(JSON.stringify(r)) } run().catch(err => console.error(err))
実行結果
> node sample2.js *** handler: {"id":"id2"} {"data":{"find":{"id":"id2","value":234}}}
AWS Lambda のランタイム API サーバーを自作して関数ハンドラーをローカル実行
AWS Lambda では、(Lambda 関数の)ランタイムがランタイム API(ランタイムインターフェース)から呼び出しイベントを受け取って、関数ハンドラーを実行し、その結果をランタイム API へ返すような流れで処理が実施されているようです。
そこで、自作のランタイム API サーバーを使って、Go と Node.js でそれぞれ実装した AWS Lambda の関数ハンドラーを(Docker 等を使わずに)ローカル実行してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20210211/
ランタイム API サーバーの実装
以下のようなオープンソースのランタイム API サーバーが提供されており、自作する必要は無かったりするのですが。
今回は、fastify を使ってランタイム API サーバーを実装してみました。
AWS Lambda ランタイム API のページから OpenAPI 仕様(2018-06-01)を入手できるので、このスキーマ定義を参考に必要最小限の処理だけを実装します。
まず、ランタイム API として下記を実装する事になります。(ただし、今回の用途では (d) を使いません)
- (a) 次の呼び出し : GET
/runtime/invocation/next
- (b) 呼び出しレスポンス : POST
/runtime/invocation/{AwsRequestId}/response
- (c) 呼び出しエラー : POST
/runtime/invocation/{AwsRequestId}/error
- (d) 初期化エラー : POST
/runtime/init/error
(b) ~ (d) は成功時に 202 ステータスコードを返す事になります。
(b) や (c) における AwsRequestId
の値は、(a) のレスポンスヘッダーで通知する事になり、(a) のレスポンスヘッダーで最低限必要な項目は下記でした。(aws-lambda-go の場合)
- Lambda-Runtime-Aws-Request-Id (AwsRequestId の値)
- Lambda-Runtime-Deadline-Ms (実行期限)
一方で、ランタイム側は以下のような手順でランタイム API を呼び出すようになっているようです。
- (1) (a) を GET してレスポンスが返ってくるのを待つ
- (2) (a) からレスポンス(呼び出しイベント)が返ってくると関数ハンドラーを呼び出す
- (3) 関数ハンドラーの呼び出し結果を (b) へ POST、エラーが発生した場合は (c) へ POST
- (4) (1) へ戻る
つまり、(a) で固定の内容をすぐに返すような実装にしてしまうと、(1) ~ (4) が繰り返されてしまい不都合が生じます。
そこで、下記では /invoke
を呼び出した際に (a) のレスポンスを返すようにしています。
また、Go と Node.js の関数ハンドラーを同時に扱うために、(a) が呼び出された際の reply を replies へ溜めておいて、forEach で処理するようにしています。
runtime-api-server/server.js (ランタイム API サーバー)
const logger = false const fastify = require('fastify')({ logger }) const RUNTIME_PATH = '/2018-06-01/runtime' const TIMEOUT = 5 * 1000 const port = process.env['SERVER_PORT'] ? parseInt(process.env['SERVER_PORT']) : 8080 const replies = [] fastify.post('/invoke', (req, reply) => { const data = req.body console.log(`*** invoke: ${JSON.stringify(data)}`) replies.splice(0).forEach(r => { const deadline = Date.now() + TIMEOUT // (a) のレスポンスを返す r.code(200) .header('Lambda-Runtime-Aws-Request-Id', data['request-id']) .header('Lambda-Runtime-Deadline-Ms', deadline.toString()) .header('Lambda-Runtime-Trace-Id', data['trace-id']) .header('Lambda-Runtime-Invoked-Function-Arn', data['function-arn']) .send(data.body) }) reply.code(200).send({}) }) // Runtime API の実装 // (a) 次の呼び出し fastify.get(`${RUNTIME_PATH}/invocation/next`, (req, reply) => { console.log('*** next') console.log(req.body) replies.push(reply) }) // (b) 呼び出しレスポンス fastify.post(`${RUNTIME_PATH}/invocation/:id/response`, (req, reply) => { console.log(`*** response: id = ${req.params['id']}`) console.log(req.body) reply.code(202).send({ status: '' }) }) // (c) 呼び出しエラー fastify.post(`${RUNTIME_PATH}/invocation/:id/error`, (req, reply) => { console.log(`*** error: id = ${req.params['id']}`) console.log(req.body) reply.code(202).send({ status: '' }) }) // (d) 初期化エラー fastify.post(`${RUNTIME_PATH}/init/error`, (req, reply) => { console.log('*** init error') console.log(req.body) reply.code(202).send({ status: '' }) }) fastify.listen(port) .then(r => console.log(`started: ${r}`)) .catch(err => console.error(err))
実行
動作確認のために実行しておきます。
> cd runtime-api-server
> node server.js
started: http://127.0.0.1:8080
Go による関数ハンドラー
Go の場合、aws-lambda-go にランタイム API とやり取りを行うランタイムの処理が実装されています。
そのため、aws-lambda-go を使って実装した関数ハンドラーをローカル環境でビルドして実行するだけです。
接続するランタイム API サーバーのアドレスは AWS_LAMBDA_RUNTIME_API
環境変数で設定します。
ここでは、以下の関数ハンドラーを使用します。
sample_go/app.go (関数ハンドラー)
package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/lambdacontext" ) func handler(ctx context.Context, event interface{}) (string, error) { c, _ := lambdacontext.FromContext(ctx) fmt.Println("*** call handler") fmt.Printf("event = %#v\n", event) fmt.Printf("context = %#v\n", c) return "sample-go", nil } func main() { lambda.Start(handler) }
動作確認1
AWS_LAMBDA_RUNTIME_API 環境変数にランタイム API サーバーのアドレスを設定し、app.go をビルドして実行します。
ビルドと実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080 > cd sample_go > go build app.go > app
ランタイム API サーバー側の出力内容は下記のようになり、app.go から (a) の API が呼び出されている事を確認できます。
server.js(ランタイム API サーバー)の出力内容1
> node server.js started: http://127.0.0.1:8080 *** next null
/invoke を呼び出して、関数ハンドラーを実行してみます。
invoke の呼び出し1
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"a1", "function-arn":"sample", "body":{"name":"abc","value":1}}' {}
app.go の出力内容は以下のようになり、関数ハンドラーの実行を確認できました。
app.go の出力内容
> app *** call handler event = map[string]interface {}{"name":"abc", "value":1} context = &lambdacontext.LambdaContext{AwsRequestID:"a1", InvokedFunctionArn:"sample", Identity:lambdacontext.CognitoIdentity{CognitoIdentityID:"", CognitoIdentityPoolID:""}, ClientContext:lambdacontext.ClientContext{Client:lambdacontext.ClientApplication{InstallationID:"", AppTitle:"", AppVersionCode:"", AppPackageName:""}, Env:map[string]string(nil), Custom:map[string]string(nil)}}
ランタイム API サーバー側は以下のように、app.go からのレスポンスを受け取っている事が確認できます。
server.js(ランタイム API サーバー)の出力内容2
> node server.js started: http://127.0.0.1:8080 ・・・ *** invoke: {"request-id":"a1","function-arn":"sample","body":{"name":"abc","value":1}} *** response: id = a1 sample-go *** next null
続いて、エラー時の挙動を確認するために下記を実施します。
invoke の呼び出し2
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"b2"}' {}
app.go には何も出力されず、ランタイム API サーバー側は以下のようになり、エラーの内容を受け取っています。
server.js(ランタイム API サーバー)の出力内容3
> node server.js started: http://127.0.0.1:8080 ・・・ *** invoke: {"request-id":"b2"} *** error: id = b2 { errorMessage: 'unexpected end of JSON input', errorType: 'SyntaxError' } *** next null
Node.js による関数ハンドラー
Node.js の場合、通常は関数ハンドラーそのものを実装するだけなので、Go とは違ってランタイムが別途必要となります。
Node.js 用のランタイムとしては下記が提供されており、ネイティブコードなどを使った本格的な作りとなっています。(本番利用を想定していると思われる)
今回はこれを使わず、簡易的なランタイムを node-fetch を使って自作してみました。
node_runtime/runtime.js (Node.js 用の簡易ランタイム)
const fetch = require('node-fetch') const runtimeUrl = `http://${process.env["AWS_LAMBDA_RUNTIME_API"]}/2018-06-01/runtime/invocation` const appRoot = process.cwd() const [moduleName, handlerName] = process.argv[2].split('.') const app = require(`${appRoot}/${moduleName}`) const envData = { functionVersion: process.env["AWS_LAMBDA_FUNCTION_VERSION"], functionName: process.env["AWS_LAMBDA_FUNCTION_NAME"], memoryLimitInMB: process.env["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"], logGroupName: process.env["AWS_LAMBDA_LOG_GROUP_NAME"], logStreamName: process.env["AWS_LAMBDA_LOG_STREAM_NAME"], } const run = async () => { // (a) の API を呼び出す const nextRes = await fetch(`${runtimeUrl}/next`) const deadline = parseInt(nextRes.headers.get('lambda-runtime-deadline-ms')) const context = Object.assign( { getRemainingTimeInMillis: () => deadline - Date.now(), awsRequestId: nextRes.headers.get('lambda-runtime-aws-request-id'), invokedFunctionArn: nextRes.headers.get('lambda-runtime-invoked-function-arn'), identity: {}, clientContext: {}, }, envData ) try { const event = await nextRes.json() // 関数ハンドラーの呼び出し const res = await app[handlerName](event, context) console.log(`* handler result: ${res}`) // (b) の API を呼び出す await fetch( `${runtimeUrl}/${context.awsRequestId}/response`, {method: 'POST', body: res} ) } catch (e) { // (c) の API を呼び出す await fetch( `${runtimeUrl}/${context.awsRequestId}/error`, {method: 'POST', body: JSON.stringify({type: e.type, message: e.message})} ) } } const loop = async () => { while (true) { await run() } } loop().catch(err => console.error(err))
ここでは、下記の関数ハンドラーを使う事にします。
sample_node/app.js (関数ハンドラー)
exports.handler = async (event, context) => { console.log('*** call handler') console.log(`event = ${JSON.stringify(event)}`) console.log(`context = ${JSON.stringify(context)}`) console.log(`remaining time = ${context.getRemainingTimeInMillis()}`) return 'sample-node' }
動作確認2
runtime.js を使って app.js の handler を呼び出すように実行します。
実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080 > cd sample_node > node ../node_runtime/runtime.js app.handler
ランタイム API サーバーの next が呼び出されました。
server.js(ランタイム API サーバー)の出力内容4
> node server.js started: http://127.0.0.1:8080 ・・・ *** next null
/invoke を呼び出します。
invoke の呼び出し3
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"c3", "function-arn":"sample2", "body":{"name":"def","value":2}}' {}
runtime.js + app.js の出力内容は以下のようになり、関数ハンドラーが呼び出されました。
runtime.js + app.js の出力内容
> node ../node_runtime/runtime.js app.handler *** call handler event = {"name":"def","value":2} context = {"awsRequestId":"c3","invokedFunctionArn":"sample2","identity":{},"clientContext":{}} remaining time = 4945 * handler result: sample-node
ランタイム API サーバーの出力も以下のようになり、問題は無さそうです。
server.js(ランタイム API サーバー)の出力内容5
> node server.js started: http://127.0.0.1:8080 ・・・ *** invoke: {"request-id":"c3","function-arn":"sample2","body":{"name":"def","value":2}} ・・・ *** response: id = c3 sample-node *** next null
エラーが発生するように /invoke を呼び出します。
invoke の呼び出し4
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"d4"}' {}
こちらも問題は無さそうです。
server.js(ランタイム API サーバー)の出力内容6
> node server.js started: http://127.0.0.1:8080 ・・・ *** invoke: {"request-id":"d4"} ・・・ *** error: id = d4 {"type":"invalid-json","message":"invalid json response body at http://localhost:8080/2018-06-01/runtime/invocation/next reason: Unexpected end of JSON input"} *** next null
イベントベースで考える在庫管理モデル
従来のイベントソーシングのような手法だと、特定の State(というよりは Entity かも)を永続化するための手段として Event を用いるというような、あくまでも State 中心の発想になると思います。
そこで、ここでは下記のような Event 中心の発想に切り替えて、在庫管理(在庫数を把握するだけの単純なもの)を考えてみました。
State
は本質ではなく、Event
を解釈した結果にすぎない(解釈の仕方は様々)Event
を得たり、伝えたりするための手段としてState
を用いる
要するに、Event こそが重要で State(Entity とか)は取るに足らない存在だと(実験的に)考えてみたって事です。
従来のイベントソーシング | 本件 |
---|---|
State が目的、Event が手段 | Event が目的、State が手段 |
なお、ここでイメージしている Event は、特定のドメインに依存しないような本質的なものです。
在庫管理
本件の在庫管理は、以下を把握するだけの単純なものです。
- 何処に何の在庫(とりあえず現物のあるもの)がいくつあるか
実装コードは http://github.com/fits/try_samples/tree/master/blog/20201213/
1. 本質的なイベント
在庫管理で起こりそうなイベントを考えてみます。
在庫(数)は入庫と出庫の結果だと考えられるので、以下のようなイベントが考えられそうです。
- 入庫イベント
- 出庫イベント
また、シンプルに物が移動 ※ した結果が在庫なのだと考えると、(2地点間の)在庫の移動という形で抽象化できそうな気がします。
※ 概念的な移動も含める
そうすると、以下のようなイベントも考えられそうです。
- 在庫移動の開始イベント
- 在庫移動の終了(完了)イベント
ついでに、在庫の引当も区別して考えると ※、以下のようなイベントも考えられます。
- 引当イベント
※ 引当用の場所へ移動するという事にするのであれば区別しなくてもよさそう
まとめると、とりあえずは以下のようなイベントが考えられそうです。
- 在庫移動の開始イベント
- 在庫移動の完了イベント
- 在庫移動のキャンセルイベント
- 引当イベント
- 引当した場合の出庫イベント
- 引当しなかった場合の出庫イベント
- 入庫イベント
ついでに、引当や出庫などの成否をイベントとして明確に分けたい場合は、引当失敗イベント等の失敗イベントを別途設ければ良さそうな気がします。
2. イベント定義
これらのイベントを Rust と TypeScript で型定義してみました。
商品や在庫のロケーション(在庫の保管場所)の具体的な内容はどうでもよいので(ここで具体化する必要がない)、Generics の型変数で表現しておきます。
本質的に必要そうな最低限の情報のみを持たせ、余計な情報は取り除いておきます。※
※ 在庫移動を一意に限定する ID や日付のような メタデータ(と考えられるもの)に関しても除外しました
用語はとりあえず以下のようにしています。
- 引当: assign
- 出庫: ship
- 入庫: arrive
何(item
)をいくつ(qty
)、何処(from
)から何処(to
)へ移動する予定なのかという情報を持たせて在庫の移動を開始するようにしてみました。
入出庫等で予定とは異なる内容になっても不都合が生じないように、それぞれのイベントに必要な情報を持たせています。
また、全体的に ADT(代数的データ型)を意識した内容にしています。
Rust で型定義したイベント
models/events.rs
pub enum StockMoveEvent<Item, Location, Quantity> { // 開始 Started { item: Item, qty: Quantity, from: Location, to: Location, }, // 完了 Completed, // キャンセル Cancelled, // 引当 Assigned { item: Item, from: Location, assigned: Quantity, }, // 出庫 Shipped { item: Item, from: Location, outgoing: Quantity, }, // 引当に対する出庫 AssignShipped { item: Item, from: Location, outgoing: Quantity, assigned: Quantity, }, // 入庫 Arrived { item: Item, to: Location, incoming: Quantity, }, }
TypeScript で型定義したイベント
models/events.ts
export interface StockMoveEventStarted<Item, Location, Quantity> { tag: 'stock-move-event.started' item: Item qty: Quantity from: Location to: Location } export interface StockMoveEventCompleted { tag: 'stock-move-event.completed' } export interface StockMoveEventCancelled { tag: 'stock-move-event.cancelled' } export interface StockMoveEventAssigned<Item, Location, Quantity> { tag: 'stock-move-event.assigned' item: Item from: Location assigned: Quantity } export interface StockMoveEventShipped<Item, Location, Quantity> { tag: 'stock-move-event.shipped' item: Item from: Location outgoing: Quantity } export interface StockMoveEventAssignShipped<Item, Location, Quantity> { tag: 'stock-move-event.assign-shipped' item: Item from: Location outgoing: Quantity assigned: Quantity } export interface StockMoveEventArrived<Item, Location, Quantity> { tag: 'stock-move-event.arrived' item: Item to: Location incoming: Quantity } export type StockMoveEvent<Item, Location, Quantity> = StockMoveEventStarted<Item, Location, Quantity> | StockMoveEventCompleted | StockMoveEventCancelled | StockMoveEventAssigned<Item, Location, Quantity> | StockMoveEventShipped<Item, Location, Quantity> | StockMoveEventAssignShipped<Item, Location, Quantity> | StockMoveEventArrived<Item, Location, Quantity>
3. 在庫移動処理サンプル
上記で定義したイベントを以下のような(在庫移動の)ステートマシンで扱ってみます。※
※ 本件の考え方では、 (本質的な)イベントは特定の処理やルールになるべく依存していない事が重要なので、 このステートマシン(イベントを扱う手段の一つでしかない)に対して 特化しないように注意します
- 在庫移動の状態遷移の基本パターンは 3通り
- (a) 引当 -> 出庫 -> 入庫
- (b) 出庫 -> 入庫
- (c) 入庫
- 入庫の失敗状態は無し(0個の入庫で代用)
(c) は出庫側の状況が不明なケースで入庫の記録だけを残すような用途を想定しています。
3.1 ステートマシンの実装
Rust と TypeScript でそれぞれ実装してみます。
この辺のレイヤーまでは、外界の都合(※1)から隔離しておきたいと考え、関数言語的な作りにしています。
イベントと同様に在庫移動や在庫を ADT(代数的データ型) で表現し、下記のような関数(+ ユーティリティ関数)を提供するだけの作りにしてみました。(※2)
(※1)フレームワーク、UI、永続化、非同期処理、排他制御やその他諸々の都合 (※2)こうしておくと、WebAssembly 等でコンポーネント化して 再利用するなんて事も実現し易くなるかもしれませんし
- (1) 初期状態を返す関数
- (2) 現在の状態とアクションから次の状態とそれに伴って発生したイベントを返す関数(イメージとしては
State -> Action -> Container<(State, Event)>
) - (3) ある時点の状態とそれ以降に起きたイベントの内容から任意の状態を復元して返す関数
なお、ここでは (2) のアクションに相当する部分は関数(と引数の一部)として実装しています。
また、(2) で状態遷移が発生しなかった場合に undefined
を返すように実装していますが、実際は成功時と失敗時の両方を扱うようなコンテナ型(Rust の Result や Either モナドとか)で包むのが望ましいと思います。
ついでに、実装に際して以下のようなルールを加えています。
- 引当、入庫、出庫のロケーションは開始時に予定したものを使用
- 引当時にのみ在庫数を確認(残りの在庫をチェック)
- 在庫のタイプは 2種類
- 在庫数を管理するタイプ(引当分の在庫が余っている場合にのみ引当が成功、在庫数は入庫イベントと出庫イベントから算出)
- 在庫数を管理しないタイプ(引当は常に成功、在庫数は管理せず実質的に無限)
- 引当数や出庫数が 0 の場合は(引当や出庫の)失敗として扱う
引当はこの処理内における単なる数値上の予約であり、入出庫は実際の作業の結果を反映するような用途をとりあえず想定しています。
そのため、数値上の引当に成功しても実際の出庫が成功するとは限らず、数値上の在庫数以上の出庫が発生するようなケースも考えられるので、この処理ではそれらを許容するようにしています。※
※ 在庫の整合性等をどのように制御・調整するかは 使う側(外側のレイヤー)に任せる
Rust による実装
ここで、商品(以下の ItemCode)や在庫ロケーション(以下の LocationCode)の具体的な型を決めていますが、これより外側のレイヤーに具体型を決めさせるようにした方が望ましいかもしれません。
models/stockmove.rs
use std::slice; use super::events::StockMoveEvent; // 商品を識別するための型 pub type ItemCode = String; // 在庫ロケーションを識別するための型 pub type LocationCode = String; pub type Quantity = u32; pub trait Event<S> { type Output; fn apply_to(&self, state: S) -> Self::Output; } pub trait Restore<E> { fn restore(self, events: slice::Iter<E>) -> Self; } // 在庫の型定義 #[allow(dead_code)] #[derive(Debug, Clone)] pub enum Stock { Unmanaged { item: ItemCode, location: LocationCode }, Managed { item: ItemCode, location: LocationCode, qty: Quantity, assigned: Quantity }, } // 在庫に関する処理 #[allow(dead_code)] impl Stock { pub fn unmanaged_new(item: ItemCode, location: LocationCode) -> Self { Self::Unmanaged { item, location } } pub fn managed_new(item: ItemCode, location: LocationCode) -> Self { Self::Managed { item, location, qty: 0, assigned: 0 } } pub fn eq_id(&self, item: &ItemCode, location: &LocationCode) -> bool { match self { Self::Managed { item: it, location: loc, .. } | Self::Unmanaged { item: it, location: loc } => it == item && loc == location } } // 在庫数のチェック pub fn is_sufficient(&self, v: Quantity) -> bool { match self { Self::Managed { qty, assigned, .. } => v + assigned <= *qty, Self::Unmanaged { .. } => true, } } fn update(&self, qty: Quantity, assigned: Quantity) -> Self { match self { Self::Managed { item, location, .. } => { Self::Managed { item: item.clone(), location: location.clone(), qty, assigned, } }, Self::Unmanaged { .. } => self.clone(), } } fn update_qty(&self, qty: Quantity) -> Self { match self { Self::Managed { assigned, .. } => self.update(qty, *assigned), Self::Unmanaged { .. } => self.clone(), } } fn update_assigned(&self, assigned: Quantity) -> Self { match self { Self::Managed { qty, .. } => self.update(*qty, assigned), Self::Unmanaged { .. } => self.clone(), } } } // 在庫に対するイベントの適用 impl Event<Stock> for MoveEvent { type Output = Stock; fn apply_to(&self, state: Stock) -> Self::Output { match &state { Stock::Unmanaged { .. } => state, Stock::Managed { item: s_item, location: s_loc, qty: s_qty, assigned: s_assigned } => { match self { Self::Assigned { item, from, assigned } if s_item == item && s_loc == from => { state.update_assigned( s_assigned + assigned ) }, Self::Shipped { item, from, outgoing } if s_item == item && s_loc == from => { state.update_qty( s_qty.checked_sub(*outgoing).unwrap_or(0) ) }, Self::AssignShipped { item, from, outgoing, assigned } if s_item == item && s_loc == from => { state.update( s_qty.checked_sub(*outgoing).unwrap_or(0), s_assigned.checked_sub(*assigned).unwrap_or(0), ) }, Self::Arrived { item, to, incoming } if s_item == item && s_loc == to => { state.update_qty( s_qty + incoming ) }, _ => state, } }, } } } #[derive(Debug, Default, Clone, PartialEq)] pub struct StockMoveInfo { item: ItemCode, qty: Quantity, from: LocationCode, to: LocationCode, } // 在庫移動の型(状態)定義 #[allow(dead_code)] #[derive(Debug, Clone, PartialEq)] pub enum StockMove { Nothing, Draft { info: StockMoveInfo }, Completed { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity }, Cancelled { info: StockMoveInfo }, Assigned { info: StockMoveInfo, assigned: Quantity }, Shipped { info: StockMoveInfo, outgoing: Quantity }, Arrived { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity }, AssignFailed { info: StockMoveInfo }, ShipmentFailed { info: StockMoveInfo }, } type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity>; type MoveResult = Option<(StockMove, MoveEvent)>; // 在庫移動に関する処理 #[allow(dead_code)] impl StockMove { // 初期状態の取得 pub fn initial_state() -> Self { Self::Nothing } // 開始 pub fn start(&self, item: ItemCode, qty: Quantity, from: LocationCode, to: LocationCode) -> MoveResult { if qty < 1 { return None } let event = StockMoveEvent::Started { item: item.clone(), qty: qty, from: from.clone(), to: to.clone() }; self.apply_event(event) } // 引当 pub fn assign(&self, stock: &Stock) -> MoveResult { if let Some(info) = self.info() { if stock.eq_id(&info.item, &info.from) { let assigned = if stock.is_sufficient(info.qty) { info.qty } else { 0 }; return self.apply_event( StockMoveEvent::Assigned { item: info.item.clone(), from: info.from.clone(), assigned, } ) } } None } // 出庫 pub fn ship(&self, outgoing: Quantity) -> MoveResult { let ev = match self { Self::Assigned { info, assigned } => { Some(StockMoveEvent::AssignShipped { item: info.item.clone(), from: info.from.clone(), outgoing, assigned: assigned.clone(), }) }, _ => { self.info() .map(|i| StockMoveEvent::Shipped { item: i.item.clone(), from: i.from.clone(), outgoing, } ) }, }; ev.and_then(|e| self.apply_event(e)) } // 入庫 pub fn arrive(&self, incoming: Quantity) -> MoveResult { self.info() .and_then(|i| self.apply_event(StockMoveEvent::Arrived { item: i.item.clone(), to: i.to.clone(), incoming, }) ) } pub fn complete(&self) -> MoveResult { self.apply_event(StockMoveEvent::Completed) } pub fn cancel(&self) -> MoveResult { self.apply_event(StockMoveEvent::Cancelled) } fn info(&self) -> Option<StockMoveInfo> { match self { Self::Draft { info } | Self::Completed { info, .. } | ・・・ Self::Arrived { info, .. } => { Some(info.clone()) }, Self::Nothing => None, } } fn apply_event(&self, event: MoveEvent) -> MoveResult { let new_state = event.apply_to(self.clone()); Some((new_state, event)) .filter(|r| r.0 != *self) } } // 在庫移動に対するイベントの適用 impl Event<StockMove> for MoveEvent { type Output = StockMove; fn apply_to(&self, state: StockMove) -> Self::Output { match self { Self::Started { item, qty, from, to } => { if state == StockMove::Nothing { StockMove::Draft { info: StockMoveInfo { item: item.clone(), qty: qty.clone(), from: from.clone(), to: to.clone(), } } } else { state } }, Self::Completed => { if let StockMove::Arrived { info, outgoing, incoming } = state { StockMove::Completed { info: info.clone(), outgoing, incoming } } else { state } }, Self::Cancelled => { if let StockMove::Draft { info } = state { StockMove::Cancelled { info: info.clone() } } else { state } }, Self::Assigned { item, from, assigned } => { match state { StockMove::Draft { info } if info.item == *item && info.from == *from => { if *assigned > 0 { StockMove::Assigned { info: info.clone(), assigned: assigned.clone(), } } else { StockMove::AssignFailed { info: info.clone() } } }, _ => state, } }, Self::Shipped { item, from, outgoing } => { match state { StockMove::Draft { info } if info.item == *item && info.from == *from => { if *outgoing > 0 { StockMove::Shipped { info: info.clone(), outgoing: outgoing.clone(), } } else { StockMove::ShipmentFailed { info: info.clone() } } }, _ => state, } }, Self::AssignShipped { item, from, outgoing, .. } => { match state { StockMove::Assigned { info, .. } if info.item == *item && info.from == *from => { if *outgoing > 0 { StockMove::Shipped { info: info.clone(), outgoing: outgoing.clone(), } } else { StockMove::ShipmentFailed { info: info.clone() } } }, _ => state, } }, Self::Arrived { item, to, incoming } => { match state { StockMove::Draft { info } if info.item == *item && info.to == *to => { StockMove::Arrived { info: info.clone(), outgoing: 0, incoming: *incoming, } }, StockMove::Shipped { info, outgoing } if info.item == *item && info.to == *to => { StockMove::Arrived { info: info.clone(), outgoing, incoming: *incoming, } }, _ => state, } }, } } } // 在庫や在庫移動の状態復元 impl<S, E> Restore<&E> for S where Self: Clone, E: Event<Self, Output = Self>, { fn restore(self, events: slice::Iter<&E>) -> Self { events.fold(self, |acc, ev| ev.apply_to(acc.clone())) } }
TypeScript による実装
実装の仕方が多少違っていますが、Rust 版の処理内容と概ね同じ(にしたつもり)です。
models/stockmove.ts
import { StockMoveEvent, StockMoveEventShipped, StockMoveEventAssignShipped } from './events' export type ItemCode = string export type LocationCode = string export type Quantity = number export type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity> type ShippedMoveEvent = StockMoveEventShipped<ItemCode, LocationCode, Quantity> type AssignShippedMoveEvent = StockMoveEventAssignShipped<ItemCode, LocationCode, Quantity> interface StockUnmanaged { tag: 'stock.unmanaged' item: ItemCode location: LocationCode } interface StockManaged { tag: 'stock.managed' item: ItemCode location: LocationCode qty: Quantity assigned: Quantity } // 在庫の型定義 export type Stock = StockUnmanaged | StockManaged // 在庫に関する処理 export class StockAction { static newUnmanaged(item: ItemCode, location: LocationCode): Stock { return { tag: 'stock.unmanaged', item, location } } static newManaged(item: ItemCode, location: LocationCode): Stock { return { tag: 'stock.managed', item, location, qty: 0, assigned: 0 } } // 在庫数のチェック static isSufficient(stock: Stock, qty: Quantity): boolean { switch (stock.tag) { case 'stock.unmanaged': return true case 'stock.managed': return qty + Math.max(0, stock.assigned) <= Math.max(0, stock.qty) } } } // 在庫の復元処理 export class StockRestore { static restore(state: Stock, events: MoveEvent[]): Stock { return events.reduce(StockRestore.applyTo, state) } // 在庫に対するイベントの適用 private static applyTo(state: Stock, event: MoveEvent): Stock { if (state.tag == 'stock.managed') { switch (event.tag) { case 'stock-move-event.assigned': if (state.item == event.item && state.location == event.from) { return StockRestore.updateAssigned( state, state.assigned + event.assigned ) } break case 'stock-move-event.assign-shipped': if (state.item == event.item && state.location == event.from) { return StockRestore.updateStock( state, state.qty - event.outgoing, state.assigned - event.assigned ) } break ・・・ } } return state } private static updateStock(stock: Stock, qty: Quantity, assigned: Quantity): Stock { switch (stock.tag) { case 'stock.unmanaged': return stock case 'stock.managed': return { tag: stock.tag, item: stock.item, location: stock.location, qty, assigned } } } ・・・ } interface StockMoveInfo { item: ItemCode qty: Quantity from: LocationCode to: LocationCode } interface StockMoveNothing { tag: 'stock-move.nothing' } interface StockMoveDraft { tag: 'stock-move.draft' info: StockMoveInfo } ・・・ // 在庫移動の型(状態)定義 export type StockMove = StockMoveNothing | StockMoveDraft | StockMoveCompleted | StockMoveCancelled | StockMoveAssigned | StockMoveShipped | StockMoveArrived | StockMoveAssignFailed | StockMoveShipmentFailed export type StockMoveResult = [StockMove, MoveEvent] | undefined // 在庫移動に関する処理 export class StockMoveAction { // 初期状態を取得 static initialState(): StockMove { return { tag: 'stock-move.nothing' } } // 開始 static start(state: StockMove, item: ItemCode, qty: Quantity, from: LocationCode, to: LocationCode): StockMoveResult { if (qty < 1) { return undefined } const event: MoveEvent = { tag: 'stock-move-event.started', item, qty, from, to } return StockMoveAction.applyTo(state, event) } // 引当 static assign(state: StockMove, stock: Stock): StockMoveResult { const info = StockMoveAction.info(state) if (info && info.item == stock.item && info.from == stock.location) { const assigned = (stock && StockAction.isSufficient(stock, info.qty)) ? info.qty : 0 const event: MoveEvent = { tag: 'stock-move-event.assigned', item: info.item, from: info.from, assigned } return StockMoveAction.applyTo(state, event) } return undefined } // 出庫 static ship(state: StockMove, outgoing: Quantity): StockMoveResult { if (outgoing < 0) { return undefined } const event = StockMoveAction.toShippedEvent(state, outgoing) return event ? StockMoveAction.applyTo(state, event) : undefined } // 入庫 static arrive(state: StockMove, incoming: Quantity): StockMoveResult { if (incoming < 0) { return undefined } const info = StockMoveAction.info(state) if (info) { const event: MoveEvent = { tag: 'stock-move-event.arrived', item: info.item, to: info.to, incoming } return StockMoveAction.applyTo(state, event) } return undefined } ・・・ static info(state: StockMove) { if (state.tag != 'stock-move.nothing') { return state.info } return undefined } private static applyTo(state: StockMove, event: MoveEvent): StockMoveResult { const nextState = StockMoveRestore.restore(state, [event]) return (nextState != state) ? [nextState, event] : undefined } private static toShippedEvent(state: StockMove, outgoing: number): MoveEvent | undefined { const info = StockMoveAction.info(state) if (info) { if (state.tag == 'stock-move.assigned') { return { tag: 'stock-move-event.assign-shipped', item: info.item, from: info.from, assigned: state.assigned, outgoing } } else { return { tag: 'stock-move-event.shipped', item: info.item, from: info.from, outgoing } } } return undefined } } // 在庫移動の復元処理 export class StockMoveRestore { static restore(state: StockMove, events: MoveEvent[]): StockMove { return events.reduce(StockMoveRestore.applyTo, state) } // 在庫移動に対するイベントの適用 private static applyTo(state: StockMove, event: MoveEvent): StockMove { switch (state.tag) { case 'stock-move.nothing': if (event.tag == 'stock-move-event.started') { return { tag: 'stock-move.draft', info: { item: event.item, qty: event.qty, from: event.from, to: event.to } } } break case 'stock-move.draft': return StockMoveRestore.applyEventToDraft(state, event) case 'stock-move.assigned': if (event.tag == 'stock-move-event.assign-shipped') { return StockMoveRestore.applyShipped(state, event) } break case 'stock-move.shipped': if (event.tag == 'stock-move-event.arrived' && state.info.item == event.item && state.info.to == event.to) { return { tag: 'stock-move.arrived', info: state.info, outgoing: state.outgoing, incoming: event.incoming } } break case 'stock-move.arrived': if (event.tag == 'stock-move-event.completed') { return { tag: 'stock-move.completed', info: state.info, outgoing: state.outgoing, incoming: state.incoming } } break case 'stock-move.completed': case 'stock-move.cancelled': case 'stock-move.assign-failed': case 'stock-move.shipment-failed': break } return state } private static applyShipped(state: StockMoveDraft | StockMoveAssigned, event: ShippedMoveEvent | AssignShippedMoveEvent): StockMove { if (state.info.item == event.item && state.info.from == event.from) { if (event.outgoing > 0) { return { tag: 'stock-move.shipped', info: state.info, outgoing: event.outgoing } } else { return { tag: 'stock-move.shipment-failed', info: state.info } } } return state } private static applyEventToDraft(state: StockMoveDraft, event: MoveEvent): StockMove { switch (event.tag) { case 'stock-move-event.cancelled': return { tag: 'stock-move.cancelled', info: state.info } case 'stock-move-event.assigned': if (state.info.item == event.item && state.info.from == event.from) { if (event.assigned > 0) { return { tag: 'stock-move.assigned', info: state.info, assigned: event.assigned } } else { return { tag: 'stock-move.assign-failed', info: state.info } } } break case 'stock-move-event.shipped': return StockMoveRestore.applyShipped(state, event) case 'stock-move-event.arrived': if (state.info.item == event.item && state.info.to == event.to) { return { tag: 'stock-move.arrived', info: state.info, outgoing: 0, incoming: Math.max(event.incoming, 0) } } break } return state } }
3.2 GraphQL 化 + MongoDB へ永続化
ついでに、前述のステートマシン(TypeScript 実装版)を Apollo Server で GraphQL 化し、MongoDB へ永続化するようにしてみました。
index.ts
import { ApolloServer, gql } from 'apollo-server' import { v4 as uuidv4 } from 'uuid' import { MongoClient, Collection } from 'mongodb' import { ItemCode, LocationCode, MoveEvent, StockMoveAction, StockMoveRestore, StockMove, StockMoveResult, StockAction, StockRestore, Stock } from './models' const mongoUrl = 'mongodb://localhost' const dbName = 'stockmoves' const colName = 'events' const stocksColName = 'stocks' type MoveId = string type Revision = number // MongoDB へ保存するイベント内容 interface StoredEvent { move_id: MoveId revision: Revision item: ItemCode from: LocationCode to: LocationCode event: MoveEvent } interface RestoredStockMove { state: StockMove revision: Revision } // MongoDB への永続化処理 class Store { ・・・ async loadStock(item: ItemCode, location: LocationCode): Promise<Stock | undefined> { const id = this.stockId(item, location) const stock = await this.stocksCol.findOne({ _id: id }) if (!stock) { return undefined } const query = { '$and': [ { item }, { '$or': [ { from: location }, { to: location } ]} ] } const events = await this.eventsCol .find(query) .map(r => r.event) .toArray() return StockRestore.restore(stock, events) } async saveStock(stock: Stock): Promise<void> { const id = this.stockId(stock.item, stock.location) const res = await this.stocksCol.updateOne( { _id: id }, { '$setOnInsert': stock }, { upsert: true } ) if (res.upsertedCount == 0) { return Promise.reject('conflict stock') } } async loadMove(moveId: MoveId): Promise<RestoredStockMove | undefined> { const events: StoredEvent[] = await this.eventsCol .find({ move_id: moveId }) .sort({ revision: 1 }) .toArray() const state = StockMoveAction.initialState() const revision = events.reduce((acc, e) => Math.max(acc, e.revision), 0) const res = StockMoveRestore.restore(state, events.map(e => e.event)) return (res == state) ? undefined : { state: res, revision } } async saveEvent(event: StoredEvent): Promise<void> { const res = await this.eventsCol.updateOne( { move_id: event.move_id, revision: event.revision }, { '$setOnInsert': event }, { upsert: true } ) if (res.upsertedCount == 0) { return Promise.reject(`conflict event revision=${event.revision}`) } } private stockId(item: ItemCode, location: LocationCode): string { return `${item}/${location}` } } // GraphQL スキーマ定義 const typeDefs = gql(` type StockMoveInfo { item: ID! qty: Int! from: ID! to: ID! } interface StockMove { id: ID! info: StockMoveInfo! } type DraftStockMove implements StockMove { id: ID! info: StockMoveInfo! } type CompletedStockMove implements StockMove { id: ID! info: StockMoveInfo! outgoing: Int! incoming: Int! } ・・・ interface Stock { item: ID! location: ID! } type UnmanagedStock implements Stock { item: ID! location: ID! } type ManagedStock implements Stock { item: ID! location: ID! qty: Int! assigned: Int! } input CreateStockInput { item: ID! location: ID! } input StartMoveInput { item: ID! qty: Int! from: ID! to: ID! } type Query { findStock(item: ID!, location: ID!): Stock findMove(id: ID!): StockMove } type Mutation { createManaged(input: CreateStockInput!): ManagedStock createUnmanaged(input: CreateStockInput!): UnmanagedStock start(input: StartMoveInput!): StockMove assign(id: ID!): StockMove ship(id: ID!, outgoing: Int!): StockMove arrive(id: ID!, incoming: Int!): StockMove complete(id: ID!): StockMove cancel(id: ID!): StockMove } `) const toStockMoveForGql = (id: MoveId, state: StockMove | undefined) => { if (state) { return { id, ...state } } return undefined } type MoveAction = (state: StockMove) => StockMoveResult const doMoveAction = async (store: Store, rs: RestoredStockMove | undefined, id: MoveId, action: MoveAction) => { if (rs) { const res = action(rs.state) if (res) { const [mv, ev] = res const info = StockMoveAction.info(mv) if (info) { const event = { move_id: id, revision: rs.revision + 1, item: info.item, from: info.from, to: info.to, event: ev } await store.saveEvent(event) return toStockMoveForGql(id, mv) } } } return undefined } // GraphQL 処理の実装 const resolvers = { Stock: { __resolveType: (obj, ctx, info) => { if (obj.tag == 'stock.managed') { return 'ManagedStock' } return 'UnmanagedStock' } }, StockMove: { __resolveType: (obj: StockMove, ctx, info) => { switch (obj.tag) { case 'stock-move.draft': return 'DraftStockMove' case 'stock-move.completed': return 'CompletedStockMove' ・・・ case 'stock-move.shipment-failed': return 'ShipmentFailedStockMove' } return undefined } }, Query: { findStock: async (parent, { item, location }, { store }, info) => { return store.loadStock(item, location) }, findMove: async (parent, { id }, { store }, info) => { const res = await store.loadMove(id) return toStockMoveForGql(id, res?.state) } }, Mutation: { createManaged: async (parent, { input: { item, location } }, { store }, info) => { const s = StockAction.newManaged(item, location) await store.saveStock(s) return s }, ・・・ start: async (parent, { input: { item, qty, from, to } }, { store }, info) => { const rs = { state: StockMoveAction.initialState(), revision: 0 } const id = `move-${uuidv4()}` return doMoveAction( store, rs, id, s => StockMoveAction.start(s, item, qty, from, to) ) }, assign: async(parent, { id }, { store }, info) => { const rs = await store.loadMove(id) if (rs) { const info = StockMoveAction.info(rs.state) if (info) { const stock = await store.loadStock(info.item, info.from) return doMoveAction( store, rs, id, s => StockMoveAction.assign(s, stock) ) } } return undefined }, ship: async(parent, { id, outgoing }, { store }, info) => { const rs = await store.loadMove(id) return doMoveAction( store, rs, id, s => StockMoveAction.ship(s, outgoing) ) }, ・・・ } } const run = async () => { const mongo = await MongoClient.connect(mongoUrl, { useUnifiedTopology: true }) const eventsCol = mongo.db(dbName).collection(colName) const stocksCol = mongo.db(dbName).collection(stocksColName) const store = new Store(eventsCol, stocksCol) const server = new ApolloServer({ typeDefs, resolvers, context: { store } }) const res = await server.listen() console.log(res.url) } run().catch(err => console.error(err))
クライアント実装例
以下のように GraphQL クエリを送信する事で操作できます。
client/create_stock.ts (在庫の作成)
import { request, gql } from 'graphql-request' const endpoint = 'http://localhost:4000' const item = process.argv[2] const location = process.argv[3] const q1 = gql` mutation CreateUnmanaged($item: ID!, $location: ID!) { createUnmanaged(input: { item: $item, location: $location }) { __typename item location } } ` const q2 = gql` mutation CreateManaged($item: ID!, $location: ID!) { createManaged(input: { item: $item, location: $location }) { __typename item location } } ` const query = process.argv.length > 4 ? q1 : q2 request(endpoint, query, { item, location }) .then(r => console.log(r)) .catch(err => console.error(err))
create_stock.ts 実行例
> ts-node create_stock.ts item-1 store-A { createManaged: { __typename: 'ManagedStock', item: 'item-1', location: 'store-A' } }
client/start_move.ts (在庫移動の開始)
・・・ const item = process.argv[2] const qty = parseInt(process.argv[3]) const from = process.argv[4] const to = process.argv[5] const query = gql` mutation { start(input: { item: "${item}", qty: ${qty}, from: "${from}", to: "${to}" }) { __typename id info { item qty from to } } } ` request(endpoint, query) .then(r => console.log(r)) .catch(err => console.error(err))
start_move.ts 実行例
> ts-node start_move.ts item-1 5 store-A store-B { start: { __typename: 'DraftStockMove', id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3', info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' } } }
client/find_move.ts (在庫移動の取得)
・・・ const id = process.argv[2] const query = gql` { findMove(id: "${id}") { __typename id info { item qty from to } ... on AssignedStockMove { assigned } ... on ShippedStockMove { outgoing } ... on ArrivedStockMove { outgoing incoming } ... on CompletedStockMove { outgoing incoming } } } ` request(endpoint, query) .then(r => console.log(r)) .catch(err => console.error(err))
find_move.ts 実行例
> ts-node find_move.ts move-cfa1fc9c-b599-4854-8385-207cbb77e8a3 { findMove: { __typename: 'CompletedStockMove', id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3', info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' }, outgoing: 5, incoming: 5 } }
Node.js で GraphQL over gRPC 的な事をやってみる
gRPC 上で GraphQL を扱う GraphQL over gRPC 的な処理を Node.js で試しに実装してみました。
今回のコードは http://github.com/fits/try_samples/tree/master/blog/20201124/
はじめに
GraphQL はクエリ言語なので基本的に通信プロトコルには依存していません。
Web フロントエンドの用途では Apollo GraphQL が公開している GraphQL over WebSocket Protocol が有力そうですが、マイクロサービス等の用途で GraphQL を利用する事を考えると WebSocket よりも gRPC の方が適しているように思います。
- GraphQL の Query や Mutation は gRPC の Unary RPC で実現可能
- GraphQL の Subscription は gRPC の Server streaming RPC で実現可能 ※
そこで、とりあえず実装し確認してみたというのが本件の趣旨となっています。
※ GraphQL の Subscription を使わずに gRPC の streaming RPC で代用する事も考えられる
なお、GraphQL の処理に関しては「Deno で GraphQL」の内容をベースに、gRPC は「Node.js で gRPC を試す」の静的コード生成を用いて実装しています。
Query と Mutation - sample1
まずは Subscription を除いた Query と Mutation について実装してみます。
gRPC サービス定義
gRPC のサービス定義を下記のように定義してみました。
GraphQL のクエリは文字列で扱うので型は string
、結果は実質的に JSON となるので型は google.protobuf.Struct
としました。
ついでに、クエリの変数も渡せるようにして型は google.protobuf.Value
としています。
ここで、google.protobuf.Struct
は JSON Object を Protocol Buffers で表現するための型として定義されたもので、google.protobuf.Value
は JSON Value(null、文字列、数値、配列、JSON Object 等)を表現するための型です。(参照 google/protobuf/struct.proto)
proto/graphql.proto
syntax = "proto3"; import "google/protobuf/struct.proto"; package gql; message QueryRequest { string query = 1; google.protobuf.Value variables = 2; } service GraphQL { rpc Query(QueryRequest) returns (google.protobuf.Struct); }
この proto/graphql.proto から gRPC のコードを生成しておきます。
静的コード生成
> mkdir generated > npm run gen-grpc ・・・ grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto
package.json の内容は以下の通りです。
package.json
{ "name": "sample1", "version": "1.0.0", "description": "", "scripts": { "gen-grpc": "grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto" }, "dependencies": { "@grpc/grpc-js": "^1.2.1", "google-protobuf": "^3.14.0", "graphql": "^15.4.0", "uuid": "^8.3.1" }, "devDependencies": { "grpc-tools": "^1.10.0" } }
サーバー実装
GraphQL を扱う gRPC サーバーを実装します。
gRPC のリクエストから GraphQL のクエリやその変数の内容を取得し、graphql
関数で処理した結果をレスポンスとして返すような処理となります。
google.protobuf.Struct や Value に該当する型は google-protobuf/google/protobuf/struct_pb.js
にて Struct
や Value
として定義されており、プレーンな JavaScript オブジェクトと相互変換するための fromJavaScript
や toJavaScript
メソッドが用意されています。
下記コードでは QueryRequest の variables の内容を JavaScript オブジェクトとして取得するために toJavaScript
を、graphql の処理結果を Struct で返すために fromJavaScript
をそれぞれ使用しています。
server.js
const grpc = require('@grpc/grpc-js') const { GraphQLService } = require('./generated/proto/graphql_grpc_pb') const { Struct } = require('google-protobuf/google/protobuf/struct_pb') const { graphql, buildSchema } = require('graphql') const { v4: uuidv4 } = require('uuid') // GraphQL スキーマ定義 const schema = buildSchema(` enum Category { Standard Extra } input CreateItem { category: Category! value: Int! } type Item { id: ID! category: Category! value: Int! } type Mutation { create(input: CreateItem!): Item } type Query { find(id: ID!): Item } `) const store = {} // スキーマ定義に応じた GraphQL 処理の実装 const root = { create: ({ input: { category, value } }) => { console.log(`*** call create: category = ${category}, value = ${value}`) const id = `item-${uuidv4()}` const item = { id, category, value } store[id] = item return item }, find: ({ id }) => { console.log(`*** call find: ${id}`) return store[id] } } const server = new grpc.Server() server.addService(GraphQLService, { async query(call, callback) { try { const query = call.request.getQuery() const variables = call.request.getVariables().toJavaScript() // GraphQL の処理 const r = await graphql(schema, query, root, {}, variables) callback(null, Struct.fromJavaScript(r)) } catch(e) { console.error(e) callback(e) } } }) server.bindAsync( '127.0.0.1:50051', grpc.ServerCredentials.createInsecure(), (err, port) => { if (err) { console.error(err) return } console.log(`start server: ${port}`) server.start() } )
クライアント実装
クライアント側は以下のようになります。
client.js
const grpc = require('@grpc/grpc-js') const { QueryRequest } = require('./generated/proto/graphql_pb') const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb') const { Value } = require('google-protobuf/google/protobuf/struct_pb') const client = new GraphQLClient( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const promisify = (obj, methodName) => args => new Promise((resolve, reject) => { obj[methodName](args, (err, res) => { if (err) { reject(err) } else { resolve(res) } }) }) const query = promisify(client, 'query') const createRequest = (q, v = null) => { const req = new QueryRequest() req.setQuery(q) req.setVariables(Value.fromJavaScript(v)) return req } const run = async () => { // Item の作成 const r1 = await query(createRequest(` mutation { create(input: { category: Extra, value: 123 }) { id } } `)) console.log(r1.toJavaScript()) // 存在しない Item の find const r2 = await query(createRequest(` { find(id: "a1") { id value } } `)) console.log(r2.toJavaScript()) const id = r1.toJavaScript().data.create.id // 作成した Item の find (クエリ変数の使用) const r3 = await query(createRequest( ` query findItem($id: ID!) { find(id: $id) { id category value } } `, { id } )) console.log(r3.toJavaScript()) } run().catch(err => console.error(err))
動作確認
server.js を実行しておきます。
server.js 実行
> node server.js start server: 50051
client.js を実行した結果は以下の通りで、特に問題無く動作しているようです。
client.js 実行
> node client.js { data: { create: { id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d' } } } { data: { find: null } } { data: { find: { category: 'Extra', id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d', value: 123 } } }
Subscription - sample2
次は Subscription の機能を追加します。
gRPC サービス定義
gRPC のサービス定義に Subscription 用のメソッドを追加し、sample1 と同様にコードを生成しておきます。
proto/graphql.proto
syntax = "proto3"; import "google/protobuf/struct.proto"; package gql; message QueryRequest { string query = 1; google.protobuf.Value variables = 2; } service GraphQL { rpc Query(QueryRequest) returns (google.protobuf.Struct); rpc Subscription(QueryRequest) returns (stream google.protobuf.Struct); }
サーバー実装
「Deno で GraphQL」では単一の Subscription を処理するだけの実装だったので、複数クライアントからの Subscription を処理するために PubSub
というクラスを追加し、subscription の呼び出し毎に MessageBox を作成、(クライアントが接続中の)有効な全ての MessageBox へメッセージを配信するようにしています。
server.js
const grpc = require('@grpc/grpc-js') const { GraphQLService } = require('./generated/proto/graphql_grpc_pb') const { Struct } = require('google-protobuf/google/protobuf/struct_pb') const { graphql, buildSchema, subscribe, parse } = require('graphql') const { v4: uuidv4 } = require('uuid') // GraphQL スキーマ定義 const schema = buildSchema(` enum Category { Standard Extra } input CreateItem { category: Category! value: Int! } type Item { id: ID! category: Category! value: Int! } type Mutation { create(input: CreateItem!): Item } type Query { find(id: ID!): Item } type Subscription { created: Item } `) class MessageBox { #promises = [] #resolves = [] #appendPromise = () => this.#promises.push( new Promise(res => this.#resolves.push(res)) ) publish(msg) { if (this.#resolves.length == 0) { this.#appendPromise() } this.#resolves.shift()(msg) } [Symbol.asyncIterator]() { return { next: async () => { console.log('*** asyncIterator next') if (this.#promises.length == 0) { this.#appendPromise() } const value = await this.#promises.shift() return { value, done: false } } } } } // クライアント毎の MessageBox を管理 class PubSub { #subscribes = [] publish(msg) { this.#subscribes.forEach(s => s.publish(msg)) } subscribe() { const sub = new MessageBox() this.#subscribes.push(sub) return sub } unsubscribe(sub) { this.#subscribes = this.#subscribes.filter(s => s != sub) } } const store = {} const pubsub = new PubSub() const root = { create: ({ input: { category, value } }) => { ・・・ }, find: ({ id }) => { ・・・ } } const server = new grpc.Server() server.addService(GraphQLService, { async query(call, callback) { ・・・ }, async subscription(call) { console.log('*** subscribed') try { const query = call.request.getQuery() const variables = call.request.getVariables().toJavaScript() const sub = pubsub.subscribe() call.on('cancelled', () => { console.log('*** unsubscribed') pubsub.unsubscribe(sub) }) const subRoot = { created: () => sub } // GraphQL の Subscription 処理 const aiter = await subscribe(schema, parse(query), subRoot, {}, variables) for await (const r of aiter) { // メッセージの配信 call.write(Struct.fromJavaScript(r)) } } catch(e) { console.error(e) call.destroy(e) } } }) server.bindAsync( ・・・ )
Subscription 用クライアント実装
Subscription を呼び出すクライアントは以下のようになります。
client_subscribe.js
const grpc = require('@grpc/grpc-js') const { QueryRequest } = require('./generated/proto/graphql_pb') const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb') const { Value } = require('google-protobuf/google/protobuf/struct_pb') const client = new GraphQLClient( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const req = new QueryRequest() req.setQuery(` subscription { created { id category } } `) req.setVariables(Value.fromJavaScript(null)) const stream = client.subscription(req) stream.on('data', msg => { const event = msg.toJavaScript() console.log(`*** received event = ${JSON.stringify(event)}`) }) stream.on('end', () => console.log('*** stream end')) stream.on('error', err => console.log(`*** stream error: ${err}`))
動作確認
server.js を実行した後、client_subscribe.js を 2つ実行して sample1 で作成した client.js を実行すると以下のようになりました。
server.js の出力結果
> node server.js start server: 50051 *** subscribed *** asyncIterator next *** subscribed *** asyncIterator next *** call create: category = Extra, value = 123 *** asyncIterator next *** asyncIterator next *** call find: a1 *** call find: item-5e3f81ed-774a-4f7f-afc5-000a2db34859
client_subscribe.js の出力結果
> node client_subscribe.js *** received event = {"data":{"created":{"category":"Extra","id":"item-5e3f81ed-774a-4f7f-afc5-000a2db34859"}}}
Node.js で gRPC を試す
「gRPC Server Reflection のクライアント処理」では Node.js で gRPC クライアントを実装しましたが、今回はサーバー側も実装してみます。
サンプルコードは http://github.com/fits/try_samples/tree/master/blog/20201115/
はじめに
gRPC on Node.js では、以下の 2通りの手法が用意されており、それぞれ使用するパッケージが異なります。
- (a) 動的コード生成 (
@grpc/proto-loader
パッケージを使用) - (b) 静的コード生成 (
grpc-tools
パッケージを使用)
更に、gRPC の実装ライブラリとして以下の 2種類が用意されており、どちらかを使う事になります。
- C-based Client and Server (
grpc
パッケージを使用) - Pure JavaScript Client (
@grpc/grpc-js
パッケージを使用)
@grpc/grpc-js
は現時点で Pure JavaScript Client と表現されていますが、クライアントだけではなくサーバーの実装にも使えます。
ここでは、Pure JavaScript 実装の @grpc/grpc-js
を使って、(a) と (b) の両方を試してみます。
サービス定義(proto ファイル)
gRPC のサービス定義として下記ファイルを使用します。
Unary RPC(1リクエスト / 1レスポンス)と Server streaming RPC(1リクエスト / 多レスポンス)、message の oneof
、google.protobuf.Empty
の扱い等を確認するような内容にしてみました。
proto/item.proto
syntax = "proto3"; import "google/protobuf/empty.proto"; package item; message AddItemRequest { string item_id = 1; uint64 price = 2; } message ItemRequest { string item_id = 1; } message Item { string item_id = 1; uint64 price = 2; } message ItemSubscribeRequest { } message AddedItem { string item_id = 1; uint64 price = 2; } message RemovedItem { string item_id = 1; } message ItemEvent { oneof event { AddedItem added = 1; RemovedItem removed = 2; } } service ItemManage { rpc AddItem(AddItemRequest) returns (google.protobuf.Empty); rpc RemoveItem(ItemRequest) returns (google.protobuf.Empty); rpc GetItem(ItemRequest) returns (Item); rpc Subscribe(ItemSubscribeRequest) returns (stream ItemEvent); }
(a) 動的コード生成(@grpc/proto-loader)
まずは、@grpc/proto-loader
を使った動的コード生成を試します。
インストール
@grpc/proto-loader
と @grpc/grpc-js
をインストールしておきます。
> npm install --save @grpc/proto-loader @grpc/grpc-js
サーバー実装
proto-loader の loadSync
関数 ※ で proto ファイルをロードした結果を grpc-js の loadPackageDefinition
で処理する事で型定義などを動的に生成します。
※ 非同期版の load 関数も用意されています
addService
で gRPC のサービス定義と処理をマッピングし、bindAsync
後に start
を呼び出す事でサーバー処理を開始します。
proto ファイルで定義したメッセージ型と同じフィールドを持つ JavaScript オブジェクトを gRPC のリクエストやレスポンスで使う事ができるようです。
Unary RPC の場合は、第二引数の callback
へ失敗時の値と成功時の値をそれぞれ渡す事で処理結果を返します。
任意のエラーを返したい場合は、code
で gRPC のステータスコードを、details
でエラー内容を指定します。
google.protobuf.Empty の箇所は null
もしくは undefined
で代用できるようです。
Server streaming RPC の場合は、第一引数(下記コードでは call
)の write
を呼び出す事で処理結果を返す事ができます。
クライアントが途中で切断したりすると cancelled
が発生するようになっており、cancelled 発生後に write
を呼び出してもエラー等は発生しないようになっていました。
server.js
const protoLoader = require('@grpc/proto-loader') const grpc = require('@grpc/grpc-js') const protoFile = './proto/item.proto' // proto ファイルのロード const pd = protoLoader.loadSync(protoFile) // gPRC 用の動的な型定義生成 const proto = grpc.loadPackageDefinition(pd) let store = [] let subscribeList = [] const findItem = itemId => store.find(i => i.itemId == itemId) const addItem = (itemId, price) => { if (findItem(itemId)) { return undefined } const item = { itemId, price } store.push(item) return item } const removeItem = itemId => { const item = findItem(itemId) if (item) { store = store.filter(i => i.itemId != item.itemId) } return item } // ItemEvent の配信 const publishEvent = event => { console.log(`*** publish event: ${JSON.stringify(event)}`) subscribeList.forEach(s => s.write(event)) } const server = new grpc.Server() // サービス定義と処理のマッピング server.addService(proto.item.ItemManage.service, { AddItem(call, callback) { const itemId = call.request.itemId const price = call.request.price const item = addItem(itemId, price) if (item) { callback() publishEvent({ added: { itemId, price }}) } else { const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' } callback(err) } }, RemoveItem(call, callback) { const itemId = call.request.itemId if (removeItem(itemId)) { callback() publishEvent({ removed: { itemId }}) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, GetItem(call, callback) { const itemId = call.request.itemId const item = findItem(itemId) if (item) { callback(null, item) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, Subscribe(call) { console.log('*** subscribed') subscribeList.push(call) // クライアント切断時の処理 call.on('cancelled', () => { console.log('*** unsubscribed') subscribeList = subscribeList.filter(s => s != call) }) } }) server.bindAsync( '127.0.0.1:50051', grpc.ServerCredentials.createInsecure(), (err, port) => { if (err) { console.error(err) return } console.log(`start server: ${port}`) // 開始 server.start() } )
クライアント実装1
まずは、Unary RPC の API のみ(Subscribe 以外)を呼び出すクライアントを実装してみます。
loadPackageDefinition を実施するところまではサーバーと同じです。
Unary RPC はコールバック関数を伴ったメソッドとして用意されますが、このメソッドに Node.js の util.promisify
を直接適用すると不都合が生じたため、Promise 化は自前の関数(下記の promisify
)で実施するようにしました。
client.js
const protoLoader = require('@grpc/proto-loader') const grpc = require('@grpc/grpc-js') const protoFile = './proto/item.proto' const pd = protoLoader.loadSync(protoFile) const proto = grpc.loadPackageDefinition(pd) const id = process.argv[2] const client = new proto.item.ItemManage( '127.0.0.1:50051', grpc.credentials.createInsecure() ) // Unary RPC の Promise 化 const promisify = (obj, methodName) => args => new Promise((resolve, reject) => { obj[methodName](args, (err, res) => { if (err) { reject(err) } else { resolve(res) } }) }) const addItem = promisify(client, 'AddItem') const removeItem = promisify(client, 'RemoveItem') const getItem = promisify(client, 'GetItem') const printItem = item => { console.log(`id = ${item.itemId}, price = ${item.price}`) } const run = async () => { await addItem({ itemId: `${id}_item-1`, price: 100 }) const item1 = await getItem({ itemId: `${id}_item-1` }) printItem(item1) await addItem({ itemId: `${id}_item-2`, price: 20 }) const item2 = await getItem({ itemId: `${id}_item-2` }) printItem(item2) await addItem({ itemId: `${id}_item-1`, price: 50 }) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem({ itemId: `${id}_item-1` }) await getItem({ itemId: `${id}_item-1` }) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem({ itemId: `${id}_item-2` }) } run().catch(err => console.error(err))
クライアント実装2
次は Server streaming RPC のクライアント実装です。
client_subscribe.js
const protoLoader = require('@grpc/proto-loader') const grpc = require('@grpc/grpc-js') const protoFile = './proto/item.proto' const pd = protoLoader.loadSync(protoFile) const proto = grpc.loadPackageDefinition(pd) const client = new proto.item.ItemManage( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const stream = client.Subscribe({}) // メッセージ受信時 stream.on('data', event => { console.log(`*** received event = ${JSON.stringify(event)}`) }) // サーバー終了時 stream.on('end', () => console.log('*** stream end')) stream.on('error', err => console.log(`*** stream error: ${err}`))
動作確認
server.js の実行後、client_subscribe.js を 2つ起動した後に client.js を実行してみます。
Server 実行
> node server.js start server: 50051
Client2-1 実行
> node client_subscribe.js
Client2-2 実行
> node client_subscribe.js
Client1 実行
> node client.js a1 id = a1_item-1, price = 100 id = a1_item-2, price = 20 *** ERROR = 6 ALREADY_EXISTS: exists item *** ERROR = 5 NOT_FOUND: item not found
この時点で出力内容は以下のようになりました。
Server 出力内容
> node server.js start server: 50051 *** subscribed *** subscribed *** publish event: {"added":{"itemId":"a1_item-1","price":100}} *** publish event: {"added":{"itemId":"a1_item-2","price":20}} *** publish event: {"removed":{"itemId":"a1_item-1"}} *** publish event: {"removed":{"itemId":"a1_item-2"}}
Client2-1、Client2-2 出力内容
> node client_subscribe.js *** received event = {"added":{"itemId":"a1_item-1","price":{"low":100,"high":0,"unsigned":true}}} *** received event = {"added":{"itemId":"a1_item-2","price":{"low":20,"high":0,"unsigned":true}}} *** received event = {"removed":{"itemId":"a1_item-1"}} *** received event = {"removed":{"itemId":"a1_item-2"}}
Client2-2 を Ctrl + c で終了後に、Server も Ctrl + c で終了すると以下のようになり、Client2-1 のプロセスは終了しました。
Server 出力内容
> node server.js start server: 50051 ・・・ *** publish event: {"removed":{"itemId":"a1_item-2"}} *** unsubscribed ^C
Client2-1 出力内容
> node client_subscribe.js ・・・ *** received event = {"removed":{"itemId":"a1_item-2"}} *** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error) *** stream end
特に問題はなく、正常に動作しているようです。
(b) 静的コード生成(grpc-tools)
grpc-tools
を使った静的コード生成を試します。
インストールとコード生成
grpc-tools
、@grpc/grpc-js
、google-protobuf
をインストールしておきます。
> npm install --save-dev grpc-tools ・・・ > npm install --save @grpc/grpc-js google-protobuf ・・・
grpc-tools をインストールする事で使えるようになる grpc_tools_node_protoc
コマンドで proto ファイルからコードを生成します。
grpc_tools_node_protoc コマンドは内部的に protoc
コマンドを grpc_node_plugin
プラグインを伴って呼び出すようになっています。
--grpc_out
でサービス定義用のファイル xxx_grpc_pb.js が生成され、--js_out
でメッセージ定義用のファイルが生成されます。
サービス定義 xxx_grpc_pb.js は --js_out で import_style=commonjs
オプションを指定する事を前提としたコードになっています。※
※ import_style=commonjs オプションを指定した際に生成される xxx_pb.js を参照するようになっている
また、--grpc_out はデフォルトで grpc
パッケージ用のコードを生成するため、ここでは grpc_js
オプションを指定して @grpc/grpc-js
用のコードを生成するようにしています。
静的コード生成例(grpc_tools_node_protoc コマンド)
> mkdir generated > grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto ・・・
サーバー実装
(a) の場合と処理内容に大きな違いはありませんが、リクエストやレスポンスでは生成された型を使います。
アクセサメソッド(getter、setter)で値の取得や設定ができるようになっており、new 時に配列として全フィールドの値を指定する事もできるようです。
JavaScript オブジェクトへ変換したい場合は toObject
メソッドを使用します。
addService でマッピングする際のメソッド名の一文字目が小文字になっています。
proto ファイルで定義したサービス名の後に Service
を付けた型(ここでは ItemManageService
)がサーバー処理用、Client
を付けた型がクライアント処理用の型定義となるようです。
server.js
const grpc = require('@grpc/grpc-js') const { Item, AddedItem, RemovedItem, ItemEvent } = require('./generated/proto/item_pb') const { ItemManageService } = require('./generated/proto/item_grpc_pb') const { Empty } = require('google-protobuf/google/protobuf/empty_pb') let store = [] let subscribeList = [] const findItem = itemId => store.find(i => i.getItemId() == itemId) const addItem = (itemId, price) => { if (findItem(itemId)) { return undefined } const item = new Item([itemId, price]) store.push(item) return item } const removeItem = itemId => { const item = findItem(itemId) if (item) { store = store.filter(i => i.getItemId() != item.getItemId()) } return item } const createAddedEvent = (itemId, price) => { const event = new ItemEvent() event.setAdded(new AddedItem([itemId, price])) return event } const createRemovedEvent = itemId => { const event = new ItemEvent() event.setRemoved(new RemovedItem([itemId])) return event } const publishEvent = event => { // toObject で JavaScript オブジェクトへ変換 console.log(`*** publish event: ${JSON.stringify(event.toObject())}`) subscribeList.forEach(s => s.write(event)) } const server = new grpc.Server() server.addService(ItemManageService, { addItem(call, callback) { const itemId = call.request.getItemId() const price = call.request.getPrice() const item = addItem(itemId, price) if (item) { callback(null, new Empty()) publishEvent(createAddedEvent(itemId, price)) } else { const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' } callback(err) } }, removeItem(call, callback) { const itemId = call.request.getItemId() if (removeItem(itemId)) { callback(null, new Empty()) publishEvent(createRemovedEvent(itemId)) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, getItem(call, callback) { const itemId = call.request.getItemId() const item = findItem(itemId) if (item) { callback(null, item) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, subscribe(call) { console.log('*** subscribed') subscribeList.push(call) call.on('cancelled', () => { console.log('*** unsubscribed') subscribeList = subscribeList.filter(s => s != call) }) } }) server.bindAsync( ・・・ )
クライアント実装1
生成された型を使う点とメソッド名の先頭が小文字になっている点を除くと、基本的に (a) と同じです。
client.js
const grpc = require('@grpc/grpc-js') const { AddItemRequest, ItemRequest } = require('./generated/proto/item_pb') const { ItemManageClient } = require('./generated/proto/item_grpc_pb') const id = process.argv[2] const client = new ItemManageClient( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const promisify = (obj, methodName) => args => new Promise((resolve, reject) => { ・・・ }) const addItem = promisify(client, 'addItem') const removeItem = promisify(client, 'removeItem') const getItem = promisify(client, 'getItem') const printItem = item => { console.log(`id = ${item.getItemId()}, price = ${item.getPrice()}`) } const run = async () => { await addItem(new AddItemRequest([`${id}_item-1`, 100])) const item1 = await getItem(new ItemRequest([`${id}_item-1`])) printItem(item1) await addItem(new AddItemRequest([`${id}_item-2`, 20])) const item2 = await getItem(new ItemRequest([`${id}_item-2`])) printItem(item2) await addItem(new AddItemRequest([`${id}_item-1`, 50])) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem(new ItemRequest([`${id}_item-1`])) await getItem(new ItemRequest([`${id}_item-1`])) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem(new ItemRequest([`${id}_item-2`])) } run().catch(err => console.error(err))
クライアント実装2
こちらも同様です。
client_subscribe.js
const grpc = require('@grpc/grpc-js') const { ItemSubscribeRequest } = require('./generated/proto/item_pb') const { ItemManageClient } = require('./generated/proto/item_grpc_pb') const client = new ItemManageClient( ・・・ ) const stream = client.subscribe(new ItemSubscribeRequest()) stream.on('data', event => { // toObject で JavaScript オブジェクトへ変換 console.log(`*** received event = ${JSON.stringify(event.toObject())}`) }) ・・・
動作確認
(a) と同じ操作を行った結果は以下のようになりました。
Server 出力内容
> node server.js start server: 50051 *** subscribed *** subscribed *** publish event: {"added":{"itemId":"a1_item-1","price":100}} *** publish event: {"added":{"itemId":"a1_item-2","price":20}} *** publish event: {"removed":{"itemId":"a1_item-1"}} *** publish event: {"removed":{"itemId":"a1_item-2"}} *** unsubscribed ^C
Client1 出力内容
> node client.js a1 id = a1_item-1, price = 100 id = a1_item-2, price = 20 *** ERROR = 6 ALREADY_EXISTS: exists item *** ERROR = 5 NOT_FOUND: item not found
Client2-1 出力内容
> node client_subscribe.js *** received event = {"added":{"itemId":"a1_item-1","price":100}} *** received event = {"added":{"itemId":"a1_item-2","price":20}} *** received event = {"removed":{"itemId":"a1_item-1"}} *** received event = {"removed":{"itemId":"a1_item-2"}} *** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error) *** stream end
Client2-2 出力内容
> node client_subscribe.js *** received event = {"added":{"itemId":"a1_item-1","price":100}} *** received event = {"added":{"itemId":"a1_item-2","price":20}} *** received event = {"removed":{"itemId":"a1_item-1"}} *** received event = {"removed":{"itemId":"a1_item-2"}} ^C
(a) と (b) は同一の gRPC サービス(proto ファイル)を実装したものなので当然ですが、(a) と (b) を相互接続しても特に問題はありませんでした。
Rust で WASI 対応の WebAssembly を作成して実行
Rust で WASI 対応の WebAssembly を作って、スタンドアロン実行や Web ブラウザ上での実行を試してみました。
WASI(WebAssembly System Interface) は WebAssembly のコードを様々なプラットフォームで実行するためのインターフェースで、これに対応した WebAssembly であれば Web ブラウザ外で実行できます。
Rust で WASI 対応の WebAssembly を作るのは簡単で、ビルドターゲットに wasm32-wasi
を追加しておいて、rustc
や cargo build
によるビルド時に --target wasm32-wasi
を指定するだけでした。
wasm32-wasi の追加
> rustup target add wasm32-wasi
標準出力へ文字列を出力するだけの下記サンプルコードを --target wasm32-wasi
でビルドすると sample1.wasm
ファイルが作られました。
sample1.rs
fn main() { for i in 1..=3 { println!("count-{}", i); } print!("aaa"); print!("bbb"); }
WASI 対応の WebAssembly として sample1.rs をビルド
> rustc --target wasm32-wasi sample1.rs
なお、今回のビルドに使用した Rust のバージョンは以下の通りです。
- Rust 1.43.0
また、使用したソースコードは http://github.com/fits/try_samples/tree/master/blog/20200429/ に置いてあります。
(1) スタンドアロン用ランタイムで実行
sample1.wasm を WebAssembly のランタイム wasmtime と wasmer でそれぞれ実行してみます。
(1-a) wasmtime で実行
wasmtime v0.15.0 による sample1.wasm 実行結果
> wasmtime sample1.wasm count-1 count-2 count-3 aaabbb
(1-b) wasmer で実行
wasmer v0.16.2 による sample1.wasm 実行結果
> wasmer sample1.wasm count-1 count-2 count-3 aaabbb
どちらのランタイムでも問題なく実行できました。
(2) Web ブラウザ上で実行
次は、sample1.wasm を外部ライブラリ等を使わずに Web ブラウザ上で実行してみます。
主要な Web ブラウザや Node.js は JavaScript 用の WebAssembly API に対応済みのため、WebAssembly を実行可能です。
WASI 対応 WebAssembly の場合、実行対象の WebAssembly がインポートしている WASI の関数(の実装)を WebAssembly インスタンス化関数(WebAssembly.instantiate()
や WebAssembly.instantiateStreaming()
)の第二引数(引数名 importObject)として渡す必要があるようです。
(2-a) WebAssembly のインポート内容を確認
WebAssembly.compile()
関数で取得した WebAssembly.Module
オブジェクトを WebAssembly.Module.imports()
関数へ渡す事で、その WebAssembly がインポートしている内容を取得できます。
ここでは、以下の Node.js スクリプトを使って WebAssembly のインポート内容を確認してみました。
wasm_listup_imports.js (WebAssembly のインポート内容を出力)
const fs = require('fs') const wasmFile = process.argv[2] const run = async () => { const module = await WebAssembly.compile(fs.readFileSync(wasmFile)) const imports = WebAssembly.Module.imports(module) console.log(imports) } run().catch(err => console.error(err))
sample1.wasm へ適用してみると以下のような結果となりました。
インポート内容の出力結果(Node.js v12.16.2 で実行)
> node wasm_listup_imports.js sample1.wasm [ { module: 'wasi_snapshot_preview1', name: 'proc_exit', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'fd_write', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'fd_prestat_get', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'fd_prestat_dir_name', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'environ_sizes_get', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'environ_get', kind: 'function' } ]
この結果から、sample1.wasm は以下のようにしてインスタンス化できる事になります。
WebAssembly インスタンス化の例
const importObject = { wasi_snapshot_preview1: { proc_exit: () => {・・・}, fd_write: () => {・・・}, fd_prestat_get: () => {・・・}, fd_prestat_dir_name: () => {・・・}, environ_sizes_get: () => {・・・}, environ_get: () => {・・・} } } WebAssembly.instantiate(・・・, importObject) ・・・
(2-b) fd_write 関数の実装
Rust の println!
で呼び出される WASI の関数は fd_write
なので、これを実装してみます。
fd_write の引数は 4つで、第一引数 fd
は出力先のファイルディスクリプタで標準出力の場合は 1、第二引数 iovs
は出力内容へのポインタ、第三引数 iovsLen
は出力内容の数、第四引数 nwritten
は出力済みのバイト数を設定するポインタとなっています。
なお、ポインタの対象は WebAssembly.instantiate()
で取得した WebAssembly のインスタンスに含まれている WebAssembly.Memory です。
出力内容は iovs ポインタの位置から 4バイト毎に以下のような並びで情報が格納されているようなので、これを基に出力対象の文字列を取得して出力する事になります。
- 1個目の出力内容の格納先ポインタ(4バイト)
- 1個目の出力内容のバイトサイズ(4バイト)
- ・・・
- iovsLen 個目の出力内容の格納先ポインタ(4バイト)
- iovsLen 個目の出力内容のバイトサイズ(4バイト)
何処まで処理を行ったか(出力したか)を返すために、nwritten ポインタの位置へ出力の完了したバイトサイズを設定します。
fd_write の実装例(wasmInstance には WebAssembly のインスタンスを設定)
・・・ fd_write: (fd, iovs, iovsLen, nwritten) => { const memory = wasmInstance.exports.memory.buffer const view = new DataView(memory) const sizeList = Array.from(Array(iovsLen), (v, i) => { const ptr = iovs + i * 8 // 出力内容の格納先のポインタ取得 const bufStart = view.getUint32(ptr, true) // 出力内容のバイトサイズを取得 const bufLen = view.getUint32(ptr + 4, true) const buf = new Uint8Array(memory, bufStart, bufLen) // 出力内容の String 化 const msg = String.fromCharCode(...buf) // 出力 console.log(msg) return buf.byteLength }) // 出力済みのバイトサイズ合計 const totalSize = sizeList.reduce((acc, v) => acc + v) // 出力済みのバイトサイズを設定 view.setUint32(nwritten, totalSize, true) return 0 }, ・・・
最終的な HTML は下記のようになりました。
fd_write 以外の WASI 関数を空実装にして main
関数を呼び出して実行するようにしていますが、WASI の仕様としては _start
関数を呼び出すのが正しいようです ※。(WASI Application ABI 参照)
※ _start 関数を使う場合、fd_prestat_get 等の実装も必要となります
WebAssembly がインポートしている WASI 関数の実装をインスタンス化時(WebAssembly.instantiateStreaming
)に渡す事になりますが、WASI の関数(fd_write 等)はインスタンス化の結果を使って処理する点に注意が必要です。
index.html(main 関数版)
<!DOCTYPE html> <html> <head> <meta charset="utf-8" /> </head> <body> <h1>WASI WebAssembly Sample</h1> <div id="res"></div> <script> const WASM_URL = './sample1.wasm' const wasiObj = { wasmInstance: null, importObject: { wasi_snapshot_preview1: { fd_write: (fd, iovs, iovsLen, nwritten) => { console.log(`*** call fd_write: fd=${fd}, iovs=${iovs}, iovsLen=${iovsLen}, nwritten=${nwritten}`) const memory = wasiObj.wasmInstance.exports.memory.buffer const view = new DataView(memory) const sizeList = Array.from(Array(iovsLen), (v, i) => { const ptr = iovs + i * 8 const bufStart = view.getUint32(ptr, true) const bufLen = view.getUint32(ptr + 4, true) const buf = new Uint8Array(memory, bufStart, bufLen) const msg = String.fromCharCode(...buf) // 出力 console.log(msg) document.getElementById('res').innerHTML += `<p>${msg}</p>` return buf.byteLength }) const totalSize = sizeList.reduce((acc, v) => acc + v) view.setUint32(nwritten, totalSize, true) return 0 }, proc_exit: () => {}, fd_prestat_get: () => {}, fd_prestat_dir_name: () => {}, environ_sizes_get: () => {}, environ_get: () => {} } } } WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject) .then(res => { console.log(res) // fd_write で参照できるようにインスタンスを wasmInstance へ設定 wasiObj.wasmInstance = res.instance // main 関数の実行 wasiObj.wasmInstance.exports.main() }) .catch(err => console.error(err)) </script> </body> </html>
main 関数の代わりに _start 関数を呼び出す場合は下記のようになりました。
_start 関数版の場合、fd_prestat_get
の実装が重要となります ※。
※ fd_prestat_get を正しく実装していないと、 fd_prestat_get の呼び出しが延々と繰り返されてしまいました
今回はファイル等を使っていないので(file descriptor 3
以降を開いていない)、fd_prestat_get は単に 8
(WASI_EBADF, Bad file descriptor)を返すだけで良さそうです。
index2.html(_start 関数版)
・・・ <script> const WASM_URL = './sample1.wasm' const wasiObj = { wasmInstance: null, importObject: { wasi_snapshot_preview1: { ・・・ fd_prestat_get: () => 8, ・・・ } } } WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject) .then(res => { console.log(res) wasiObj.wasmInstance = res.instance // _start 関数の実行 wasiObj.wasmInstance.exports._start() }) .catch(err => console.error(err)) </script> ・・・
(2-c) 実行
上記の .html ファイルを Web ブラウザで直接開いても WebAssembly を実行できないため、HTTP サーバーを使う事になります。
更に、Web ブラウザ上で WebAssembly を実行するには、.wasm ファイルを MIME Type application/wasm
で取得する必要があるようです。
Python の http.server は application/wasm
に対応していたため(Python 3.8.2 と 3.7.6 で確認)、以下のスクリプトで HTTP サーバーを立ち上げる事にしました。
web_server.py
import http.server import socketserver PORT = 8080 Handler = http.server.SimpleHTTPRequestHandler with socketserver.TCPServer(("", PORT), Handler) as httpd: print(f"start server port:{PORT}") httpd.serve_forever()
HTTP サーバー起動(Python 3.8.2 で実行)
> python web_server.py start server port:8080
Web ブラウザ(Chrome)で http://localhost:8080/index.html
へアクセスしたところ(index2.html でも同様)、sample1.wasm の実行を確認できました。
Chrome の実行結果
(3) Node.js で組み込み実行
次は、Node.js で WebAssembly を組み込み実行してみます。
(3-a) fd_write 実装
上記 index2.html の処理をベースにローカルの .wasm ファイルを読み込んで実行するようにしました。
sample1.wasm のインポート内容に合わせたものなので、インポート内容の異なる WebAssembly の実行には使えません。
wasm_run_sample.js
const fs = require('fs') const WASI_ESUCCESS = 0; const WASI_EBADF = 8; // Bad file descriptor const wasmFile = process.argv[2] const wasiObj = { wasmInstance: null, importObject: { wasi_snapshot_preview1: { fd_write: (fd, iovs, iovsLen, nwritten) => { ・・・ const sizeList = Array.from(Array(iovsLen), (v, i) => { ・・・ process.stdout.write(msg) return buf.byteLength }) ・・・ return WASI_ESUCCESS }, ・・・ fd_prestat_get: (fd, bufPtr) => { console.log(`*** call fd_prestat_get: fd=${fd}, bufPtr=${bufPtr}`) return WASI_EBADF }, ・・・ } } } const buf = fs.readFileSync(wasmFile) WebAssembly.instantiate(buf, wasiObj.importObject) .then(res => { wasiObj.wasmInstance = res.instance wasiObj.wasmInstance.exports._start() }) .catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node wasm_run_sample.js sample1.wasm *** call fd_prestat_get : fd=3, bufPtr=1048568 *** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948 count-1 *** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948 count-2 *** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948 count-3 *** call fd_write: fd=1, iovs=1048432, iovsLen=1, nwritten=1048412 aaabbb
(3-b) Wasmer-JS 使用
Wasmer-JS の @wasmer/wasi
モジュールを使って、もっと汎用的に組み込み実行できるようにしてみます。
@wasmer/wasi インストール例
> npm install @wasmer/wasi
@wasmer/wasi の WASI
を使う事で、インポート内容に合わせた WASI 関数の取得や _start 関数の呼び出しを任せる事ができます。
run_wasmer_js/index.js
const fs = require('fs') const { WASI } = require('@wasmer/wasi') const wasmFile = process.argv[2] const wasi = new WASI() const run = async () => { const module = await WebAssembly.compile(fs.readFileSync(wasmFile)) // インポート内容に合わせた WASI 関数の実装を取得 const importObject = wasi.getImports(module) const instance = await WebAssembly.instantiate(module, importObject) // 実行 wasi.start(instance) } run().catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node index.js ../sample1.wasm count-1 count-2 count-3 aaabbb
(4) 標準出力以外の機能
最後に、現時点でどんな機能を使えるのか気になったので、いくつか試してみました。
まず、TcpStream
を使ったコードの wasm32-wasi ビルドは一応成功しました。
sample2.rs
use std::net::TcpStream; fn main() { let res = TcpStream::connect("127.0.0.1:8080"); println!("{:?}", res); }
sample2.rs ビルド
> rustc --target wasm32-wasi sample2.rs
ただし、実行してみると以下のような結果となりました。(wasmtime 以外で実行しても同じ)
wasmtime による sample2.wasm 実行結果
> wasmtime sample2.wasm Err(Custom { kind: Other, error: "operation not supported on wasm yet" })
Rust のソースコードで該当(すると思われる)箇所を確認してみると、unsupported()
を返すようになっていました。
https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/net.rs(2020/4/26 時点)
・・・ impl TcpStream { pub fn connect(_: io::Result<&SocketAddr>) -> io::Result<TcpStream> { unsupported() } ・・・ } ・・・
https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/ のソースを確認してみると、(スレッド系の処理等)他にも未対応の機能がいくつもありました。
一方で、環境変数・システム時間・スリープ処理は使えそうだったので、以下のコードで確認してみました。
sample3.rs
use std::env; use std::thread::sleep; use std::time::{ Duration, SystemTime }; fn main() { // 環境変数 SLEEP_TIME からスリープする秒数を取得(デフォルトは 1) let sleep_sec = env::var("SLEEP_TIME").ok() .and_then(|v| v.parse::<u64>().ok()) .unwrap_or(1); // システム時間の取得 let time = SystemTime::now(); println!("start: sleep {}s", sleep_sec); // スリープの実施 sleep(Duration::from_secs(sleep_sec)); // 経過時間の出力 match time.elapsed() { Ok(s) => println!("end: elapsed {}s", s.as_secs()), Err(e) => println!("error: {:?}", e), } }
sample3.rs のビルド
> rustc --target wasm32-wasi sample3.rs
wasmtime では正常に実行できましたが、wasmer は今のところスリープ処理に対応していないようでエラーとなりました。
ちなみに、環境変数はどちらのコマンドも --env
で指定できました。
wasmtime v0.15.0 による sample3.wasm 実行結果
> wasmtime --env SLEEP_TIME=5 sample3.wasm start: sleep 5s end: elapsed 5s
wasmer v0.16.2 による sample3.wasm 実行結果
> wasmer sample3.wasm --env SLEEP_TIME=5 start: sleep 5s thread 'main' panicked at 'not yet implemented: Polling not implemented for clocks yet', lib\wasi\src\syscalls\mod.rs:2373:21 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace. Error: error: "unhandled trap at 7fffd474a799 - code #e06d7363: unknown exception code"