Dagger で Node.js アプリをビルドする

CI/CD のパイプラインを定義するためのツール Dagger を使って Node.js アプリのビルドを試してみました。

  • Dagger v0.2.7

今回使用したソースは こちら

sample1. echo の実施

まずは以下の処理を Dagger で実施してみます。

  • (1) ローカルの input.txt ファイルの内容を取得
  • (2) alpine イメージでコンテナを実行し、(1) に _echo を付けた文字列を echo して /tmp/output.txt へ出力
  • (3) コンテナの /tmp/output.txt の内容を取得し、ローカルの output.txt ファイルへ出力

Dagger プロジェクト作成

下記コマンドを実行して Dagger の実行に必要なファイルを作成しておきます。

$ dagger project init
$ dagger project update

project init で cue.mod が作成され、project update で cue.mod/pkg 内へ dagger.iouniverse.dagger.io のコアモジュールがインストールされます。

パイプライン定義

Dagger では CUE で CI/CD パイプラインを定義する事になっており、dagger.#Plan & { パイプラインの内容 } という風に記述します。※

CUE は一部 Go 言語風なので紛らわしいのですが、パワーアップした JSON のようなものだと捉えておくと理解し易いかもしれません。

 ※ 個人的に、dagger.#Plan の内容(cue.mod/pkg/dagger.io/dagger/plan.cue 参照)に対して
    { ・・・ } の内容をマージしているのだと解釈しています

パイプラインの処理は dagger.#Plan の actions で定義する事になっており、アクション名(下記の sample)は任意の名称を付けられるようです。(dagger do 時にアクション名を指定)

actions の処理はコンテナ内で実行する事になるので、ローカルのファイルや環境変数をコンテナとやりとりするために client が用意されています。

前述の (1) ~ (3) を定義すると以下のようになりました。

sample.cue
package sample1

import (
    "dagger.io/dagger"
    "dagger.io/dagger/core"
)

dagger.#Plan & {
    actions: {
        sample: {
            // (2) alpine イメージの pull
            _alpine: core.#Pull & {
                source: "alpine:3"
            }
            // (1) ローカルの input.txt ファイルの内容
            msg: client.filesystem."input.txt".read.contents
            // (2) コンテナで echo を実行し /tmp/output.txt へ出力 
            echo: core.#Exec & {
                input: _alpine.output
                args: [
                    "sh", "-c",
                    "echo -n \(msg)_echo > /tmp/output.txt",
                ]
                always: true
            }
            // (3) コンテナの /tmp/output.txt の内容を取得
            result: core.#ReadFile & {
                input: echo.output
                path: "/tmp/output.txt"
            }
        }
    }
    client: {
        filesystem: {
            // (1) ローカルの input.txt ファイルの内容を取得(ファイルの内容を取得するために string を設定)
            "input.txt": read: contents: string
            // (3) ローカルの output.txt ファイルへ出力
            "output.txt": write: contents: actions.sample.result.contents
        }
    }
}

コンテナの実行に core.#Exec、コンテナで出力したファイルの内容を取得するために core.#ReadFile を使っています。

client.filesystem.<パス>.read.contents の値を string とする事で ※、パスで指定したローカルファイルの内容を文字列として参照できます。

 ※ dagger.#FS とするとファイルシステムを参照できる

なお、前処理の output を次の処理の input に繋げていく事でパイプライン(処理の順序)を実現しているようです。

実行

それでは、実行してみます。

input.txt の内容を以下のようにしました。

input.txt
sample1

dagger do で任意のアクションを実行します。

実行例
$ dagger do sample
[✔] actions.sample                        ・・・
[✔] client.filesystem."input.txt".read    ・・・
[✔] actions.sample.echo                   ・・・
[✔] actions.sample.result                 ・・・
[✔] client.filesystem."output.txt".write  ・・・

正常に終了し、ローカルファイル output.txt が作成されたので内容を確認すると以下のようになりました。

output.txt の内容確認
$ cat output.txt
sample1_echo

sample2. Node.js アプリのビルド

それでは、本題の Node.js アプリのビルドです。

Node.js アプリ

ここでは、以下の TypeScript ファイルを esbuild を使って Node.js 用にビルドする事にします。

src/app.ts
import * as E from 'fp-ts/Either'

console.log(E.right('sample2'))

今回、esbuild による処理は package.json の scripts で定義しました。

package.json
{
  "scripts": {
    "build": "npx esbuild ./src/app.ts --bundle --platform=node --outfile=build/bundle.js"
  },
  "devDependencies": {
    "esbuild": "^0.14.37"
  },
  "dependencies": {
    "fp-ts": "^2.11.10"
  }
}

パイプライン定義

ここでは、以下の処理を実施します。

  • (1) ローカルの package.jsonsrc ディレクトリをコンテナの /app へコピー
  • (2) npm install を実行
  • (3) npm run build を実行
  • (4) コンテナの /app/build の内容をローカルの _build へ出力

sample1 ではファイル単位でローカル側とやりとりしましたが、こちらはディレクトリ単位で実施するようにしています。

client.filesystem.".".read.contentsdagger.#FS とする事でローカルのカレントディレクトリを参照し、include を使って package.jsonsrc 以外を除外しています。

ローカルのディレクトリとファイルをコンテナ環境へコピーするために core.#Copy、コンテナの /app/build の内容を参照するために core.#Subdir を使っています。

node_build.cue
package sample2

import (
    "dagger.io/dagger"
    "dagger.io/dagger/core"
)

dagger.#Plan & {
    actions: {
        build: {
            _node: core.#Pull & {
                source: "node:18-alpine"
            }
            // (1) ローカルの package.json と src ディレクトリをコンテナの /app へコピー
            src: core.#Copy & {
                input: _node.output
                contents: client.filesystem.".".read.contents
                dest: "/app"
            }
            // (2) npm install を実行(/app をカレントディレクトリに指定)
            deps: core.#Exec & {
                input: src.output
                workdir: "/app"
                args: ["npm", "install"]
                always: true
            }
            // (3) npm run build を実行(/app をカレントディレクトリに指定)
            runBuild: core.#Exec & {
                input: deps.output
                workdir: "/app"
                args: ["npm", "run", "build"]
                always: true
            }
            // (4) /app/build の内容を参照
            result: core.#Subdir & {
                input: runBuild.output
                path: "/app/build"
            }
        }
    }
    client: {
        filesystem: {
            // (1) ローカルの package.json と src を参照
            ".": read: {
                contents: dagger.#FS
                include: ["package.json", "src"]
            }
            // (4) ローカルの _build へ出力
            "_build": write: contents: actions.build.result.output
        }
    }
}

実行

dagger do で実行します。

実行例
$ dagger do build
・・・
[✔] actions.build                     ・・・
[✔] client.filesystem.".".read        ・・・
[✔] actions.build.src                 ・・・
[✔] actions.build.deps                ・・・
[✔] actions.build.runBuild            ・・・
[✔] actions.build.result              ・・・
[✔] client.filesystem."_build".write  ・・・

これにより、ローカルへ下記のファイルが出力されました。

_build/bundle.js
var __create = Object.create;
・・・
// src/app.ts
var E = __toESM(require_Either());
console.log(E.right("sample2"));

問題なく実行できました。

_build/bundle.js の動作確認例
$ cd _build
$ node bundle.js
{ _tag: 'Right', right: 'sample2' }

Amplify AppSync Simulator を直接使ってマッピングテンプレートを検証

Amplify AppSync Simulator は、AWS Amplify CLI に含まれているモジュールで、AppSync をローカル環境で動作確認するためのものです。(AppSync の GraphQL を処理する Web サーバーが起動するようになっている)

ソースコードを見てみたところ、AppSync 用の処理を適用した GraphQL.jsGraphQLSchema を作り、これを graphql 関数で実行するようになっていました。

そこで試しに、Amplify AppSync Simulator の API を直接使う事で、Amplify CLI を使わず Web サーバーも起動せずに、リクエスマッピングテンプレートの処理結果を出力する Node.js スクリプトを作ってみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210315/

準備

amplify-appsync-simulator モジュールをインストールしておきます。

インストール例
> npm install amplify-appsync-simulator

Lambda 用マッピングテンプレートの処理

今回は、Lambda 用のマッピングテンプレートを処理してみます。

AppSync では下記を設定する事で GraphQL の処理を定義するようになっています。

  • (1) GraphQL スキーマから GraphQL API を作成
  • (2) データソース(実際の処理を行う部分)を追加
  • (3) リゾルバー ※ を作成
 ※ リゾルバーは、GraphQL API とデータソースとの紐づけ、
    入出力データの加工をマッピングテンプレートとして設定するようになっています

マッピングテンプレートは、VTL(Apache Velocity Template Language)の形式でリクエストやレスポンスの内容を加工して JSON 文字列を作ります。

Amplify AppSync Simulator では AmplifyAppSyncSimulator クラスの init メソッドに対して、上記 (1) ~ (3) と defaultAuthenticationType を設定した AmplifyAppSyncSimulatorConfig を渡す事で、AppSync 用の処理を適用した GraphQLSchema を内部的に作るようになっており、それを schema で取得する事が可能です。

defaultAuthenticationType の設定は、今回のように GraphQLSchema を直接利用するケースでは使われませんが、設定自体は必須のようです。

また、Amplify AppSync Simulator が対応しているデータソースは今のところ下記 3タイプのようで、タイプ毎に用意されている DataLoader が処理を担うようになっています。

各 DataLoader の load メソッドの第一引数にリクエスマッピングテンプレートの処理結果がそのまま渡ってくるようになっています。

NONE データソースの利用

NONE タイプ用 NoneDataLoader の load メソッドは、単純に payload の値を処理結果として返すような実装になっています。

そこで、この load メソッドを上書きする事でリクエスマッピングテンプレートの結果を出力するようにしてみました。

なお、AmplifyAppSyncSimulator が作成した GraphQLSchema を graphql 関数で処理する際のコンテキストに requestAuthorizationModeappsyncErrors の項目が最低限必要でした。

defaultAuthenticationType は必須のようなので API_KEY を設定していますが、実際には使われないので API_KEY の値を設定したりする必要はありませんでした。

sample1.js
const { AmplifyAppSyncSimulator } = require('amplify-appsync-simulator')
const { graphql } = require('graphql')
// GraphQL スキーマ定義
const schema = `
    type Item {
        id: ID!
        value: Int!
    }

    input FindInput {
        id: ID!
    }

    type Query {
        find(input: FindInput!): Item
    }
`
// データソースの設定
const dataSources = [
    { type: 'NONE', name: 'ItemFunc' }
]
// リゾルバーの設定
const resolvers = [
    {
        kind: 'UNIT', 
        // GraphQL の Query で定義した find フィールドと ItemFunc データソースとのマッピング
        typeName: 'Query', fieldName: 'find',
        dataSourceName: 'ItemFunc'
        // リクエストマッピングテンプレートの設定(GraphQL クエリの input をそのまま payload に設定)
        requestMappingTemplate: `
            {
                "version": "2018-05-29",
                "operation": "Invoke",
                "payload": $utils.toJson($ctx.args.input)
            }
        `,
        // レスポンスマッピングテンプレートの設定
        responseMappingTemplate: '$utils.toJson($context.result)'
    }
]
// 設定
const config = {
    schema: { content: schema },
    resolvers,
    dataSources,
    appSync: {
        // 下記の設定は必須
        defaultAuthenticationType: {
            authenticationType: 'API_KEY'
        }
    }
}

const simulator = new AmplifyAppSyncSimulator()

simulator.init(config)

// ItemFunc(NoneDataLoader)の load メソッド上書き
simulator.getDataLoader('ItemFunc').load = (req) => {
    console.log(`*** load: ${JSON.stringify(req)}`)

    return { id: req.payload.id, value: 123 }
}
// AmplifyAppSyncSimulator用 GraphQL の実行コンテキスト
const ctx = {
    requestAuthorizationMode: 'API_KEY',
    appsyncErrors: []
}
// GraphQL クエリ
const q = `
    query FindItem($id: ID!) {
        find(input: {id: $id}) {
            id
            value
        }
    }
`

const run = async () => {
    // GraphQL クエリ実行
    const r = await graphql(simulator.schema, q, null, ctx, {id: 'id1'})
    console.log(JSON.stringify(r))
}

run().catch(err => console.error(err))

実行結果は以下の通り。 リクエスマッピングテンプレートの処理結果が出力されています。

実行結果
> node sample1.js
*** load: {"version":"2018-05-29","operation":"Invoke","payload":{"id":"id1"}}
{"data":{"find":{"id":"id1","value":123}}}

AWS_LAMBDA データソースの利用

AWS_LAMBDA タイプの場合は、データソース設定の invoke へ設定した関数が呼び出されるようになっているので ※、これを使って Lambda の関数ハンドラーを直接実行する事もできます。

 ※ リクエストマッピングテンプレート処理結果の
    payload の値が引数として与えられるようになっています
lambda_func.js(関数ハンドラー)
exports.handler = async (event) => {
    console.log(`*** handler: ${JSON.stringify(event)}`)

    return { id: event.id, value: 234 }
}
sample2.js
const { AmplifyAppSyncSimulator } = require('amplify-appsync-simulator')
const { graphql } = require('graphql')

const schema = `
    ・・・
`

const dataSources = [
    // invoke へ lambda_func.js の handler を設定
    { type: 'AWS_LAMBDA', name: 'ItemFunc', invoke: require('./lambda_func.js').handler }
]

const resolvers = [
    ・・・
]

const config = {
    schema: { content: schema },
    resolvers,
    dataSources,
    appSync: {
        defaultAuthenticationType: {
            authenticationType: 'API_KEY'
        }
    }
}

const simulator = new AmplifyAppSyncSimulator()

simulator.init(config)

const ctx = {
    requestAuthorizationMode: 'API_KEY',
    appsyncErrors: []
}

const q = `
    query FindItem($id: ID!) {
        find(input: {id: $id}) {
            id
            value
        }
    }
`

const run = async () => {
    const r = await graphql(simulator.schema, q, null, ctx, {id: 'id2'})
    console.log(JSON.stringify(r))
}

run().catch(err => console.error(err))
実行結果
> node sample2.js
*** handler: {"id":"id2"}
{"data":{"find":{"id":"id2","value":234}}}

AWS Lambda のランタイム API サーバーを自作して関数ハンドラーをローカル実行

AWS Lambda では、(Lambda 関数の)ランタイムがランタイム API(ランタイムインターフェース)から呼び出しイベントを受け取って、関数ハンドラーを実行し、その結果をランタイム API へ返すような流れで処理が実施されているようです。

そこで、自作のランタイム API サーバーを使って、Go と Node.js でそれぞれ実装した AWS Lambda の関数ハンドラーを(Docker 等を使わずに)ローカル実行してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20210211/

ランタイム API サーバーの実装

以下のようなオープンソースのランタイム API サーバーが提供されており、自作する必要は無かったりするのですが。

今回は、fastify を使ってランタイム API サーバーを実装してみました。

AWS Lambda ランタイム API のページから OpenAPI 仕様(2018-06-01)を入手できるので、このスキーマ定義を参考に必要最小限の処理だけを実装します。

まず、ランタイム API として下記を実装する事になります。(ただし、今回の用途では (d) を使いません)

  • (a) 次の呼び出し : GET /runtime/invocation/next
  • (b) 呼び出しレスポンス : POST /runtime/invocation/{AwsRequestId}/response
  • (c) 呼び出しエラー : POST /runtime/invocation/{AwsRequestId}/error
  • (d) 初期化エラー : POST /runtime/init/error

(b) ~ (d) は成功時に 202 ステータスコードを返す事になります。

(b) や (c) における AwsRequestId の値は、(a) のレスポンスヘッダーで通知する事になり、(a) のレスポンスヘッダーで最低限必要な項目は下記でした。(aws-lambda-go の場合)

  • Lambda-Runtime-Aws-Request-Id (AwsRequestId の値)
  • Lambda-Runtime-Deadline-Ms (実行期限)

一方で、ランタイム側は以下のような手順でランタイム API を呼び出すようになっているようです。

  • (1) (a) を GET してレスポンスが返ってくるのを待つ
  • (2) (a) からレスポンス(呼び出しイベント)が返ってくると関数ハンドラーを呼び出す
  • (3) 関数ハンドラーの呼び出し結果を (b) へ POST、エラーが発生した場合は (c) へ POST
  • (4) (1) へ戻る

つまり、(a) で固定の内容をすぐに返すような実装にしてしまうと、(1) ~ (4) が繰り返されてしまい不都合が生じます。

そこで、下記では /invoke を呼び出した際に (a) のレスポンスを返すようにしています。

また、Go と Node.js の関数ハンドラーを同時に扱うために、(a) が呼び出された際の reply を replies へ溜めておいて、forEach で処理するようにしています。

runtime-api-server/server.js (ランタイム API サーバー)
const logger = false
const fastify = require('fastify')({ logger })

const RUNTIME_PATH = '/2018-06-01/runtime'
const TIMEOUT = 5 * 1000

const port = process.env['SERVER_PORT'] ? 
    parseInt(process.env['SERVER_PORT']) : 8080

const replies = []

fastify.post('/invoke', (req, reply) => {
    const data = req.body

    console.log(`*** invoke: ${JSON.stringify(data)}`)

    replies.splice(0).forEach(r => {
        const deadline = Date.now() + TIMEOUT
        // (a) のレスポンスを返す
        r.code(200)
            .header('Lambda-Runtime-Aws-Request-Id', data['request-id'])
            .header('Lambda-Runtime-Deadline-Ms', deadline.toString())
            .header('Lambda-Runtime-Trace-Id', data['trace-id'])
            .header('Lambda-Runtime-Invoked-Function-Arn', data['function-arn'])
            .send(data.body)
    })

    reply.code(200).send({})
})

// Runtime API の実装

// (a) 次の呼び出し
fastify.get(`${RUNTIME_PATH}/invocation/next`, (req, reply) => {
    console.log('*** next')
    console.log(req.body)

    replies.push(reply)
})
// (b) 呼び出しレスポンス
fastify.post(`${RUNTIME_PATH}/invocation/:id/response`, (req, reply) => {
    console.log(`*** response: id = ${req.params['id']}`)
    console.log(req.body)

    reply.code(202).send({ status: '' })
})
// (c) 呼び出しエラー
fastify.post(`${RUNTIME_PATH}/invocation/:id/error`, (req, reply) => {
    console.log(`*** error: id = ${req.params['id']}`)
    console.log(req.body)

    reply.code(202).send({ status: '' })
})
// (d) 初期化エラー
fastify.post(`${RUNTIME_PATH}/init/error`, (req, reply) => {
    console.log('*** init error')
    console.log(req.body)

    reply.code(202).send({ status: '' })
})

fastify.listen(port)
    .then(r => console.log(`started: ${r}`))
    .catch(err => console.error(err))

実行

動作確認のために実行しておきます。

> cd runtime-api-server
> node server.js
started: http://127.0.0.1:8080

Go による関数ハンドラー

Go の場合、aws-lambda-go にランタイム API とやり取りを行うランタイムの処理が実装されています。

そのため、aws-lambda-go を使って実装した関数ハンドラーをローカル環境でビルドして実行するだけです。

接続するランタイム API サーバーのアドレスは AWS_LAMBDA_RUNTIME_API 環境変数で設定します。

ここでは、以下の関数ハンドラーを使用します。

sample_go/app.go (関数ハンドラー)
package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-lambda-go/lambdacontext"
)

func handler(ctx context.Context, event interface{}) (string, error) {
    c, _ := lambdacontext.FromContext(ctx)

    fmt.Println("*** call handler")
    fmt.Printf("event = %#v\n", event)
    fmt.Printf("context = %#v\n", c)

    return "sample-go", nil
}

func main() {
    lambda.Start(handler)
}

動作確認1

AWS_LAMBDA_RUNTIME_API 環境変数にランタイム API サーバーのアドレスを設定し、app.go をビルドして実行します。

ビルドと実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080
> cd sample_go
> go build app.go
> app

ランタイム API サーバー側の出力内容は下記のようになり、app.go から (a) の API が呼び出されている事を確認できます。

server.js(ランタイム API サーバー)の出力内容1
> node server.js
started: http://127.0.0.1:8080
*** next
null

/invoke を呼び出して、関数ハンドラーを実行してみます。

invoke の呼び出し1
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"a1", "function-arn":"sample", "body":{"name":"abc","value":1}}'
{}

app.go の出力内容は以下のようになり、関数ハンドラーの実行を確認できました。

app.go の出力内容
> app
*** call handler
event = map[string]interface {}{"name":"abc", "value":1}
context = &lambdacontext.LambdaContext{AwsRequestID:"a1", InvokedFunctionArn:"sample", Identity:lambdacontext.CognitoIdentity{CognitoIdentityID:"", CognitoIdentityPoolID:""}, ClientContext:lambdacontext.ClientContext{Client:lambdacontext.ClientApplication{InstallationID:"", AppTitle:"", AppVersionCode:"", AppPackageName:""}, Env:map[string]string(nil), Custom:map[string]string(nil)}}

ランタイム API サーバー側は以下のように、app.go からのレスポンスを受け取っている事が確認できます。

server.js(ランタイム API サーバー)の出力内容2
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"a1","function-arn":"sample","body":{"name":"abc","value":1}}
*** response: id = a1
sample-go
*** next
null

続いて、エラー時の挙動を確認するために下記を実施します。

invoke の呼び出し2
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"b2"}'
{}

app.go には何も出力されず、ランタイム API サーバー側は以下のようになり、エラーの内容を受け取っています。

server.js(ランタイム API サーバー)の出力内容3
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"b2"}
*** error: id = b2
{
  errorMessage: 'unexpected end of JSON input',
  errorType: 'SyntaxError'
}
*** next
null

Node.js による関数ハンドラー

Node.js の場合、通常は関数ハンドラーそのものを実装するだけなので、Go とは違ってランタイムが別途必要となります。

Node.js 用のランタイムとしては下記が提供されており、ネイティブコードなどを使った本格的な作りとなっています。(本番利用を想定していると思われる)

今回はこれを使わず、簡易的なランタイムを node-fetch を使って自作してみました。

node_runtime/runtime.js (Node.js 用の簡易ランタイム)
const fetch = require('node-fetch')

const runtimeUrl = `http://${process.env["AWS_LAMBDA_RUNTIME_API"]}/2018-06-01/runtime/invocation`

const appRoot = process.cwd()
const [moduleName, handlerName] = process.argv[2].split('.')

const app = require(`${appRoot}/${moduleName}`)

const envData = {
    functionVersion: process.env["AWS_LAMBDA_FUNCTION_VERSION"],
    functionName: process.env["AWS_LAMBDA_FUNCTION_NAME"],
    memoryLimitInMB: process.env["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"],
    logGroupName: process.env["AWS_LAMBDA_LOG_GROUP_NAME"],
    logStreamName: process.env["AWS_LAMBDA_LOG_STREAM_NAME"],
}

const run = async () => {
    // (a) の API を呼び出す
    const nextRes = await fetch(`${runtimeUrl}/next`)

    const deadline = parseInt(nextRes.headers.get('lambda-runtime-deadline-ms'))

    const context = Object.assign(
        {
            getRemainingTimeInMillis: () => deadline - Date.now(),
            awsRequestId: nextRes.headers.get('lambda-runtime-aws-request-id'),
            invokedFunctionArn: 
                nextRes.headers.get('lambda-runtime-invoked-function-arn'),
            identity: {},
            clientContext: {},
        }, 
        envData
    )

    try {
        const event = await nextRes.json()
        // 関数ハンドラーの呼び出し
        const res = await app[handlerName](event, context)
        console.log(`* handler result: ${res}`)
        // (b) の API を呼び出す
        await fetch(
            `${runtimeUrl}/${context.awsRequestId}/response`, 
            {method: 'POST', body: res}
        )

    } catch (e) {
        // (c) の API を呼び出す
        await fetch(
            `${runtimeUrl}/${context.awsRequestId}/error`, 
            {method: 'POST', body: JSON.stringify({type: e.type, message: e.message})}
        )
    }
}

const loop = async () => {
    while (true) {
        await run()
    }
}

loop().catch(err => console.error(err))

ここでは、下記の関数ハンドラーを使う事にします。

sample_node/app.js (関数ハンドラー)
exports.handler = async (event, context) => {
    console.log('*** call handler')
    console.log(`event = ${JSON.stringify(event)}`)
    console.log(`context = ${JSON.stringify(context)}`)
    console.log(`remaining time = ${context.getRemainingTimeInMillis()}`)

    return 'sample-node'
}

動作確認2

runtime.js を使って app.js の handler を呼び出すように実行します。

実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080
> cd sample_node
> node ../node_runtime/runtime.js app.handler

ランタイム API サーバーの next が呼び出されました。

server.js(ランタイム API サーバー)の出力内容4
> node server.js
started: http://127.0.0.1:8080
・・・
*** next
null

/invoke を呼び出します。

invoke の呼び出し3
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"c3", "function-arn":"sample2", "body":{"name":"def","value":2}}'
{}

runtime.js + app.js の出力内容は以下のようになり、関数ハンドラーが呼び出されました。

runtime.js + app.js の出力内容
> node ../node_runtime/runtime.js app.handler
*** call handler
event = {"name":"def","value":2}
context = {"awsRequestId":"c3","invokedFunctionArn":"sample2","identity":{},"clientContext":{}}
remaining time = 4945
* handler result: sample-node

ランタイム API サーバーの出力も以下のようになり、問題は無さそうです。

server.js(ランタイム API サーバー)の出力内容5
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"c3","function-arn":"sample2","body":{"name":"def","value":2}}
・・・
*** response: id = c3
sample-node
*** next
null

エラーが発生するように /invoke を呼び出します。

invoke の呼び出し4
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"d4"}'
{}

こちらも問題は無さそうです。

server.js(ランタイム API サーバー)の出力内容6
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"d4"}
・・・
*** error: id = d4
{"type":"invalid-json","message":"invalid json response body at http://localhost:8080/2018-06-01/runtime/invocation/next reason: Unexpected end of JSON input"}
*** next
null

イベントベースで考える在庫管理モデル

従来のイベントソーシングのような手法だと、特定の State(というよりは Entity かも)を永続化するための手段として Event を用いるというような、あくまでも State 中心の発想になると思います。

そこで、ここでは下記のような Event 中心の発想に切り替えて、在庫管理(在庫数を把握するだけの単純なもの)を考えてみました。

  • State は本質ではなく、Event を解釈した結果にすぎない(解釈の仕方は様々)
  • Event を得たり、伝えたりするための手段として State を用いる

要するに、Event こそが重要で State(Entity とか)は取るに足らない存在だと(実験的に)考えてみたって事です。

従来のイベントソーシング 本件
State が目的、Event が手段 Event が目的、State が手段

なお、ここでイメージしている Event は、特定のドメインに依存しないような本質的なものです。

在庫管理

本件の在庫管理は、以下を把握するだけの単純なものです。

  • 何処に何の在庫(とりあえず現物のあるもの)がいくつあるか

実装コードは http://github.com/fits/try_samples/tree/master/blog/20201213/

1. 本質的なイベント

在庫管理で起こりそうなイベントを考えてみます。

在庫(数)は入庫と出庫の結果だと考えられるので、以下のようなイベントが考えられそうです。

  • 入庫イベント
  • 出庫イベント

また、シンプルに物が移動 ※ した結果が在庫なのだと考えると、(2地点間の)在庫の移動という形で抽象化できそうな気がします。

 ※ 概念的な移動も含める

そうすると、以下のようなイベントも考えられそうです。

  • 在庫移動の開始イベント
  • 在庫移動の終了(完了)イベント

ついでに、在庫の引当も区別して考えると ※、以下のようなイベントも考えられます。

  • 引当イベント
 ※ 引当用の場所へ移動するという事にするのであれば区別しなくてもよさそう

まとめると、とりあえずは以下のようなイベントが考えられそうです。

  • 在庫移動の開始イベント
  • 在庫移動の完了イベント
  • 在庫移動のキャンセルイベント
  • 引当イベント
  • 引当した場合の出庫イベント
  • 引当しなかった場合の出庫イベント
  • 入庫イベント

ついでに、引当や出庫などの成否をイベントとして明確に分けたい場合は、引当失敗イベント等の失敗イベントを別途設ければ良さそうな気がします。

2. イベント定義

これらのイベントを Rust と TypeScript で型定義してみました。

商品や在庫のロケーション(在庫の保管場所)の具体的な内容はどうでもよいので(ここで具体化する必要がない)、Generics の型変数で表現しておきます。

本質的に必要そうな最低限の情報のみを持たせ、余計な情報は取り除いておきます。※

 ※ 在庫移動を一意に限定する ID や日付のような
    メタデータ(と考えられるもの)に関しても除外しました

用語はとりあえず以下のようにしています。

  • 引当: assign
  • 出庫: ship
  • 入庫: arrive

何(item)をいくつ(qty)、何処(from)から何処(to)へ移動する予定なのかという情報を持たせて在庫の移動を開始するようにしてみました。

入出庫等で予定とは異なる内容になっても不都合が生じないように、それぞれのイベントに必要な情報を持たせています。

また、全体的に ADT(代数的データ型)を意識した内容にしています。

Rust で型定義したイベント

models/events.rs
pub enum StockMoveEvent<Item, Location, Quantity> {
    // 開始
    Started {
        item: Item, 
        qty: Quantity, 
        from: Location, 
        to: Location,
    },
    // 完了
    Completed,
    // キャンセル
    Cancelled,
    // 引当
    Assigned {
        item: Item, 
        from: Location,
        assigned: Quantity, 
    },
    // 出庫
    Shipped {
        item: Item, 
        from: Location,
        outgoing: Quantity, 
    },
    // 引当に対する出庫
    AssignShipped {
        item: Item, 
        from: Location,
        outgoing: Quantity,
        assigned: Quantity,
    },
    // 入庫
    Arrived {
        item: Item, 
        to: Location,
        incoming: Quantity, 
    },
}

TypeScript で型定義したイベント

models/events.ts
export interface StockMoveEventStarted<Item, Location, Quantity> {
    tag: 'stock-move-event.started'
    item: Item
    qty: Quantity
    from: Location
    to: Location
}

export interface StockMoveEventCompleted {
    tag: 'stock-move-event.completed'
}

export interface StockMoveEventCancelled {
    tag: 'stock-move-event.cancelled'
}

export interface StockMoveEventAssigned<Item, Location, Quantity> {
    tag: 'stock-move-event.assigned'
    item: Item
    from: Location
    assigned: Quantity
}

export interface StockMoveEventShipped<Item, Location, Quantity> {
    tag: 'stock-move-event.shipped'
    item: Item
    from: Location
    outgoing: Quantity
}

export interface StockMoveEventAssignShipped<Item, Location, Quantity> {
    tag: 'stock-move-event.assign-shipped'
    item: Item
    from: Location
    outgoing: Quantity
    assigned: Quantity
}

export interface StockMoveEventArrived<Item, Location, Quantity> {
    tag: 'stock-move-event.arrived'
    item: Item
    to: Location
    incoming: Quantity
}

export type StockMoveEvent<Item, Location, Quantity> = 
    StockMoveEventStarted<Item, Location, Quantity> | 
    StockMoveEventCompleted | 
    StockMoveEventCancelled | 
    StockMoveEventAssigned<Item, Location, Quantity> | 
    StockMoveEventShipped<Item, Location, Quantity> | 
    StockMoveEventAssignShipped<Item, Location, Quantity> | 
    StockMoveEventArrived<Item, Location, Quantity>

3. 在庫移動処理サンプル

上記で定義したイベントを以下のような(在庫移動の)ステートマシンで扱ってみます。※

f:id:fits:20201213191951p:plain

 ※ 本件の考え方では、
    (本質的な)イベントは特定の処理やルールになるべく依存していない事が重要なので、
    このステートマシン(イベントを扱う手段の一つでしかない)に対して
    特化しないように注意します
  • 在庫移動の状態遷移の基本パターンは 3通り
    • (a) 引当 -> 出庫 -> 入庫
    • (b) 出庫 -> 入庫
    • (c) 入庫
  • 入庫の失敗状態は無し(0個の入庫で代用)

(c) は出庫側の状況が不明なケースで入庫の記録だけを残すような用途を想定しています。

3.1 ステートマシンの実装

Rust と TypeScript でそれぞれ実装してみます。

この辺のレイヤーまでは、外界の都合(※1)から隔離しておきたいと考え、関数言語的な作りにしています。

イベントと同様に在庫移動や在庫を ADT(代数的データ型) で表現し、下記のような関数(+ ユーティリティ関数)を提供するだけの作りにしてみました。(※2)

(※1)フレームワーク、UI、永続化、非同期処理、排他制御やその他諸々の都合
(※2)こうしておくと、WebAssembly 等でコンポーネント化して
       再利用するなんて事も実現し易くなるかもしれませんし
  • (1) 初期状態を返す関数
  • (2) 現在の状態とアクションから次の状態とそれに伴って発生したイベントを返す関数(イメージとしては State -> Action -> Container<(State, Event)>
  • (3) ある時点の状態とそれ以降に起きたイベントの内容から任意の状態を復元して返す関数

なお、ここでは (2) のアクションに相当する部分は関数(と引数の一部)として実装しています。

また、(2) で状態遷移が発生しなかった場合に undefined を返すように実装していますが、実際は成功時と失敗時の両方を扱うようなコンテナ型(Rust の Result や Either モナドとか)で包むのが望ましいと思います。

ついでに、実装に際して以下のようなルールを加えています。

  • 引当、入庫、出庫のロケーションは開始時に予定したものを使用
  • 引当時にのみ在庫数を確認(残りの在庫をチェック)
  • 在庫のタイプは 2種類
    • 在庫数を管理するタイプ(引当分の在庫が余っている場合にのみ引当が成功、在庫数は入庫イベントと出庫イベントから算出)
    • 在庫数を管理しないタイプ(引当は常に成功、在庫数は管理せず実質的に無限)
  • 引当数や出庫数が 0 の場合は(引当や出庫の)失敗として扱う

引当はこの処理内における単なる数値上の予約であり、入出庫は実際の作業の結果を反映するような用途をとりあえず想定しています。

そのため、数値上の引当に成功しても実際の出庫が成功するとは限らず、数値上の在庫数以上の出庫が発生するようなケースも考えられるので、この処理ではそれらを許容するようにしています。※

 ※ 在庫の整合性等をどのように制御・調整するかは
    使う側(外側のレイヤー)に任せる

Rust による実装

ここで、商品(以下の ItemCode)や在庫ロケーション(以下の LocationCode)の具体的な型を決めていますが、これより外側のレイヤーに具体型を決めさせるようにした方が望ましいかもしれません。

models/stockmove.rs
use std::slice;

use super::events::StockMoveEvent;
// 商品を識別するための型
pub type ItemCode = String;
// 在庫ロケーションを識別するための型
pub type LocationCode = String;
pub type Quantity = u32;

pub trait Event<S> {
    type Output;

    fn apply_to(&self, state: S) -> Self::Output;
}

pub trait Restore<E> {
    fn restore(self, events: slice::Iter<E>) -> Self;
}
// 在庫の型定義
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum Stock {
    Unmanaged { item: ItemCode, location: LocationCode },
    Managed { 
        item: ItemCode, 
        location: LocationCode, 
        qty: Quantity, 
        assigned: Quantity
    },
}
// 在庫に関する処理
#[allow(dead_code)]
impl Stock {
    pub fn unmanaged_new(item: ItemCode, location: LocationCode) -> Self {
        Self::Unmanaged { item, location }
    }

    pub fn managed_new(item: ItemCode, location: LocationCode) -> Self {
        Self::Managed { item, location, qty: 0, assigned: 0 }
    }

    pub fn eq_id(&self, item: &ItemCode, location: &LocationCode) -> bool {
        match self {
            Self::Managed { item: it, location: loc, .. } | 
            Self::Unmanaged { item: it, location: loc } => 
                it == item && loc == location
        }
    }
    // 在庫数のチェック
    pub fn is_sufficient(&self, v: Quantity) -> bool {
        match self {
            Self::Managed { qty, assigned, .. } =>
                v + assigned <= *qty,
            Self::Unmanaged { .. } => true, 
        }
    }

    fn update(&self, qty: Quantity, assigned: Quantity) -> Self {
        match self {
            Self::Managed { item, location, .. } => {
                Self::Managed {
                    item: item.clone(),
                    location: location.clone(),
                    qty,
                    assigned,
                }
            },
            Self::Unmanaged { .. } => self.clone(),
        }
    }

    fn update_qty(&self, qty: Quantity) -> Self {
        match self {
            Self::Managed { assigned, .. } => self.update(qty, *assigned),
            Self::Unmanaged { .. } => self.clone(),
        }
    }

    fn update_assigned(&self, assigned: Quantity) -> Self {
        match self {
            Self::Managed { qty, .. } => self.update(*qty, assigned),
            Self::Unmanaged { .. } => self.clone(),
        }
    }
}
// 在庫に対するイベントの適用
impl Event<Stock> for MoveEvent {
    type Output = Stock;

    fn apply_to(&self, state: Stock) -> Self::Output {
        match &state {
            Stock::Unmanaged { .. } => state,
            Stock::Managed { item: s_item, location: s_loc, 
                qty: s_qty, assigned: s_assigned } => {

                match self {
                    Self::Assigned { item, from, assigned } 
                    if s_item == item && s_loc == from => {

                        state.update_assigned(
                            s_assigned + assigned
                        )
                    },
                    Self::Shipped { item, from, outgoing }
                    if s_item == item && s_loc == from => {

                        state.update_qty(
                            s_qty.checked_sub(*outgoing).unwrap_or(0)
                        )
                    },
                    Self::AssignShipped { item, from, outgoing, assigned }
                    if s_item == item && s_loc == from => {

                        state.update(
                            s_qty.checked_sub(*outgoing).unwrap_or(0),
                            s_assigned.checked_sub(*assigned).unwrap_or(0),
                        )
                    },
                    Self::Arrived { item, to, incoming }
                    if s_item == item && s_loc == to => {

                        state.update_qty(
                            s_qty + incoming
                        )
                    },
                    _ => state,
                }
            },
        }
    }
}

#[derive(Debug, Default, Clone, PartialEq)]
pub struct StockMoveInfo {
    item: ItemCode,
    qty: Quantity,
    from: LocationCode,
    to: LocationCode,
}
// 在庫移動の型(状態)定義
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub enum StockMove {
    Nothing,
    Draft { info: StockMoveInfo },
    Completed { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity },
    Cancelled { info: StockMoveInfo },
    Assigned { info: StockMoveInfo, assigned: Quantity },
    Shipped { info: StockMoveInfo, outgoing: Quantity },
    Arrived { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity },
    AssignFailed { info: StockMoveInfo },
    ShipmentFailed { info: StockMoveInfo },
}

type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity>;
type MoveResult = Option<(StockMove, MoveEvent)>;
// 在庫移動に関する処理
#[allow(dead_code)]
impl StockMove {
    // 初期状態の取得
    pub fn initial_state() -> Self {
        Self::Nothing
    }
    // 開始
    pub fn start(&self, item: ItemCode, qty: Quantity, 
        from: LocationCode, to: LocationCode) -> MoveResult {

        if qty < 1 {
            return None
        }

        let event = StockMoveEvent::Started {
            item: item.clone(), 
            qty: qty, 
            from: from.clone(), 
            to: to.clone()
        };

        self.apply_event(event)
    }
    // 引当
    pub fn assign(&self, stock: &Stock) -> MoveResult {
        if let Some(info) = self.info() {
            if stock.eq_id(&info.item, &info.from) {
                let assigned = if stock.is_sufficient(info.qty) {
                    info.qty
                } else {
                    0
                };

                return self.apply_event(
                    StockMoveEvent::Assigned {
                        item: info.item.clone(),
                        from: info.from.clone(),
                        assigned,
                    }
                )
            }
        }

        None
    }
    // 出庫
    pub fn ship(&self, outgoing: Quantity) -> MoveResult {
        let ev = match self {
            Self::Assigned { info, assigned } => {
                Some(StockMoveEvent::AssignShipped {
                    item: info.item.clone(),
                    from: info.from.clone(),
                    outgoing,
                    assigned: assigned.clone(),
                })
            },
            _ => {
                self.info()
                    .map(|i|
                        StockMoveEvent::Shipped {
                            item: i.item.clone(),
                            from: i.from.clone(),
                            outgoing,
                        }
                    )
            },
        };

        ev.and_then(|e| self.apply_event(e))
    }
    // 入庫
    pub fn arrive(&self, incoming: Quantity) -> MoveResult {
        self.info()
            .and_then(|i|
                self.apply_event(StockMoveEvent::Arrived {
                    item: i.item.clone(),
                    to: i.to.clone(),
                    incoming,
                })
            )
    }

    pub fn complete(&self) -> MoveResult {
        self.apply_event(StockMoveEvent::Completed)
    }

    pub fn cancel(&self) -> MoveResult {
        self.apply_event(StockMoveEvent::Cancelled)
    }

    fn info(&self) -> Option<StockMoveInfo> {
        match self {
            Self::Draft { info } |
            Self::Completed { info, .. } |
            ・・・
            Self::Arrived { info, .. } => {
                Some(info.clone())
            },
            Self::Nothing => None,
        }
    }

    fn apply_event(&self, event: MoveEvent) -> MoveResult {
        let new_state = event.apply_to(self.clone());

        Some((new_state, event))
            .filter(|r| r.0 != *self)
    }
}
// 在庫移動に対するイベントの適用
impl Event<StockMove> for MoveEvent {
    type Output = StockMove;

    fn apply_to(&self, state: StockMove) -> Self::Output {
        match self {
            Self::Started { item, qty, from, to } => {
                if state == StockMove::Nothing {
                    StockMove::Draft {
                        info: StockMoveInfo { 
                            item: item.clone(), 
                            qty: qty.clone(), 
                            from: from.clone(), 
                            to: to.clone(),
                        }
                    }
                } else {
                    state
                }
            },
            Self::Completed => {
                if let StockMove::Arrived { info, outgoing, incoming } = state {
                    StockMove::Completed { info: info.clone(), outgoing, incoming }
                } else {
                    state
                }
            },
            Self::Cancelled => {
                if let StockMove::Draft { info } = state {
                    StockMove::Cancelled { info: info.clone() }
                } else {
                    state
                }
            },
            Self::Assigned { item, from, assigned } => {
                match state {
                    StockMove::Draft { info } 
                    if info.item == *item && info.from == *from => {

                        if *assigned > 0 {
                            StockMove::Assigned { 
                                info: info.clone(), 
                                assigned: assigned.clone(),
                            }
                        } else {
                            StockMove::AssignFailed { info: info.clone() }
                        }
                    },
                    _ => state,
                }
            },
            Self::Shipped { item, from, outgoing } => {
                match state {
                    StockMove::Draft { info }
                    if info.item == *item && info.from == *from => {

                        if *outgoing > 0 {
                            StockMove::Shipped { 
                                info: info.clone(), 
                                outgoing: outgoing.clone(),
                            }
                        } else {
                            StockMove::ShipmentFailed { info: info.clone() }
                        }
                    },
                    _ => state,
                }
            },
            Self::AssignShipped { item, from, outgoing, .. } => {
                match state {
                    StockMove::Assigned { info, .. }
                    if info.item == *item && info.from == *from => {

                        if *outgoing > 0 {
                            StockMove::Shipped { 
                                info: info.clone(), 
                                outgoing: outgoing.clone(),
                            }
                        } else {
                            StockMove::ShipmentFailed { info: info.clone() }
                        }
                    },
                    _ => state,
                }
            },
            Self::Arrived { item, to, incoming } => {
                match state {
                    StockMove::Draft { info }
                    if info.item == *item && info.to == *to => {
                        StockMove::Arrived {
                            info: info.clone(),
                            outgoing: 0,
                            incoming: *incoming,
                        }
                    },
                    StockMove::Shipped { info, outgoing }
                    if info.item == *item && info.to == *to => {
                        StockMove::Arrived {
                            info: info.clone(),
                            outgoing,
                            incoming: *incoming,
                        }
                    },
                    _ => state,
                }
            },
        }
    }
}
// 在庫や在庫移動の状態復元
impl<S, E> Restore<&E> for S
where
    Self: Clone,
    E: Event<Self, Output = Self>,
{
    fn restore(self, events: slice::Iter<&E>) -> Self {
        events.fold(self, |acc, ev| ev.apply_to(acc.clone()))
    }
}

TypeScript による実装

実装の仕方が多少違っていますが、Rust 版の処理内容と概ね同じ(にしたつもり)です。

models/stockmove.ts
import { 
    StockMoveEvent, StockMoveEventShipped, StockMoveEventAssignShipped 
} from './events'

export type ItemCode = string
export type LocationCode = string
export type Quantity = number

export type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity>

type ShippedMoveEvent = StockMoveEventShipped<ItemCode, LocationCode, Quantity>
type AssignShippedMoveEvent = StockMoveEventAssignShipped<ItemCode, LocationCode, Quantity>

interface StockUnmanaged {
    tag: 'stock.unmanaged'
    item: ItemCode
    location: LocationCode
}

interface StockManaged {
    tag: 'stock.managed'
    item: ItemCode
    location: LocationCode
    qty: Quantity
    assigned: Quantity
}
// 在庫の型定義
export type Stock = StockUnmanaged | StockManaged
// 在庫に関する処理
export class StockAction {
    static newUnmanaged(item: ItemCode, location: LocationCode): Stock {
        return {
            tag: 'stock.unmanaged',
            item,
            location
        }
    }

    static newManaged(item: ItemCode, location: LocationCode): Stock {
        return {
            tag: 'stock.managed',
            item,
            location,
            qty: 0,
            assigned: 0
        }
    }
    // 在庫数のチェック
    static isSufficient(stock: Stock, qty: Quantity): boolean {
        switch (stock.tag) {
            case 'stock.unmanaged':
                return true
            case 'stock.managed':
                return qty + Math.max(0, stock.assigned) <= Math.max(0, stock.qty)
        }
    }
}
// 在庫の復元処理
export class StockRestore {
    static restore(state: Stock, events: MoveEvent[]): Stock {
        return events.reduce(StockRestore.applyTo, state)
    }
    // 在庫に対するイベントの適用
    private static applyTo(state: Stock, event: MoveEvent): Stock {
        if (state.tag == 'stock.managed') {
            switch (event.tag) {
                case 'stock-move-event.assigned':
                    if (state.item == event.item && state.location == event.from) {
                        return StockRestore.updateAssigned(
                            state, 
                            state.assigned + event.assigned
                        )
                    }
                    break
                case 'stock-move-event.assign-shipped':
                    if (state.item == event.item && state.location == event.from) {
                        return StockRestore.updateStock(
                            state,
                            state.qty - event.outgoing,
                            state.assigned - event.assigned
                        )
                    }
                    break
                ・・・
            }
        }
        return state
    }

    private static updateStock(stock: Stock, qty: Quantity, assigned: Quantity): Stock {
        switch (stock.tag) {
            case 'stock.unmanaged':
                return stock
            case 'stock.managed':
                return {
                    tag: stock.tag,
                    item: stock.item,
                    location: stock.location,
                    qty,
                    assigned
                }
        }
    }

    ・・・
}

interface StockMoveInfo {
    item: ItemCode
    qty: Quantity
    from: LocationCode
    to: LocationCode
}

interface StockMoveNothing {
    tag: 'stock-move.nothing'
}

interface StockMoveDraft {
    tag: 'stock-move.draft'
    info: StockMoveInfo
}

・・・

// 在庫移動の型(状態)定義
export type StockMove = 
    StockMoveNothing | StockMoveDraft | StockMoveCompleted | 
    StockMoveCancelled | StockMoveAssigned | StockMoveShipped |
    StockMoveArrived | StockMoveAssignFailed | StockMoveShipmentFailed


export type StockMoveResult = [StockMove, MoveEvent] | undefined
// 在庫移動に関する処理
export class StockMoveAction {
    // 初期状態を取得
    static initialState(): StockMove {
        return { tag: 'stock-move.nothing' }
    }
    // 開始
    static start(state: StockMove, item: ItemCode, qty: Quantity, 
        from: LocationCode, to: LocationCode): StockMoveResult {

        if (qty < 1) {
            return undefined
        }

        const event: MoveEvent = {
            tag: 'stock-move-event.started',
            item,
            qty,
            from,
            to
        }

        return StockMoveAction.applyTo(state, event)
    }
    // 引当
    static assign(state: StockMove, stock: Stock): StockMoveResult {
        const info = StockMoveAction.info(state)

        if (info && info.item == stock.item && info.from == stock.location) {
            const assigned = 
                (stock && StockAction.isSufficient(stock, info.qty)) ? info.qty : 0
            
            const event: MoveEvent = {
                tag: 'stock-move-event.assigned',
                item: info.item,
                from: info.from,
                assigned
            }

            return StockMoveAction.applyTo(state, event)
        }

        return undefined
    }
    // 出庫
    static ship(state: StockMove, outgoing: Quantity): StockMoveResult {
        if (outgoing < 0) {
            return undefined
        }

        const event = StockMoveAction.toShippedEvent(state, outgoing)

        return event ? StockMoveAction.applyTo(state, event) : undefined
    }
    // 入庫
    static arrive(state: StockMove, incoming: Quantity): StockMoveResult {
        if (incoming < 0) {
            return undefined
        }

        const info = StockMoveAction.info(state)

        if (info) {
            const event: MoveEvent = {
                tag: 'stock-move-event.arrived',
                item: info.item,
                to: info.to,
                incoming
            }

            return StockMoveAction.applyTo(state, event)
        }
        return undefined
    }

    ・・・

    static info(state: StockMove) {
        if (state.tag != 'stock-move.nothing') {
            return state.info
        }

        return undefined
    }

    private static applyTo(state: StockMove, event: MoveEvent): StockMoveResult {
        const nextState = StockMoveRestore.restore(state, [event])

        return (nextState != state) ? [nextState, event] : undefined
    }

    private static toShippedEvent(state: StockMove, outgoing: number): MoveEvent | undefined {

        const info = StockMoveAction.info(state)

        if (info) {
            if (state.tag == 'stock-move.assigned') {
                return {
                    tag: 'stock-move-event.assign-shipped',
                    item: info.item,
                    from: info.from,
                    assigned: state.assigned,
                    outgoing
                }
            }
            else {
                return {
                    tag: 'stock-move-event.shipped',
                    item: info.item,
                    from: info.from,
                    outgoing
                }
            }
        }
        return undefined
    }
}
// 在庫移動の復元処理
export class StockMoveRestore {
    static restore(state: StockMove, events: MoveEvent[]): StockMove {
        return events.reduce(StockMoveRestore.applyTo, state)
    }
    // 在庫移動に対するイベントの適用
    private static applyTo(state: StockMove, event: MoveEvent): StockMove {
        switch (state.tag) {
            case 'stock-move.nothing':
                if (event.tag == 'stock-move-event.started') {
                    return {
                        tag: 'stock-move.draft',
                        info: {
                            item: event.item,
                            qty: event.qty,
                            from: event.from,
                            to: event.to
                        }
                    }
                }
                break
            case 'stock-move.draft':
                return StockMoveRestore.applyEventToDraft(state, event)
            case 'stock-move.assigned':
                if (event.tag == 'stock-move-event.assign-shipped') {
                    return StockMoveRestore.applyShipped(state, event)
                }
                break
            case 'stock-move.shipped':
                if (event.tag == 'stock-move-event.arrived' &&
                    state.info.item == event.item && 
                    state.info.to == event.to) {

                    return {
                        tag: 'stock-move.arrived',
                        info: state.info,
                        outgoing: state.outgoing,
                        incoming: event.incoming
                    }
                }
                break
            case 'stock-move.arrived':
                if (event.tag == 'stock-move-event.completed') {
                    return {
                        tag: 'stock-move.completed',
                        info: state.info,
                        outgoing: state.outgoing,
                        incoming: state.incoming
                    }
                }
                break
            case 'stock-move.completed':
            case 'stock-move.cancelled':
            case 'stock-move.assign-failed':
            case 'stock-move.shipment-failed':
                break
        }
        return state
    }

    private static applyShipped(state: StockMoveDraft | StockMoveAssigned, 
        event: ShippedMoveEvent | AssignShippedMoveEvent): StockMove {

        if (state.info.item == event.item && state.info.from == event.from) {
            if (event.outgoing > 0) {
                return {
                    tag: 'stock-move.shipped',
                    info: state.info,
                    outgoing: event.outgoing
                }
            }
            else {
                return {
                    tag: 'stock-move.shipment-failed',
                    info: state.info
                }
            }
        }
        return state
    }

    private static applyEventToDraft(state: StockMoveDraft, event: MoveEvent): StockMove {

        switch (event.tag) {
            case 'stock-move-event.cancelled':
                return {
                    tag: 'stock-move.cancelled',
                    info: state.info
                }
            case 'stock-move-event.assigned':
                if (state.info.item == event.item && state.info.from == event.from) {
                    if (event.assigned > 0) {
                        return {
                            tag: 'stock-move.assigned',
                            info: state.info,
                            assigned: event.assigned
                        }
                    }
                    else {
                        return {
                            tag: 'stock-move.assign-failed',
                            info: state.info
                        }
                    }
                }
                break
            case 'stock-move-event.shipped':
                return StockMoveRestore.applyShipped(state, event)
            case 'stock-move-event.arrived':
                if (state.info.item == event.item && state.info.to == event.to) {
                    return {
                        tag: 'stock-move.arrived',
                        info: state.info,
                        outgoing: 0,
                        incoming: Math.max(event.incoming, 0)
                    }
                }
                break
        }

        return state
    }
}

3.2 GraphQL 化 + MongoDB へ永続化

ついでに、前述のステートマシン(TypeScript 実装版)を Apollo Server で GraphQL 化し、MongoDB へ永続化するようにしてみました。

index.ts
import { ApolloServer, gql } from 'apollo-server'
import { v4 as uuidv4 } from 'uuid'
import { MongoClient, Collection } from 'mongodb'

import {
    ItemCode, LocationCode, MoveEvent,
    StockMoveAction, StockMoveRestore, StockMove, StockMoveResult,
    StockAction, StockRestore, Stock
} from './models'

const mongoUrl = 'mongodb://localhost'
const dbName = 'stockmoves'
const colName = 'events'
const stocksColName = 'stocks'

type MoveId = string
type Revision = number
// MongoDB へ保存するイベント内容
interface StoredEvent {
    move_id: MoveId
    revision: Revision
    item: ItemCode
    from: LocationCode
    to: LocationCode
    event: MoveEvent
}

interface RestoredStockMove {
    state: StockMove
    revision: Revision
}
// MongoDB への永続化処理
class Store {
    ・・・
    async loadStock(item: ItemCode, location: LocationCode): Promise<Stock | undefined> {
        const id = this.stockId(item, location)
        const stock = await this.stocksCol.findOne({ _id: id })

        if (!stock) {
            return undefined
        }

        const query = {
            '$and': [
                { item },
                { '$or': [
                    { from: location },
                    { to: location }
                ]}
            ]
        }

        const events = await this.eventsCol
            .find(query)
            .map(r => r.event)
            .toArray()

        return StockRestore.restore(stock, events)
    }

    async saveStock(stock: Stock): Promise<void> {
        const id = this.stockId(stock.item, stock.location)

        const res = await this.stocksCol.updateOne(
            { _id: id },
            { '$setOnInsert': stock },
            { upsert: true }
        )

        if (res.upsertedCount == 0) {
            return Promise.reject('conflict stock')
        }
    }

    async loadMove(moveId: MoveId): Promise<RestoredStockMove | undefined> {
        const events: StoredEvent[] = await this.eventsCol
            .find({ move_id: moveId })
            .sort({ revision: 1 })
            .toArray()

        const state = StockMoveAction.initialState()
        const revision = events.reduce((acc, e) => Math.max(acc, e.revision), 0)

        const res = StockMoveRestore.restore(state, events.map(e => e.event))

        return (res == state) ? undefined : { state: res, revision }
    }

    async saveEvent(event: StoredEvent): Promise<void> {
        const res = await this.eventsCol.updateOne(
            { move_id: event.move_id, revision: event.revision },
            { '$setOnInsert': event },
            { upsert: true }
        )

        if (res.upsertedCount == 0) {
            return Promise.reject(`conflict event revision=${event.revision}`)
        }
    }

    private stockId(item: ItemCode, location: LocationCode): string {
        return `${item}/${location}`
    }
}
// GraphQL スキーマ定義
const typeDefs = gql(`
    type StockMoveInfo {
        item: ID!
        qty: Int!
        from: ID!
        to: ID!
    }

    interface StockMove {
        id: ID!
        info: StockMoveInfo!
    }

    type DraftStockMove implements StockMove {
        id: ID!
        info: StockMoveInfo!
    }

    type CompletedStockMove implements StockMove {
        id: ID!
        info: StockMoveInfo!
        outgoing: Int!
        incoming: Int!
    }

    ・・・

    interface Stock {
        item: ID!
        location: ID!
    }

    type UnmanagedStock implements Stock {
        item: ID!
        location: ID!
    }

    type ManagedStock implements Stock {
        item: ID!
        location: ID!
        qty: Int!
        assigned: Int!
    }

    input CreateStockInput {
        item: ID!
        location: ID!
    }

    input StartMoveInput {
        item: ID!
        qty: Int!
        from: ID!
        to: ID!
    }

    type Query {
        findStock(item: ID!, location: ID!): Stock
        findMove(id: ID!): StockMove
    }

    type Mutation {
        createManaged(input: CreateStockInput!): ManagedStock
        createUnmanaged(input: CreateStockInput!): UnmanagedStock

        start(input: StartMoveInput!): StockMove
        assign(id: ID!): StockMove
        ship(id: ID!, outgoing: Int!): StockMove
        arrive(id: ID!, incoming: Int!): StockMove
        complete(id: ID!): StockMove
        cancel(id: ID!): StockMove
    }
`)

const toStockMoveForGql = (id: MoveId, state: StockMove | undefined) => {
    if (state) {
        return { id, ...state }
    }
    return undefined
}

type MoveAction = (state: StockMove) => StockMoveResult

const doMoveAction = async (store: Store, rs: RestoredStockMove | undefined, 
    id: MoveId, action: MoveAction) => {

    if (rs) {
        const res = action(rs.state)

        if (res) {
            const [mv, ev] = res
            const info = StockMoveAction.info(mv)

            if (info) {
                const event = { 
                    move_id: id, 
                    revision: rs.revision + 1,
                    item: info.item,
                    from: info.from,
                    to: info.to,
                    event: ev
                }

                await store.saveEvent(event)

                return toStockMoveForGql(id, mv)
            }
        }
    }
    return undefined
}
// GraphQL 処理の実装
const resolvers = {
    Stock: {
        __resolveType: (obj, ctx, info) => {
            if (obj.tag == 'stock.managed') {
                return 'ManagedStock'
            }
            return 'UnmanagedStock'
        }
    },
    StockMove: {
        __resolveType: (obj: StockMove, ctx, info) => {
            switch (obj.tag) {
                case 'stock-move.draft':
                    return 'DraftStockMove'
                case 'stock-move.completed':
                    return 'CompletedStockMove'
                ・・・
                case 'stock-move.shipment-failed':
                    return 'ShipmentFailedStockMove'
            }
            return undefined
        }
    },
    Query: {
        findStock: async (parent, { item, location }, { store }, info) => {
            return store.loadStock(item, location)
        },
        findMove: async (parent, { id }, { store }, info) => {
            const res = await store.loadMove(id)
            return toStockMoveForGql(id, res?.state)
        }
    },
    Mutation: {
        createManaged: async (parent, { input: { item, location } }, { store }, info) => {
            const s = StockAction.newManaged(item, location)

            await store.saveStock(s)

            return s
        },
        ・・・
        start: async (parent, { input: { item, qty, from, to } }, { store }, info) => {
            const rs = { state: StockMoveAction.initialState(), revision: 0 }
            const id = `move-${uuidv4()}`

            return doMoveAction(
                store, rs, id, 
                s => StockMoveAction.start(s, item, qty, from, to)
            )
        },
        assign: async(parent, { id }, { store }, info) => {
            const rs = await store.loadMove(id)

            if (rs) {
                const info = StockMoveAction.info(rs.state)

                if (info) {
                    const stock = await store.loadStock(info.item, info.from)

                    return doMoveAction(
                        store, rs, id, 
                        s => StockMoveAction.assign(s, stock)
                    )
                }
            }
            return undefined
        },
        ship: async(parent, { id, outgoing }, { store }, info) => {
            const rs = await store.loadMove(id)

            return doMoveAction(
                store, rs, id, 
                s => StockMoveAction.ship(s, outgoing)
            )
        },
        ・・・
    }
}

const run = async () => {
    const mongo = await MongoClient.connect(mongoUrl, { useUnifiedTopology: true })
    const eventsCol = mongo.db(dbName).collection(colName)
    const stocksCol = mongo.db(dbName).collection(stocksColName)

    const store = new Store(eventsCol, stocksCol)

    const server = new ApolloServer({
        typeDefs, 
        resolvers, 
        context: {
            store
        }
    })

    const res = await server.listen()

    console.log(res.url)
}

run().catch(err => console.error(err))

クライアント実装例

以下のように GraphQL クエリを送信する事で操作できます。

client/create_stock.ts (在庫の作成)
import { request, gql } from 'graphql-request'

const endpoint = 'http://localhost:4000'

const item = process.argv[2]
const location = process.argv[3]

const q1 = gql`
    mutation CreateUnmanaged($item: ID!, $location: ID!) {
        createUnmanaged(input: { item: $item, location: $location }) {
            __typename
            item
            location
        }
    }
`

const q2 = gql`
    mutation CreateManaged($item: ID!, $location: ID!) {
        createManaged(input: { item: $item, location: $location }) {
            __typename
            item
            location
        }
    }
`

const query = process.argv.length > 4 ? q1 : q2

request(endpoint, query, { item, location })
    .then(r => console.log(r))
    .catch(err => console.error(err))
create_stock.ts 実行例
> ts-node create_stock.ts item-1 store-A
{
  createManaged: { __typename: 'ManagedStock', item: 'item-1', location: 'store-A' }
}
client/start_move.ts (在庫移動の開始)
・・・
const item = process.argv[2]
const qty = parseInt(process.argv[3])
const from = process.argv[4]
const to = process.argv[5]

const query = gql`
    mutation {
        start(input: { item: "${item}", qty: ${qty}, from: "${from}", to: "${to}" }) {
            __typename
            id
            info {
                item
                qty
                from
                to
            }
        }
    }
`

request(endpoint, query)
    .then(r => console.log(r))
    .catch(err => console.error(err))
start_move.ts 実行例
> ts-node start_move.ts item-1 5 store-A store-B
{
  start: {
    __typename: 'DraftStockMove',
    id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3',
    info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' }
  }
}
client/find_move.ts (在庫移動の取得)
・・・
const id = process.argv[2]

const query = gql`
    {
        findMove(id: "${id}") {
            __typename
            id
            info {
                item
                qty
                from
                to
            }
            ... on AssignedStockMove {
                assigned
            }
            ... on ShippedStockMove {
                outgoing
            }
            ... on ArrivedStockMove {
                outgoing
                incoming
            }
            ... on CompletedStockMove {
                outgoing
                incoming
            }
        }
    }
`

request(endpoint, query)
    .then(r => console.log(r))
    .catch(err => console.error(err))
find_move.ts 実行例
> ts-node find_move.ts move-cfa1fc9c-b599-4854-8385-207cbb77e8a3
{
  findMove: {
    __typename: 'CompletedStockMove',
    id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3',
    info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' },
    outgoing: 5,
    incoming: 5
  }
}

Node.js で GraphQL over gRPC 的な事をやってみる

gRPC 上で GraphQL を扱う GraphQL over gRPC 的な処理を Node.js で試しに実装してみました。

今回のコードは http://github.com/fits/try_samples/tree/master/blog/20201124/

はじめに

GraphQL はクエリ言語なので基本的に通信プロトコルには依存していません。

Web フロントエンドの用途では Apollo GraphQL が公開している GraphQL over WebSocket Protocol が有力そうですが、マイクロサービス等の用途で GraphQL を利用する事を考えると WebSocket よりも gRPC の方が適しているように思います。

  • GraphQL の Query や Mutation は gRPC の Unary RPC で実現可能
  • GraphQL の Subscription は gRPC の Server streaming RPC で実現可能 ※

そこで、とりあえず実装し確認してみたというのが本件の趣旨となっています。

 ※ GraphQL の Subscription を使わずに
    gRPC の streaming RPC で代用する事も考えられる

なお、GraphQL の処理に関しては「Deno で GraphQL」の内容をベースに、gRPC は「Node.js で gRPC を試す」の静的コード生成を用いて実装しています。

Query と Mutation - sample1

まずは Subscription を除いた Query と Mutation について実装してみます。

gRPC サービス定義

gRPC のサービス定義を下記のように定義してみました。

GraphQL のクエリは文字列で扱うので型は string、結果は実質的に JSON となるので型は google.protobuf.Struct としました。

ついでに、クエリの変数も渡せるようにして型は google.protobuf.Value としています。

ここで、google.protobuf.StructJSON Object を Protocol Buffers で表現するための型として定義されたもので、google.protobuf.ValueJSON Value(null、文字列、数値、配列、JSON Object 等)を表現するための型です。(参照 google/protobuf/struct.proto

proto/graphql.proto
syntax = "proto3";

import "google/protobuf/struct.proto";

package gql;

message QueryRequest {
    string query = 1;
    google.protobuf.Value variables = 2;
}

service GraphQL {
    rpc Query(QueryRequest) returns (google.protobuf.Struct);
}

この proto/graphql.proto から gRPC のコードを生成しておきます。

静的コード生成
> mkdir generated

> npm run gen-grpc
・・・
grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto

package.json の内容は以下の通りです。

package.json
{
  "name": "sample1",
  "version": "1.0.0",
  "description": "",
  "scripts": {
    "gen-grpc": "grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto"
  },
  "dependencies": {
    "@grpc/grpc-js": "^1.2.1",
    "google-protobuf": "^3.14.0",
    "graphql": "^15.4.0",
    "uuid": "^8.3.1"
  },
  "devDependencies": {
    "grpc-tools": "^1.10.0"
  }
}

サーバー実装

GraphQL を扱う gRPC サーバーを実装します。

gRPC のリクエストから GraphQL のクエリやその変数の内容を取得し、graphql 関数で処理した結果をレスポンスとして返すような処理となります。

google.protobuf.Struct や Value に該当する型は google-protobuf/google/protobuf/struct_pb.js にて StructValue として定義されており、プレーンな JavaScript オブジェクトと相互変換するための fromJavaScripttoJavaScript メソッドが用意されています。

下記コードでは QueryRequest の variables の内容を JavaScript オブジェクトとして取得するために toJavaScript を、graphql の処理結果を Struct で返すために fromJavaScript をそれぞれ使用しています。

server.js
const grpc = require('@grpc/grpc-js')
const { GraphQLService } = require('./generated/proto/graphql_grpc_pb')
const { Struct } = require('google-protobuf/google/protobuf/struct_pb')

const { graphql, buildSchema } = require('graphql')

const { v4: uuidv4 } = require('uuid')

// GraphQL スキーマ定義
const schema = buildSchema(`
    enum Category {
        Standard
        Extra
    }

    input CreateItem {
        category: Category!
        value: Int!
    }

    type Item {
        id: ID!
        category: Category!
        value: Int!
    }

    type Mutation {
        create(input: CreateItem!): Item
    }

    type Query {
        find(id: ID!): Item
    }
`)

const store = {}
// スキーマ定義に応じた GraphQL 処理の実装
const root = {
    create: ({ input: { category, value } }) => {
        console.log(`*** call create: category = ${category}, value = ${value}`)

        const id = `item-${uuidv4()}`
        const item = { id, category, value }

        store[id] = item

        return item
    },
    find: ({ id }) => {
        console.log(`*** call find: ${id}`)
        return store[id]
    }
}

const server = new grpc.Server()

server.addService(GraphQLService, {
    async query(call, callback) {
        try {
            const query = call.request.getQuery()
            const variables = call.request.getVariables().toJavaScript()
            // GraphQL の処理
            const r = await graphql(schema, query, root, {}, variables)

            callback(null, Struct.fromJavaScript(r))

        } catch(e) {
            console.error(e)
            callback(e)
        }
    }
})

server.bindAsync(
    '127.0.0.1:50051',
    grpc.ServerCredentials.createInsecure(),
    (err, port) => {
        if (err) {
            console.error(err)
            return
        }

        console.log(`start server: ${port}`)

        server.start()
    }
)

クライアント実装

クライアント側は以下のようになります。

client.js
const grpc = require('@grpc/grpc-js')
const { QueryRequest } = require('./generated/proto/graphql_pb')
const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb')
const { Value } = require('google-protobuf/google/protobuf/struct_pb')

const client = new GraphQLClient(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const promisify = (obj, methodName) => args => 
    new Promise((resolve, reject) => {
        obj[methodName](args, (err, res) => {
            if (err) {
                reject(err)
            }
            else {
                resolve(res)
            }
        })
    })

const query = promisify(client, 'query')

const createRequest = (q, v = null) => {
    const req = new QueryRequest()

    req.setQuery(q)
    req.setVariables(Value.fromJavaScript(v))

    return req
}

const run = async () => {
    // Item の作成
    const r1 = await query(createRequest(`
        mutation {
            create(input: { category: Extra, value: 123 }) {
                id
            }
        }
    `))

    console.log(r1.toJavaScript())
    // 存在しない Item の find
    const r2 = await query(createRequest(`
        {
            find(id: "a1") {
                id
                value
            }
        }
    `))

    console.log(r2.toJavaScript())

    const id = r1.toJavaScript().data.create.id
    // 作成した Item の find (クエリ変数の使用)
    const r3 = await query(createRequest(
        `
            query findItem($id: ID!) {
                find(id: $id) {
                    id
                    category
                    value
                }
            }
        `,
        { id }
    ))

    console.log(r3.toJavaScript())
}

run().catch(err => console.error(err))

動作確認

server.js を実行しておきます。

server.js 実行
> node server.js
start server: 50051

client.js を実行した結果は以下の通りで、特に問題無く動作しているようです。

client.js 実行
> node client.js
{
  data: { create: { id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d' } }
}
{ data: { find: null } }
{
  data: {
    find: {
      category: 'Extra',
      id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d',
      value: 123
    }
  }
}

Subscription - sample2

次は Subscription の機能を追加します。

gRPC サービス定義

gRPC のサービス定義に Subscription 用のメソッドを追加し、sample1 と同様にコードを生成しておきます。

proto/graphql.proto
syntax = "proto3";

import "google/protobuf/struct.proto";

package gql;

message QueryRequest {
    string query = 1;
    google.protobuf.Value variables = 2;
}

service GraphQL {
    rpc Query(QueryRequest) returns (google.protobuf.Struct);
    rpc Subscription(QueryRequest) returns (stream google.protobuf.Struct);
}

サーバー実装

Deno で GraphQL」では単一の Subscription を処理するだけの実装だったので、複数クライアントからの Subscription を処理するために PubSub というクラスを追加し、subscription の呼び出し毎に MessageBox を作成、(クライアントが接続中の)有効な全ての MessageBox へメッセージを配信するようにしています。

server.js
const grpc = require('@grpc/grpc-js')
const { GraphQLService } = require('./generated/proto/graphql_grpc_pb')
const { Struct } = require('google-protobuf/google/protobuf/struct_pb')

const { graphql, buildSchema, subscribe, parse } = require('graphql')

const { v4: uuidv4 } = require('uuid')
// GraphQL スキーマ定義
const schema = buildSchema(`
    enum Category {
        Standard
        Extra
    }

    input CreateItem {
        category: Category!
        value: Int!
    }

    type Item {
        id: ID!
        category: Category!
        value: Int!
    }

    type Mutation {
        create(input: CreateItem!): Item
    }

    type Query {
        find(id: ID!): Item
    }

    type Subscription {
        created: Item
    }
`)

class MessageBox {
    #promises = []
    #resolves = []

    #appendPromise = () => this.#promises.push(
        new Promise(res => this.#resolves.push(res))
    )

    publish(msg) {
        if (this.#resolves.length == 0) {
            this.#appendPromise()
        }

        this.#resolves.shift()(msg)
    }

    [Symbol.asyncIterator]() {
        return {
            next: async () => {
                console.log('*** asyncIterator next')

                if (this.#promises.length == 0) {
                    this.#appendPromise()
                }

                const value = await this.#promises.shift()
                return { value, done: false }
            }
        }
    }
}
// クライアント毎の MessageBox を管理
class PubSub {
    #subscribes = []

    publish(msg) {
        this.#subscribes.forEach(s => s.publish(msg))
    }

    subscribe() {
        const sub = new MessageBox()
        this.#subscribes.push(sub)

        return sub
    }

    unsubscribe(sub) {
        this.#subscribes = this.#subscribes.filter(s => s != sub)
    }
}

const store = {}
const pubsub = new PubSub()

const root = {
    create: ({ input: { category, value } }) => {
        ・・・
    },
    find: ({ id }) => {
        ・・・
    }
}

const server = new grpc.Server()

server.addService(GraphQLService, {
   async query(call, callback) {
       ・・・
    },
    async subscription(call) {
        console.log('*** subscribed')

        try {
            const query = call.request.getQuery()
            const variables = call.request.getVariables().toJavaScript()

            const sub = pubsub.subscribe()

            call.on('cancelled', () => {
                console.log('*** unsubscribed')
                pubsub.unsubscribe(sub)
            })

            const subRoot = {
                created: () => sub
            }
            // GraphQL の Subscription 処理
            const aiter = await subscribe(schema, parse(query), subRoot, {}, variables)

            for await (const r of aiter) {
                // メッセージの配信
                call.write(Struct.fromJavaScript(r))
            }
        } catch(e) {
            console.error(e)
            call.destroy(e)
        }
    }
})

server.bindAsync(
    ・・・
)

Subscription 用クライアント実装

Subscription を呼び出すクライアントは以下のようになります。

client_subscribe.js
const grpc = require('@grpc/grpc-js')
const { QueryRequest } = require('./generated/proto/graphql_pb')
const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb')
const { Value } = require('google-protobuf/google/protobuf/struct_pb')

const client = new GraphQLClient(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const req = new QueryRequest()

req.setQuery(`
    subscription {
        created {
            id
            category
        }
    }
`)

req.setVariables(Value.fromJavaScript(null))

const stream = client.subscription(req)

stream.on('data', msg => {
    const event = msg.toJavaScript()
    console.log(`*** received event = ${JSON.stringify(event)}`)
})

stream.on('end', () => console.log('*** stream end'))
stream.on('error', err => console.log(`*** stream error: ${err}`))

動作確認

server.js を実行した後、client_subscribe.js を 2つ実行して sample1 で作成した client.js を実行すると以下のようになりました。

server.js の出力結果
> node server.js
start server: 50051
*** subscribed
*** asyncIterator next
*** subscribed
*** asyncIterator next
*** call create: category = Extra, value = 123
*** asyncIterator next
*** asyncIterator next
*** call find: a1
*** call find: item-5e3f81ed-774a-4f7f-afc5-000a2db34859
client_subscribe.js の出力結果
> node client_subscribe.js
*** received event = {"data":{"created":{"category":"Extra","id":"item-5e3f81ed-774a-4f7f-afc5-000a2db34859"}}}

Node.js で gRPC を試す

gRPC Server Reflection のクライアント処理」では Node.js で gRPC クライアントを実装しましたが、今回はサーバー側も実装してみます。

サンプルコードは http://github.com/fits/try_samples/tree/master/blog/20201115/

はじめに

gRPC on Node.js では、以下の 2通りの手法が用意されており、それぞれ使用するパッケージが異なります。

  • (a) 動的コード生成 (@grpc/proto-loader パッケージを使用)
  • (b) 静的コード生成 (grpc-tools パッケージを使用)

更に、gRPC の実装ライブラリとして以下の 2種類が用意されており、どちらかを使う事になります。

  • C-based Client and Server (grpc パッケージを使用)
  • Pure JavaScript Client (@grpc/grpc-js パッケージを使用)

@grpc/grpc-js は現時点で Pure JavaScript Client と表現されていますが、クライアントだけではなくサーバーの実装にも使えます。

ここでは、Pure JavaScript 実装の @grpc/grpc-js を使って、(a) と (b) の両方を試してみます。

サービス定義(proto ファイル)

gRPC のサービス定義として下記ファイルを使用します。

Unary RPC(1リクエスト / 1レスポンス)と Server streaming RPC(1リクエスト / 多レスポンス)、message の oneofgoogle.protobuf.Empty の扱い等を確認するような内容にしてみました。

proto/item.proto
syntax = "proto3";

import "google/protobuf/empty.proto";

package item;

message AddItemRequest {
    string item_id = 1;
    uint64 price = 2;
}

message ItemRequest {
    string item_id = 1;
}

message Item {
    string item_id = 1;
    uint64 price = 2;
}

message ItemSubscribeRequest {
}

message AddedItem {
    string item_id = 1;
    uint64 price = 2;
}

message RemovedItem {
    string item_id = 1;
}

message ItemEvent {
    oneof event {
        AddedItem added = 1;
        RemovedItem removed = 2;
    }
}

service ItemManage {
    rpc AddItem(AddItemRequest) returns (google.protobuf.Empty);
    rpc RemoveItem(ItemRequest) returns (google.protobuf.Empty);
    rpc GetItem(ItemRequest) returns (Item);

    rpc Subscribe(ItemSubscribeRequest) returns (stream ItemEvent);
}

(a) 動的コード生成(@grpc/proto-loader)

まずは、@grpc/proto-loader を使った動的コード生成を試します。

インストール

@grpc/proto-loader@grpc/grpc-js をインストールしておきます。

> npm install --save @grpc/proto-loader @grpc/grpc-js

サーバー実装

proto-loader の loadSync 関数 ※ で proto ファイルをロードした結果を grpc-js の loadPackageDefinition で処理する事で型定義などを動的に生成します。

 ※ 非同期版の load 関数も用意されています

addService で gRPC のサービス定義と処理をマッピングし、bindAsync 後に start を呼び出す事でサーバー処理を開始します。

proto ファイルで定義したメッセージ型と同じフィールドを持つ JavaScript オブジェクトを gRPC のリクエストやレスポンスで使う事ができるようです。

Unary RPC の場合は、第二引数の callback へ失敗時の値と成功時の値をそれぞれ渡す事で処理結果を返します。

任意のエラーを返したい場合は、code で gRPC のステータスコードを、details でエラー内容を指定します。

google.protobuf.Empty の箇所は null もしくは undefined で代用できるようです。

Server streaming RPC の場合は、第一引数(下記コードでは call)の write を呼び出す事で処理結果を返す事ができます。

クライアントが途中で切断したりすると cancelled が発生するようになっており、cancelled 発生後に write を呼び出してもエラー等は発生しないようになっていました。

server.js
const protoLoader = require('@grpc/proto-loader')
const grpc = require('@grpc/grpc-js')

const protoFile = './proto/item.proto'
// proto ファイルのロード
const pd = protoLoader.loadSync(protoFile)
// gPRC 用の動的な型定義生成
const proto = grpc.loadPackageDefinition(pd)

let store = []
let subscribeList = []

const findItem = itemId => store.find(i => i.itemId == itemId)

const addItem = (itemId, price) => {
    if (findItem(itemId)) {
        return undefined
    }

    const item = { itemId, price }

    store.push(item)

    return item
}

const removeItem = itemId => {
    const item = findItem(itemId)

    if (item) {
        store = store.filter(i => i.itemId != item.itemId)
    }

    return item
}
// ItemEvent の配信
const publishEvent = event => {
    console.log(`*** publish event: ${JSON.stringify(event)}`)
    subscribeList.forEach(s => s.write(event))
}

const server = new grpc.Server()
// サービス定義と処理のマッピング
server.addService(proto.item.ItemManage.service, {
    AddItem(call, callback) {
        const itemId = call.request.itemId
        const price = call.request.price

        const item = addItem(itemId, price)

        if (item) {
            callback()
            publishEvent({ added: { itemId, price }})
        }
        else {
            const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' }
            callback(err)
        }
    },
    RemoveItem(call, callback) {
        const itemId = call.request.itemId

        if (removeItem(itemId)) {
            callback()
            publishEvent({ removed: { itemId }})
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    GetItem(call, callback) {
        const itemId = call.request.itemId
        const item = findItem(itemId)

        if (item) {
            callback(null, item)
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    Subscribe(call) {
        console.log('*** subscribed')
        subscribeList.push(call)
        // クライアント切断時の処理
        call.on('cancelled', () => {
            console.log('*** unsubscribed')
            subscribeList = subscribeList.filter(s => s != call)
        })
    }
})

server.bindAsync(
    '127.0.0.1:50051',
    grpc.ServerCredentials.createInsecure(),
    (err, port) => {
        if (err) {
            console.error(err)
            return
        }

        console.log(`start server: ${port}`)
        // 開始
        server.start()
    }
)

クライアント実装1

まずは、Unary RPC の API のみ(Subscribe 以外)を呼び出すクライアントを実装してみます。

loadPackageDefinition を実施するところまではサーバーと同じです。

Unary RPC はコールバック関数を伴ったメソッドとして用意されますが、このメソッドに Node.js の util.promisify を直接適用すると不都合が生じたため、Promise 化は自前の関数(下記の promisify)で実施するようにしました。

client.js
const protoLoader = require('@grpc/proto-loader')
const grpc = require('@grpc/grpc-js')

const protoFile = './proto/item.proto'

const pd = protoLoader.loadSync(protoFile)
const proto = grpc.loadPackageDefinition(pd)

const id = process.argv[2]

const client = new proto.item.ItemManage(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)
// Unary RPC の Promise 化
const promisify = (obj, methodName) => args => 
    new Promise((resolve, reject) => {
        obj[methodName](args, (err, res) => {
            if (err) {
                reject(err)
            }
            else {
                resolve(res)
            }
        })
    })

const addItem = promisify(client, 'AddItem')
const removeItem = promisify(client, 'RemoveItem')
const getItem = promisify(client, 'GetItem')

const printItem = item => {
    console.log(`id = ${item.itemId}, price = ${item.price}`)
}

const run = async () => {
    await addItem({ itemId: `${id}_item-1`, price: 100 })

    const item1 = await getItem({ itemId: `${id}_item-1` })
    printItem(item1)

    await addItem({ itemId: `${id}_item-2`, price: 20 })

    const item2 = await getItem({ itemId: `${id}_item-2` })
    printItem(item2)

    await addItem({ itemId: `${id}_item-1`, price: 50 })
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem({ itemId: `${id}_item-1` })

    await getItem({ itemId: `${id}_item-1` })
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem({ itemId: `${id}_item-2` })
}

run().catch(err => console.error(err))

クライアント実装2

次は Server streaming RPC のクライアント実装です。

client_subscribe.js
const protoLoader = require('@grpc/proto-loader')
const grpc = require('@grpc/grpc-js')

const protoFile = './proto/item.proto'

const pd = protoLoader.loadSync(protoFile)
const proto = grpc.loadPackageDefinition(pd)

const client = new proto.item.ItemManage(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const stream = client.Subscribe({})
// メッセージ受信時
stream.on('data', event => {
    console.log(`*** received event = ${JSON.stringify(event)}`)
})

// サーバー終了時
stream.on('end', () => console.log('*** stream end'))
stream.on('error', err => console.log(`*** stream error: ${err}`))

動作確認

server.js の実行後、client_subscribe.js を 2つ起動した後に client.js を実行してみます。

Server 実行
> node server.js
start server: 50051
Client2-1 実行
> node client_subscribe.js
Client2-2 実行
> node client_subscribe.js
Client1 実行
> node client.js a1
id = a1_item-1, price = 100
id = a1_item-2, price = 20
*** ERROR = 6 ALREADY_EXISTS: exists item
*** ERROR = 5 NOT_FOUND: item not found

この時点で出力内容は以下のようになりました。

Server 出力内容
> node server.js
start server: 50051
*** subscribed
*** subscribed
*** publish event: {"added":{"itemId":"a1_item-1","price":100}}
*** publish event: {"added":{"itemId":"a1_item-2","price":20}}
*** publish event: {"removed":{"itemId":"a1_item-1"}}
*** publish event: {"removed":{"itemId":"a1_item-2"}}
Client2-1、Client2-2 出力内容
> node client_subscribe.js
*** received event = {"added":{"itemId":"a1_item-1","price":{"low":100,"high":0,"unsigned":true}}}
*** received event = {"added":{"itemId":"a1_item-2","price":{"low":20,"high":0,"unsigned":true}}}
*** received event = {"removed":{"itemId":"a1_item-1"}}
*** received event = {"removed":{"itemId":"a1_item-2"}}

Client2-2 を Ctrl + c で終了後に、Server も Ctrl + c で終了すると以下のようになり、Client2-1 のプロセスは終了しました。

Server 出力内容
> node server.js
start server: 50051
・・・
*** publish event: {"removed":{"itemId":"a1_item-2"}}
*** unsubscribed
^C
Client2-1 出力内容
> node client_subscribe.js
・・・
*** received event = {"removed":{"itemId":"a1_item-2"}}
*** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error)
*** stream end

特に問題はなく、正常に動作しているようです。

(b) 静的コード生成(grpc-tools)

grpc-tools を使った静的コード生成を試します。

インストールとコード生成

grpc-tools@grpc/grpc-jsgoogle-protobuf をインストールしておきます。

> npm install --save-dev grpc-tools
・・・
> npm install --save @grpc/grpc-js google-protobuf
・・・

grpc-tools をインストールする事で使えるようになる grpc_tools_node_protoc コマンドで proto ファイルからコードを生成します。

grpc_tools_node_protoc コマンドは内部的に protoc コマンドを grpc_node_plugin プラグインを伴って呼び出すようになっています。

--grpc_out でサービス定義用のファイル xxx_grpc_pb.js が生成され、--js_out でメッセージ定義用のファイルが生成されます。

サービス定義 xxx_grpc_pb.js は --js_out で import_style=commonjs オプションを指定する事を前提としたコードになっています。※

 ※ import_style=commonjs オプションを指定した際に生成される
    xxx_pb.js を参照するようになっている

また、--grpc_out はデフォルトで grpc パッケージ用のコードを生成するため、ここでは grpc_js オプションを指定して @grpc/grpc-js 用のコードを生成するようにしています。

静的コード生成例(grpc_tools_node_protoc コマンド)
> mkdir generated

> grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto
・・・

サーバー実装

(a) の場合と処理内容に大きな違いはありませんが、リクエストやレスポンスでは生成された型を使います。

アクセサメソッド(getter、setter)で値の取得や設定ができるようになっており、new 時に配列として全フィールドの値を指定する事もできるようです。 JavaScript オブジェクトへ変換したい場合は toObject メソッドを使用します。

addService でマッピングする際のメソッド名の一文字目が小文字になっています。

proto ファイルで定義したサービス名の後に Service を付けた型(ここでは ItemManageService)がサーバー処理用、Client を付けた型がクライアント処理用の型定義となるようです。

server.js
const grpc = require('@grpc/grpc-js')

const { Item, AddedItem, RemovedItem, ItemEvent } = require('./generated/proto/item_pb')
const { ItemManageService } = require('./generated/proto/item_grpc_pb')
const { Empty } = require('google-protobuf/google/protobuf/empty_pb')

let store = []
let subscribeList = []

const findItem = itemId => store.find(i => i.getItemId() == itemId)

const addItem = (itemId, price) => {
    if (findItem(itemId)) {
        return undefined
    }

    const item = new Item([itemId, price])

    store.push(item)

    return item
}

const removeItem = itemId => {
    const item = findItem(itemId)

    if (item) {
        store = store.filter(i => i.getItemId() != item.getItemId())
    }

    return item
}

const createAddedEvent = (itemId, price) => {
    const event = new ItemEvent()
    event.setAdded(new AddedItem([itemId, price]))

    return event
}

const createRemovedEvent = itemId => {
    const event = new ItemEvent()
    event.setRemoved(new RemovedItem([itemId]))

    return event
}

const publishEvent = event => {
    // toObject で JavaScript オブジェクトへ変換
    console.log(`*** publish event: ${JSON.stringify(event.toObject())}`)
    subscribeList.forEach(s => s.write(event))
}

const server = new grpc.Server()

server.addService(ItemManageService, {
    addItem(call, callback) {
        const itemId = call.request.getItemId()
        const price = call.request.getPrice()

        const item = addItem(itemId, price)

        if (item) {
            callback(null, new Empty())
            publishEvent(createAddedEvent(itemId, price))
        }
        else {
            const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' }
            callback(err)
        }
    },
    removeItem(call, callback) {
        const itemId = call.request.getItemId()

        if (removeItem(itemId)) {
            callback(null, new Empty())
            publishEvent(createRemovedEvent(itemId))
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    getItem(call, callback) {
        const itemId = call.request.getItemId()
        const item = findItem(itemId)

        if (item) {
            callback(null, item)
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    subscribe(call) {
        console.log('*** subscribed')
        subscribeList.push(call)

        call.on('cancelled', () => {
            console.log('*** unsubscribed')
            subscribeList = subscribeList.filter(s => s != call)
        })
    }
})

server.bindAsync(
    ・・・
)

クライアント実装1

生成された型を使う点とメソッド名の先頭が小文字になっている点を除くと、基本的に (a) と同じです。

client.js
const grpc = require('@grpc/grpc-js')

const { AddItemRequest, ItemRequest } = require('./generated/proto/item_pb')
const { ItemManageClient } = require('./generated/proto/item_grpc_pb')

const id = process.argv[2]

const client = new ItemManageClient(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const promisify = (obj, methodName) => args => 
    new Promise((resolve, reject) => {
        ・・・
    })

const addItem = promisify(client, 'addItem')
const removeItem = promisify(client, 'removeItem')
const getItem = promisify(client, 'getItem')

const printItem = item => {
    console.log(`id = ${item.getItemId()}, price = ${item.getPrice()}`)
}

const run = async () => {
    await addItem(new AddItemRequest([`${id}_item-1`, 100]))

    const item1 = await getItem(new ItemRequest([`${id}_item-1`]))
    printItem(item1)

    await addItem(new AddItemRequest([`${id}_item-2`, 20]))

    const item2 = await getItem(new ItemRequest([`${id}_item-2`]))
    printItem(item2)

    await addItem(new AddItemRequest([`${id}_item-1`, 50]))
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem(new ItemRequest([`${id}_item-1`]))

    await getItem(new ItemRequest([`${id}_item-1`]))
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem(new ItemRequest([`${id}_item-2`]))
}

run().catch(err => console.error(err))

クライアント実装2

こちらも同様です。

client_subscribe.js
const grpc = require('@grpc/grpc-js')

const { ItemSubscribeRequest } = require('./generated/proto/item_pb')
const { ItemManageClient } = require('./generated/proto/item_grpc_pb')

const client = new ItemManageClient(
    ・・・
)

const stream = client.subscribe(new ItemSubscribeRequest())

stream.on('data', event => {
    // toObject で JavaScript オブジェクトへ変換
    console.log(`*** received event = ${JSON.stringify(event.toObject())}`)
})

・・・

動作確認

(a) と同じ操作を行った結果は以下のようになりました。

Server 出力内容
> node server.js
start server: 50051
*** subscribed
*** subscribed
*** publish event: {"added":{"itemId":"a1_item-1","price":100}}
*** publish event: {"added":{"itemId":"a1_item-2","price":20}}
*** publish event: {"removed":{"itemId":"a1_item-1"}}
*** publish event: {"removed":{"itemId":"a1_item-2"}}
*** unsubscribed
^C
Client1 出力内容
> node client.js a1
id = a1_item-1, price = 100
id = a1_item-2, price = 20
*** ERROR = 6 ALREADY_EXISTS: exists item
*** ERROR = 5 NOT_FOUND: item not found
Client2-1 出力内容
> node client_subscribe.js
*** received event = {"added":{"itemId":"a1_item-1","price":100}}
*** received event = {"added":{"itemId":"a1_item-2","price":20}}
*** received event = {"removed":{"itemId":"a1_item-1"}}
*** received event = {"removed":{"itemId":"a1_item-2"}}
*** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error)
*** stream end
Client2-2 出力内容
> node client_subscribe.js
*** received event = {"added":{"itemId":"a1_item-1","price":100}}
*** received event = {"added":{"itemId":"a1_item-2","price":20}}
*** received event = {"removed":{"itemId":"a1_item-1"}}
*** received event = {"removed":{"itemId":"a1_item-2"}}
^C

(a) と (b) は同一の gRPC サービス(proto ファイル)を実装したものなので当然ですが、(a) と (b) を相互接続しても特に問題はありませんでした。

Rust で WASI 対応の WebAssembly を作成して実行

Rust で WASI 対応の WebAssembly を作って、スタンドアロン実行や Web ブラウザ上での実行を試してみました。

WASI(WebAssembly System Interface) は WebAssembly のコードを様々なプラットフォームで実行するためのインターフェースで、これに対応した WebAssembly であれば Web ブラウザ外で実行できます。

Rust で WASI 対応の WebAssembly を作るのは簡単で、ビルドターゲットに wasm32-wasi を追加しておいて、rustccargo build によるビルド時に --target wasm32-wasi を指定するだけでした。

wasm32-wasi の追加
> rustup target add wasm32-wasi

標準出力へ文字列を出力するだけの下記サンプルコードを --target wasm32-wasi でビルドすると sample1.wasm ファイルが作られました。

sample1.rs
fn main() {
    for i in 1..=3 {
        println!("count-{}", i);
    }

    print!("aaa");
    print!("bbb");
}
WASI 対応の WebAssembly として sample1.rs をビルド
> rustc --target wasm32-wasi sample1.rs

なお、今回のビルドに使用した Rust のバージョンは以下の通りです。

  • Rust 1.43.0

また、使用したソースコードhttp://github.com/fits/try_samples/tree/master/blog/20200429/ に置いてあります。

(1) スタンドアロン用ランタイムで実行

sample1.wasm を WebAssembly のランタイム wasmtimewasmer でそれぞれ実行してみます。

(1-a) wasmtime で実行

wasmtime v0.15.0 による sample1.wasm 実行結果
> wasmtime sample1.wasm
count-1
count-2
count-3
aaabbb

(1-b) wasmer で実行

wasmer v0.16.2 による sample1.wasm 実行結果
> wasmer sample1.wasm
count-1
count-2
count-3
aaabbb

どちらのランタイムでも問題なく実行できました。

(2) Web ブラウザ上で実行

次は、sample1.wasm を外部ライブラリ等を使わずに Web ブラウザ上で実行してみます。

主要な Web ブラウザや Node.js は JavaScript 用の WebAssembly API に対応済みのため、WebAssembly を実行可能です。

WASI 対応 WebAssembly の場合、実行対象の WebAssembly がインポートしている WASI の関数(の実装)を WebAssembly インスタンス化関数(WebAssembly.instantiate()WebAssembly.instantiateStreaming())の第二引数(引数名 importObject)として渡す必要があるようです。

(2-a) WebAssembly のインポート内容を確認

WebAssembly.compile() 関数で取得した WebAssembly.Module オブジェクトを WebAssembly.Module.imports() 関数へ渡す事で、その WebAssembly がインポートしている内容を取得できます。

ここでは、以下の Node.js スクリプトを使って WebAssembly のインポート内容を確認してみました。

wasm_listup_imports.js (WebAssembly のインポート内容を出力)
const fs = require('fs')

const wasmFile = process.argv[2]

const run = async () => {
    const module = await WebAssembly.compile(fs.readFileSync(wasmFile))

    const imports = WebAssembly.Module.imports(module)

    console.log(imports)
}

run().catch(err => console.error(err))

sample1.wasm へ適用してみると以下のような結果となりました。

インポート内容の出力結果(Node.js v12.16.2 で実行)
> node wasm_listup_imports.js sample1.wasm
[
  {
    module: 'wasi_snapshot_preview1',
    name: 'proc_exit',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'fd_write',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'fd_prestat_get',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'fd_prestat_dir_name',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'environ_sizes_get',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'environ_get',
    kind: 'function'
  }
]

この結果から、sample1.wasm は以下のようにしてインスタンス化できる事になります。

WebAssembly インスタンス化の例
const importObject = {
    wasi_snapshot_preview1: {
        proc_exit: () => {・・・},
        fd_write: () => {・・・},
        fd_prestat_get: () => {・・・},
        fd_prestat_dir_name: () => {・・・},
        environ_sizes_get: () => {・・・},
        environ_get: () => {・・・}
    }
}

WebAssembly.instantiate(・・・, importObject)
    ・・・

(2-b) fd_write 関数の実装

Rust の println! で呼び出される WASI の関数は fd_write なので、これを実装してみます。

fd_write の引数は 4つで、第一引数 fd は出力先のファイルディスクリプタで標準出力の場合は 1、第二引数 iovs は出力内容へのポインタ、第三引数 iovsLen は出力内容の数、第四引数 nwritten は出力済みのバイト数を設定するポインタとなっています。

なお、ポインタの対象は WebAssembly.instantiate() で取得した WebAssembly のインスタンスに含まれている WebAssembly.Memory です。

出力内容は iovs ポインタの位置から 4バイト毎に以下のような並びで情報が格納されているようなので、これを基に出力対象の文字列を取得して出力する事になります。

  • 1個目の出力内容の格納先ポインタ(4バイト)
  • 1個目の出力内容のバイトサイズ(4バイト)
  • ・・・
  • iovsLen 個目の出力内容の格納先ポインタ(4バイト)
  • iovsLen 個目の出力内容のバイトサイズ(4バイト)

何処まで処理を行ったか(出力したか)を返すために、nwritten ポインタの位置へ出力の完了したバイトサイズを設定します。

fd_write の実装例(wasmInstance には WebAssembly のインスタンスを設定)
・・・
fd_write: (fd, iovs, iovsLen, nwritten) => {
    const memory = wasmInstance.exports.memory.buffer
    const view = new DataView(memory)

    const sizeList = Array.from(Array(iovsLen), (v, i) => {
        const ptr = iovs + i * 8

        // 出力内容の格納先のポインタ取得
        const bufStart = view.getUint32(ptr, true)
        // 出力内容のバイトサイズを取得
        const bufLen = view.getUint32(ptr + 4, true)

        const buf = new Uint8Array(memory, bufStart, bufLen)

        // 出力内容の String 化
        const msg = String.fromCharCode(...buf)

        // 出力
        console.log(msg)

        return buf.byteLength
    })

    // 出力済みのバイトサイズ合計
    const totalSize = sizeList.reduce((acc, v) => acc + v)

    // 出力済みのバイトサイズを設定
    view.setUint32(nwritten, totalSize, true)

    return 0
},
・・・

最終的な HTML は下記のようになりました。

fd_write 以外の WASI 関数を空実装にして main 関数を呼び出して実行するようにしていますが、WASI の仕様としては _start 関数を呼び出すのが正しいようです ※。(WASI Application ABI 参照)

 ※ _start 関数を使う場合、fd_prestat_get 等の実装も必要となります

WebAssembly がインポートしている WASI 関数の実装をインスタンス化時(WebAssembly.instantiateStreaming)に渡す事になりますが、WASI の関数(fd_write 等)はインスタンス化の結果を使って処理する点に注意が必要です。

index.html(main 関数版)
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
</head>
<body>
  <h1>WASI WebAssembly Sample</h1>
  <div id="res"></div>

  <script>
    const WASM_URL = './sample1.wasm'

    const wasiObj = {
      wasmInstance: null,
      importObject: {
        wasi_snapshot_preview1: {
          fd_write: (fd, iovs, iovsLen, nwritten) => {
            console.log(`*** call fd_write: fd=${fd}, iovs=${iovs}, iovsLen=${iovsLen}, nwritten=${nwritten}`)

            const memory = wasiObj.wasmInstance.exports.memory.buffer
            const view = new DataView(memory)

            const sizeList = Array.from(Array(iovsLen), (v, i) => {
              const ptr = iovs + i * 8

              const bufStart = view.getUint32(ptr, true)
              const bufLen = view.getUint32(ptr + 4, true)

              const buf = new Uint8Array(memory, bufStart, bufLen)

              const msg = String.fromCharCode(...buf)

              // 出力
              console.log(msg)
              document.getElementById('res').innerHTML += `<p>${msg}</p>`

              return buf.byteLength
            })

            const totalSize = sizeList.reduce((acc, v) => acc + v)

            view.setUint32(nwritten, totalSize, true)

            return 0
          },
          proc_exit: () => {},
          fd_prestat_get: () => {},
          fd_prestat_dir_name: () => {},
          environ_sizes_get: () => {},
          environ_get: () => {}
        }
      }
    }

    WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject)
      .then(res => {
        console.log(res)

        // fd_write で参照できるようにインスタンスを wasmInstance へ設定
        wasiObj.wasmInstance = res.instance

        // main 関数の実行
        wasiObj.wasmInstance.exports.main()
      })
      .catch(err => console.error(err))
  </script>
</body>
</html>

main 関数の代わりに _start 関数を呼び出す場合は下記のようになりました。

_start 関数版の場合、fd_prestat_get の実装が重要となります ※。

 ※ fd_prestat_get を正しく実装していないと、
    fd_prestat_get の呼び出しが延々と繰り返されてしまいました

今回はファイル等を使っていないので(file descriptor 3 以降を開いていない)、fd_prestat_get は単に 8(WASI_EBADF, Bad file descriptor)を返すだけで良さそうです。

index2.html(_start 関数版)
・・・
  <script>
    const WASM_URL = './sample1.wasm'

    const wasiObj = {
      wasmInstance: null,
      importObject: {
        wasi_snapshot_preview1: {
          ・・・
          fd_prestat_get: () => 8,
          ・・・
        }
      }
    }

    WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject)
      .then(res => {
        console.log(res)

        wasiObj.wasmInstance = res.instance
        // _start 関数の実行
        wasiObj.wasmInstance.exports._start()
      })
      .catch(err => console.error(err))
  </script>
・・・

(2-c) 実行

上記の .html ファイルを Web ブラウザで直接開いても WebAssembly を実行できないため、HTTP サーバーを使う事になります。

更に、Web ブラウザ上で WebAssembly を実行するには、.wasm ファイルを MIME Type application/wasm で取得する必要があるようです。

Python の http.server は application/wasm に対応していたため(Python 3.8.2 と 3.7.6 で確認)、以下のスクリプトで HTTP サーバーを立ち上げる事にしました。

web_server.py
import http.server
import socketserver

PORT = 8080

Handler = http.server.SimpleHTTPRequestHandler

with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print(f"start server port:{PORT}")
    httpd.serve_forever()
HTTP サーバー起動(Python 3.8.2 で実行)
> python web_server.py
start server port:8080

Web ブラウザ(Chrome)で http://localhost:8080/index.html へアクセスしたところ(index2.html でも同様)、sample1.wasm の実行を確認できました。

Chrome の実行結果

f:id:fits:20200429200738p:plain

(3) Node.js で組み込み実行

次は、Node.js で WebAssembly を組み込み実行してみます。

(3-a) fd_write 実装

上記 index2.html の処理をベースにローカルの .wasm ファイルを読み込んで実行するようにしました。

sample1.wasm のインポート内容に合わせたものなので、インポート内容の異なる WebAssembly の実行には使えません。

wasm_run_sample.js
const fs = require('fs')

const WASI_ESUCCESS = 0;
const WASI_EBADF = 8; // Bad file descriptor

const wasmFile = process.argv[2]

const wasiObj = {
    wasmInstance: null,
    importObject: {
        wasi_snapshot_preview1: {
            fd_write: (fd, iovs, iovsLen, nwritten) => {
                ・・・
                
                const sizeList = Array.from(Array(iovsLen), (v, i) => {
                    ・・・
                    
                    process.stdout.write(msg)
                    
                    return buf.byteLength
                })
                
                ・・・
                
                return WASI_ESUCCESS
            },
            ・・・
            fd_prestat_get: (fd, bufPtr) => { 
                console.log(`*** call fd_prestat_get: fd=${fd}, bufPtr=${bufPtr}`)
                return WASI_EBADF
            },
            ・・・
        }
    }
}

const buf = fs.readFileSync(wasmFile)

WebAssembly.instantiate(buf, wasiObj.importObject)
    .then(res => {
        wasiObj.wasmInstance = res.instance
        wasiObj.wasmInstance.exports._start()
    })
    .catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node wasm_run_sample.js sample1.wasm
*** call fd_prestat_get : fd=3, bufPtr=1048568
*** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948
count-1
*** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948
count-2
*** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948
count-3
*** call fd_write: fd=1, iovs=1048432, iovsLen=1, nwritten=1048412
aaabbb

(3-b) Wasmer-JS 使用

Wasmer-JS@wasmer/wasi モジュールを使って、もっと汎用的に組み込み実行できるようにしてみます。

@wasmer/wasi インストール例
> npm install @wasmer/wasi

@wasmer/wasi の WASI を使う事で、インポート内容に合わせた WASI 関数の取得や _start 関数の呼び出しを任せる事ができます。

run_wasmer_js/index.js
const fs = require('fs')
const { WASI } = require('@wasmer/wasi')

const wasmFile = process.argv[2]

const wasi = new WASI()

const run = async () => {
    const module = await WebAssembly.compile(fs.readFileSync(wasmFile))
    // インポート内容に合わせた WASI 関数の実装を取得
    const importObject = wasi.getImports(module)

    const instance = await WebAssembly.instantiate(module, importObject)

    // 実行
    wasi.start(instance)
}

run().catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node index.js ../sample1.wasm
count-1
count-2
count-3
aaabbb

(4) 標準出力以外の機能

最後に、現時点でどんな機能を使えるのか気になったので、いくつか試してみました。

まず、TcpStream を使ったコードの wasm32-wasi ビルドは一応成功しました。

sample2.rs
use std::net::TcpStream;

fn main() {
    let res = TcpStream::connect("127.0.0.1:8080");
    println!("{:?}", res);
}
sample2.rs ビルド
> rustc --target wasm32-wasi sample2.rs

ただし、実行してみると以下のような結果となりました。(wasmtime 以外で実行しても同じ)

wasmtime による sample2.wasm 実行結果
> wasmtime sample2.wasm

Err(Custom { kind: Other, error: "operation not supported on wasm yet" })

Rust のソースコードで該当(すると思われる)箇所を確認してみると、unsupported() を返すようになっていました。

https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/net.rs(2020/4/26 時点)
・・・
impl TcpStream {
    pub fn connect(_: io::Result<&SocketAddr>) -> io::Result<TcpStream> {
        unsupported()
    }
    ・・・
}
・・・

https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/ のソースを確認してみると、(スレッド系の処理等)他にも未対応の機能がいくつもありました。

一方で、環境変数・システム時間・スリープ処理は使えそうだったので、以下のコードで確認してみました。

sample3.rs
use std::env;
use std::thread::sleep;
use std::time::{ Duration, SystemTime };

fn main() {
    // 環境変数 SLEEP_TIME からスリープする秒数を取得(デフォルトは 1)
    let sleep_sec = env::var("SLEEP_TIME").ok()
                         .and_then(|v| v.parse::<u64>().ok())
                         .unwrap_or(1);

    // システム時間の取得
    let time = SystemTime::now();

    println!("start: sleep {}s", sleep_sec);

    // スリープの実施
    sleep(Duration::from_secs(sleep_sec));

    // 経過時間の出力
    match time.elapsed() {
        Ok(s) => println!("end: elapsed {}s", s.as_secs()),
        Err(e) => println!("error: {:?}", e),
    }
}
sample3.rs のビルド
> rustc --target wasm32-wasi sample3.rs

wasmtime では正常に実行できましたが、wasmer は今のところスリープ処理に対応していないようでエラーとなりました。

ちなみに、環境変数はどちらのコマンドも --env で指定できました。

wasmtime v0.15.0 による sample3.wasm 実行結果
> wasmtime --env SLEEP_TIME=5 sample3.wasm
start: sleep 5s
end: elapsed 5s
wasmer v0.16.2 による sample3.wasm 実行結果
> wasmer sample3.wasm --env SLEEP_TIME=5
start: sleep 5s
thread 'main' panicked at 'not yet implemented: Polling not implemented for clocks yet', lib\wasi\src\syscalls\mod.rs:2373:21
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
Error: error: "unhandled trap at 7fffd474a799 - code #e06d7363: unknown exception code"