CDK で作った CloudFormation テンプレートをプログラム内からデプロイする
AWS CDK (Cloud Development Kit) では通常 cdk deploy
コマンドを使ってデプロイ処理を実施します。
これを cdk
コマンドを使わずにプログラム内から実施できないか、以下の 2通りで試してみました。
なお、実際のところ (a) の場合でも、内部的には CloudFormation の API が呼び出されています。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210418/
はじめに
今回は DynamoDB のテーブルを作るだけの単純なスタック(以下)を使用します。
a_sample/stack.ts
import { Construct, Stack, StackProps, CfnOutput } from '@aws-cdk/core' import * as dynamodb from '@aws-cdk/aws-dynamodb' export class SampleStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props) const table = new dynamodb.Table(this, 'Items', { partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING } }) new CfnOutput(this, 'TableName', { value: table.tableName }) } }
このスタックの CloudFormation テンプレートは、以下のような処理で文字列として取得できます。
a_sample/sample.ts
import { App } from '@aws-cdk/core' import { SampleStack } from './stack' const app = new App() new SampleStack(app, 'SampleStack') // CloudFormation テンプレートの文字列化 const tpl = JSON.stringify(app.synth().stacks[0].template) console.log(tpl)
実行結果例
> cd a_sample > npx ts-node sample.ts {"Resources":{"Items5C12978B":{"Type":"AWS::DynamoDB::Table","Properties":{"KeySchema":[{"AttributeName":"id","KeyType":"HASH"}],"AttributeDefinitions":[{"AttributeName":"id","AttributeType":"S"}],"ProvisionedThroughput":{"ReadCapacityUnits":5,"WriteCapacityUnits":5}},"UpdateReplacePolicy":"Retain","DeletionPolicy":"Retain"}},"Outputs":{"TableName":{"Value":{"Ref":"Items5C12978B"}}}}
(a) AWS CDK の API を利用
cdk deploy
コマンド処理内で使われている CloudFormationDeployments
を直接使うのが簡単そうだったので、今回はこれを使います。
デフォルトでは cdk deploy
時と同じように進捗状況が出力されますが、これは quiet
を true
にする事で抑制できました。
a_sample/deploy_a.ts
import { App } from '@aws-cdk/core' import { SdkProvider } from 'aws-cdk/lib/api/aws-auth' import { CloudFormationDeployments } from 'aws-cdk/lib/api/cloudformation-deployments' import { SampleStack } from './stack' const app = new App() new SampleStack(app, 'Sample1Stack') const run = async () => { const deployer = new CloudFormationDeployments({ sdkProvider: await SdkProvider.withAwsCliCompatibleDefaults() }) // デプロイ処理 const r = await deployer.deployStack({ stack: app.synth().stacks[0], quiet: true }) console.log(r) } run().catch(err => console.error(err))
パッケージインストール例
> cd a_sample > npm install -D ts-node typescript @types/node ・・・ > npm install @aws-cdk/aws-dynamodb aws-cdk ・・・
実行結果は以下の通りです。 CloudFormation のスタックが登録され、DynamoDB のテーブルが問題なく作成されました。
実行結果
> npx ts-node deploy_a.ts Sample1Stack: creating CloudFormation changeset... { noOp: false, outputs: { TableName: 'Sample1Stack-Items・・・' }, ・・・ stackArtifact: CloudFormationStackArtifact { assembly: CloudAssembly { ・・・ }, id: 'Sample1Stack', manifest: { type: 'aws:cloudformation:stack', environment: 'aws://unknown-account/unknown-region', properties: [Object], metadata: [Object] }, messages: [], _dependencyIDs: [], environment: { account: 'unknown-account', region: 'unknown-region', name: 'aws://unknown-account/unknown-region' }, templateFile: 'Sample1Stack.template.json', parameters: {}, tags: {}, assumeRoleArn: undefined, cloudFormationExecutionRoleArn: undefined, stackTemplateAssetObjectUrl: undefined, requiresBootstrapStackVersion: undefined, bootstrapStackVersionSsmParameter: undefined, terminationProtection: undefined, stackName: 'Sample1Stack', assets: [], displayName: 'Sample1Stack', name: 'Sample1Stack', originalName: 'Sample1Stack', _deps: [], _template: { Resources: [Object], Outputs: [Object] } } }
(b) AWS SDK の CloudFormation API を利用
今回は、AWS SDK for JavaScript v3 を使います。
実質的に AWS CLI の cloudformation deploy
コマンドと同じ事をするだけですが、AWS SDK にはこれに相当する API が用意されていないようなので、下記ソースコードを参考に自作してみました。
- https://github.com/aws/aws-cli/blob/develop/awscli/customizations/cloudformation/deploy.py
- https://github.com/aws/aws-cli/blob/develop/awscli/customizations/cloudformation/deployer.py
なお、CDK の下記ソースコードでも同じような処理になっていました。
- https://github.com/aws/aws-cdk/blob/master/packages/aws-cdk/lib/api/deploy-stack.ts
- https://github.com/aws/aws-cdk/blob/master/packages/aws-cdk/lib/api/util/cloudformation.ts
簡単にまとめると、以下のような処理を実装する事になります。
- (1) DescribeStacks で Stack の有無を確認して、ChangeSetType(
CREATE
かUPDATE
)を決定 - (2) CreateChangeSet で変更セットを作成
- (3) 処理が完了するまで DescribeChangeSet をポーリング
- (4) ExecuteChangeSet で変更セットを実行
- (5) 処理が完了するまで DescribeStacks をポーリング
(1) の DescribeStacks では該当するスタックが存在しない場合にもエラーとなってしまうため、他のエラーと区別するためにエラーメッセージが指定の文字列に合致するかどうかで判定しています。(AWS CLI や CDK と同様)
(3) では変更セットのステータスが CREATE_PENDING
か CREATE_IN_PROGRESS
以外になるまでポーリングし、(5) ではスタックのステータスが xxx_IN_PROGRESS
以外になるまでポーリングすれば良さそうです。
また、(3) におけるポーリングの間隔は AWS CLI と CDK 共に 5秒毎となっていましたが、(5) の方は AWS CLI が 30秒で CDK は 5秒となっているようでした。
b_sample/deployer.ts(CloudFormation の API を使ったデプロイ処理の実装)
import { CloudFormationStackArtifact } from '@aws-cdk/cx-api' import { CloudFormationClient, CreateChangeSetCommand, ExecuteChangeSetCommand, DescribeChangeSetCommand, DescribeStacksCommand, Change, Stack } from '@aws-sdk/client-cloudformation' const MAX_RETRY = 100 const WAIT_TIME_CREATE = 5000 const WAIT_TIME_EXECUTE = 10000 const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)) const cf = new CloudFormationClient({}) type ChangeSetType = 'CREATE' | 'UPDATE' export class CfDeployer { static async deploy(stackArtifact: CloudFormationStackArtifact): Promise<Stack | undefined> { const stackName = stackArtifact.stackName // (1) const changeSetType = await CfDeployer.selectChangeSetType(stackName) // (2) const r1 = await cf.send(new CreateChangeSetCommand({ ChangeSetName: `ChangeSet-${Date.now()}`, StackName: stackName, TemplateBody: JSON.stringify(stackArtifact.template), ChangeSetType: changeSetType })) console.log(r1) const changeSetId = r1.Id if (!changeSetId) { throw new Error(`failed create ChangeSet: StackName=${stackName}`) } // (3) const cs = await CfDeployer.waitForCreate(changeSetId, stackName) if (cs.length < 1) { console.log('*** NO CHANGE') return CfDeployer.getStack(stackName) } // (4) const r2 = await cf.send(new ExecuteChangeSetCommand({ ChangeSetName: changeSetId, StackName: stackName })) console.log(r2) // (5) return CfDeployer.waitForExecute(stackName) } static async getStackStatus(stackName: string): Promise<string> { const stack = await CfDeployer.getStack(stackName) return stack?.StackStatus ?? 'NONE' } static async getStack(stackName: string): Promise<Stack | undefined> { try { const r = await cf.send(new DescribeStacksCommand({ StackName: stackName })) return r.Stacks?.[0] } catch(e) { // スタックが存在しなかった場合のエラー以外はそのまま throw if (e.Code !== 'ValidationError' || e.message !== `Stack with id ${stackName} does not exist`) { throw e } } return undefined } // (3) private static async waitForCreate(changeSetId: string, stackName: string): Promise<Change[]> { for (let i = 0; i < MAX_RETRY; i++) { await sleep(WAIT_TIME_CREATE) const r = await cf.send(new DescribeChangeSetCommand({ ChangeSetName: changeSetId, StackName: stackName })) if (r.Status === 'CREATE_PENDING' || r.Status === 'CREATE_IN_PROGRESS') { continue } if (r.Status == 'CREATE_COMPLETE') { return r.Changes ?? [] } if (r.Status == 'FAILED') { console.log(`*** failed: ${JSON.stringify(r)}`) return [] } throw new Error(`failed create ChangeSet: ChangeSetId=${changeSetId}, StackName=${stackName}`) } throw new Error(`create ChangeSet timeout: ChangeSetId=${changeSetId}, StackName=${stackName}`) } // (5) private static async waitForExecute(stackName: string): Promise<Stack> { for (let i = 0; i < MAX_RETRY; i++) { await sleep(WAIT_TIME_EXECUTE) const stack = await CfDeployer.getStack(stackName) if (!stack) { throw new Error('not found') } if (!stack.StackStatus?.endsWith('_IN_PROGRESS')) { return stack } } throw new Error(`execute ChangeSet timeout: StackName=${stackName}`) } private static async selectChangeSetType(stackName: string): Promise<ChangeSetType> { const status = await CfDeployer.getStackStatus(stackName) return (status == 'NONE' || status == 'REVIEW_IN_PROGRESS') ? 'CREATE' : 'UPDATE' } }
上記を使ってスタックをデプロイする処理は以下のようになります。
b_sample/deploy_b.ts
import { App } from '@aws-cdk/core' import { CfDeployer } from './deployer' import { SampleStack } from './stack' const app = new App() new SampleStack(app, 'Sample2Stack') CfDeployer.deploy(app.synth().stacks[0]) .then(r => console.log(r)) .catch(err => console.error(err))
パッケージインストール例
> cd b_sample > npm install -D ts-node typescript @types/node ・・・ > npm install @aws-cdk/aws-dynamodb @aws-sdk/client-cloudformation ・・・
実行結果は以下の通りです。 こちらも CloudFormation のスタックが登録され、DynamoDB のテーブルが問題なく作成されました。
実行結果
> npx ts-node deploy_b.ts { '$metadata': { httpStatusCode: 200, requestId: '・・・', extendedRequestId: undefined, cfId: undefined, attempts: 1, totalRetryDelay: 0 }, Id: 'arn:aws:cloudformation:・・・:changeSet/ChangeSet-・・・', StackId: 'arn:aws:cloudformation:・・・:stack/Sample2Stack/・・・' } { '$metadata': { httpStatusCode: 200, requestId: '・・・', extendedRequestId: undefined, cfId: undefined, attempts: 1, totalRetryDelay: 0 } } { StackId: 'arn:aws:cloudformation:・・・', StackName: 'Sample2Stack', ChangeSetId: 'arn:aws:cloudformation:・・・', Description: undefined, Parameters: undefined, ・・・ StackStatus: 'CREATE_COMPLETE', StackStatusReason: undefined, DisableRollback: false, NotificationARNs: [], TimeoutInMinutes: undefined, Capabilities: undefined, Outputs: [ { OutputKey: 'TableName', OutputValue: 'Sample2Stack-Items・・・', Description: undefined, ExportName: undefined } ], RoleARN: undefined, Tags: [], EnableTerminationProtection: false, ParentId: undefined, RootId: undefined, DriftInformation: { StackDriftStatus: 'NOT_CHECKED', LastCheckTimestamp: undefined } }
Amplify AppSync Simulator を直接使ってマッピングテンプレートを検証
Amplify AppSync Simulator は、AWS Amplify CLI に含まれているモジュールで、AppSync をローカル環境で動作確認するためのものです。(AppSync の GraphQL を処理する Web サーバーが起動するようになっている)
ソースコードを見てみたところ、AppSync 用の処理を適用した GraphQL.js の GraphQLSchema
を作り、これを graphql
関数で実行するようになっていました。
そこで試しに、Amplify AppSync Simulator の API を直接使う事で、Amplify CLI を使わず Web サーバーも起動せずに、リクエストマッピングテンプレートの処理結果を出力する Node.js スクリプトを作ってみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210315/
準備
amplify-appsync-simulator
モジュールをインストールしておきます。
インストール例
> npm install amplify-appsync-simulator
Lambda 用マッピングテンプレートの処理
今回は、Lambda 用のマッピングテンプレートを処理してみます。
AppSync では下記を設定する事で GraphQL の処理を定義するようになっています。
※ リゾルバーは、GraphQL API とデータソースとの紐づけ、 入出力データの加工をマッピングテンプレートとして設定するようになっています
マッピングテンプレートは、VTL(Apache Velocity Template Language)の形式でリクエストやレスポンスの内容を加工して JSON 文字列を作ります。
Amplify AppSync Simulator では AmplifyAppSyncSimulator
クラスの init
メソッドに対して、上記 (1) ~ (3) と defaultAuthenticationType
を設定した AmplifyAppSyncSimulatorConfig
を渡す事で、AppSync 用の処理を適用した GraphQLSchema
を内部的に作るようになっており、それを schema
で取得する事が可能です。
defaultAuthenticationType
の設定は、今回のように GraphQLSchema を直接利用するケースでは使われませんが、設定自体は必須のようです。
また、Amplify AppSync Simulator が対応しているデータソースは今のところ下記 3タイプのようで、タイプ毎に用意されている DataLoader
が処理を担うようになっています。
各 DataLoader の load
メソッドの第一引数にリクエストマッピングテンプレートの処理結果がそのまま渡ってくるようになっています。
NONE データソースの利用
NONE タイプ用 NoneDataLoader
の load メソッドは、単純に payload
の値を処理結果として返すような実装になっています。
そこで、この load メソッドを上書きする事でリクエストマッピングテンプレートの結果を出力するようにしてみました。
なお、AmplifyAppSyncSimulator が作成した GraphQLSchema を graphql
関数で処理する際のコンテキストに requestAuthorizationMode
と appsyncErrors
の項目が最低限必要でした。
defaultAuthenticationType は必須のようなので API_KEY
を設定していますが、実際には使われないので API_KEY の値を設定したりする必要はありませんでした。
sample1.js
const { AmplifyAppSyncSimulator } = require('amplify-appsync-simulator') const { graphql } = require('graphql') // GraphQL スキーマ定義 const schema = ` type Item { id: ID! value: Int! } input FindInput { id: ID! } type Query { find(input: FindInput!): Item } ` // データソースの設定 const dataSources = [ { type: 'NONE', name: 'ItemFunc' } ] // リゾルバーの設定 const resolvers = [ { kind: 'UNIT', // GraphQL の Query で定義した find フィールドと ItemFunc データソースとのマッピング typeName: 'Query', fieldName: 'find', dataSourceName: 'ItemFunc' // リクエストマッピングテンプレートの設定(GraphQL クエリの input をそのまま payload に設定) requestMappingTemplate: ` { "version": "2018-05-29", "operation": "Invoke", "payload": $utils.toJson($ctx.args.input) } `, // レスポンスマッピングテンプレートの設定 responseMappingTemplate: '$utils.toJson($context.result)' } ] // 設定 const config = { schema: { content: schema }, resolvers, dataSources, appSync: { // 下記の設定は必須 defaultAuthenticationType: { authenticationType: 'API_KEY' } } } const simulator = new AmplifyAppSyncSimulator() simulator.init(config) // ItemFunc(NoneDataLoader)の load メソッド上書き simulator.getDataLoader('ItemFunc').load = (req) => { console.log(`*** load: ${JSON.stringify(req)}`) return { id: req.payload.id, value: 123 } } // AmplifyAppSyncSimulator用 GraphQL の実行コンテキスト const ctx = { requestAuthorizationMode: 'API_KEY', appsyncErrors: [] } // GraphQL クエリ const q = ` query FindItem($id: ID!) { find(input: {id: $id}) { id value } } ` const run = async () => { // GraphQL クエリ実行 const r = await graphql(simulator.schema, q, null, ctx, {id: 'id1'}) console.log(JSON.stringify(r)) } run().catch(err => console.error(err))
実行結果は以下の通り。 リクエストマッピングテンプレートの処理結果が出力されています。
実行結果
> node sample1.js *** load: {"version":"2018-05-29","operation":"Invoke","payload":{"id":"id1"}} {"data":{"find":{"id":"id1","value":123}}}
AWS_LAMBDA データソースの利用
AWS_LAMBDA タイプの場合は、データソース設定の invoke へ設定した関数が呼び出されるようになっているので ※、これを使って Lambda の関数ハンドラーを直接実行する事もできます。
※ リクエストマッピングテンプレート処理結果の payload の値が引数として与えられるようになっています
lambda_func.js(関数ハンドラー)
exports.handler = async (event) => { console.log(`*** handler: ${JSON.stringify(event)}`) return { id: event.id, value: 234 } }
sample2.js
const { AmplifyAppSyncSimulator } = require('amplify-appsync-simulator') const { graphql } = require('graphql') const schema = ` ・・・ ` const dataSources = [ // invoke へ lambda_func.js の handler を設定 { type: 'AWS_LAMBDA', name: 'ItemFunc', invoke: require('./lambda_func.js').handler } ] const resolvers = [ ・・・ ] const config = { schema: { content: schema }, resolvers, dataSources, appSync: { defaultAuthenticationType: { authenticationType: 'API_KEY' } } } const simulator = new AmplifyAppSyncSimulator() simulator.init(config) const ctx = { requestAuthorizationMode: 'API_KEY', appsyncErrors: [] } const q = ` query FindItem($id: ID!) { find(input: {id: $id}) { id value } } ` const run = async () => { const r = await graphql(simulator.schema, q, null, ctx, {id: 'id2'}) console.log(JSON.stringify(r)) } run().catch(err => console.error(err))
実行結果
> node sample2.js *** handler: {"id":"id2"} {"data":{"find":{"id":"id2","value":234}}}
AWS Lambda のランタイム API サーバーを自作して関数ハンドラーをローカル実行
AWS Lambda では、(Lambda 関数の)ランタイムがランタイム API(ランタイムインターフェース)から呼び出しイベントを受け取って、関数ハンドラーを実行し、その結果をランタイム API へ返すような流れで処理が実施されているようです。
そこで、自作のランタイム API サーバーを使って、Go と Node.js でそれぞれ実装した AWS Lambda の関数ハンドラーを(Docker 等を使わずに)ローカル実行してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20210211/
ランタイム API サーバーの実装
以下のようなオープンソースのランタイム API サーバーが提供されており、自作する必要は無かったりするのですが。
今回は、fastify を使ってランタイム API サーバーを実装してみました。
AWS Lambda ランタイム API のページから OpenAPI 仕様(2018-06-01)を入手できるので、このスキーマ定義を参考に必要最小限の処理だけを実装します。
まず、ランタイム API として下記を実装する事になります。(ただし、今回の用途では (d) を使いません)
- (a) 次の呼び出し : GET
/runtime/invocation/next
- (b) 呼び出しレスポンス : POST
/runtime/invocation/{AwsRequestId}/response
- (c) 呼び出しエラー : POST
/runtime/invocation/{AwsRequestId}/error
- (d) 初期化エラー : POST
/runtime/init/error
(b) ~ (d) は成功時に 202 ステータスコードを返す事になります。
(b) や (c) における AwsRequestId
の値は、(a) のレスポンスヘッダーで通知する事になり、(a) のレスポンスヘッダーで最低限必要な項目は下記でした。(aws-lambda-go の場合)
- Lambda-Runtime-Aws-Request-Id (AwsRequestId の値)
- Lambda-Runtime-Deadline-Ms (実行期限)
一方で、ランタイム側は以下のような手順でランタイム API を呼び出すようになっているようです。
- (1) (a) を GET してレスポンスが返ってくるのを待つ
- (2) (a) からレスポンス(呼び出しイベント)が返ってくると関数ハンドラーを呼び出す
- (3) 関数ハンドラーの呼び出し結果を (b) へ POST、エラーが発生した場合は (c) へ POST
- (4) (1) へ戻る
つまり、(a) で固定の内容をすぐに返すような実装にしてしまうと、(1) ~ (4) が繰り返されてしまい不都合が生じます。
そこで、下記では /invoke
を呼び出した際に (a) のレスポンスを返すようにしています。
また、Go と Node.js の関数ハンドラーを同時に扱うために、(a) が呼び出された際の reply を replies へ溜めておいて、forEach で処理するようにしています。
runtime-api-server/server.js (ランタイム API サーバー)
const logger = false const fastify = require('fastify')({ logger }) const RUNTIME_PATH = '/2018-06-01/runtime' const TIMEOUT = 5 * 1000 const port = process.env['SERVER_PORT'] ? parseInt(process.env['SERVER_PORT']) : 8080 const replies = [] fastify.post('/invoke', (req, reply) => { const data = req.body console.log(`*** invoke: ${JSON.stringify(data)}`) replies.splice(0).forEach(r => { const deadline = Date.now() + TIMEOUT // (a) のレスポンスを返す r.code(200) .header('Lambda-Runtime-Aws-Request-Id', data['request-id']) .header('Lambda-Runtime-Deadline-Ms', deadline.toString()) .header('Lambda-Runtime-Trace-Id', data['trace-id']) .header('Lambda-Runtime-Invoked-Function-Arn', data['function-arn']) .send(data.body) }) reply.code(200).send({}) }) // Runtime API の実装 // (a) 次の呼び出し fastify.get(`${RUNTIME_PATH}/invocation/next`, (req, reply) => { console.log('*** next') console.log(req.body) replies.push(reply) }) // (b) 呼び出しレスポンス fastify.post(`${RUNTIME_PATH}/invocation/:id/response`, (req, reply) => { console.log(`*** response: id = ${req.params['id']}`) console.log(req.body) reply.code(202).send({ status: '' }) }) // (c) 呼び出しエラー fastify.post(`${RUNTIME_PATH}/invocation/:id/error`, (req, reply) => { console.log(`*** error: id = ${req.params['id']}`) console.log(req.body) reply.code(202).send({ status: '' }) }) // (d) 初期化エラー fastify.post(`${RUNTIME_PATH}/init/error`, (req, reply) => { console.log('*** init error') console.log(req.body) reply.code(202).send({ status: '' }) }) fastify.listen(port) .then(r => console.log(`started: ${r}`)) .catch(err => console.error(err))
実行
動作確認のために実行しておきます。
> cd runtime-api-server
> node server.js
started: http://127.0.0.1:8080
Go による関数ハンドラー
Go の場合、aws-lambda-go にランタイム API とやり取りを行うランタイムの処理が実装されています。
そのため、aws-lambda-go を使って実装した関数ハンドラーをローカル環境でビルドして実行するだけです。
接続するランタイム API サーバーのアドレスは AWS_LAMBDA_RUNTIME_API
環境変数で設定します。
ここでは、以下の関数ハンドラーを使用します。
sample_go/app.go (関数ハンドラー)
package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/lambdacontext" ) func handler(ctx context.Context, event interface{}) (string, error) { c, _ := lambdacontext.FromContext(ctx) fmt.Println("*** call handler") fmt.Printf("event = %#v\n", event) fmt.Printf("context = %#v\n", c) return "sample-go", nil } func main() { lambda.Start(handler) }
動作確認1
AWS_LAMBDA_RUNTIME_API 環境変数にランタイム API サーバーのアドレスを設定し、app.go をビルドして実行します。
ビルドと実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080 > cd sample_go > go build app.go > app
ランタイム API サーバー側の出力内容は下記のようになり、app.go から (a) の API が呼び出されている事を確認できます。
server.js(ランタイム API サーバー)の出力内容1
> node server.js started: http://127.0.0.1:8080 *** next null
/invoke を呼び出して、関数ハンドラーを実行してみます。
invoke の呼び出し1
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"a1", "function-arn":"sample", "body":{"name":"abc","value":1}}' {}
app.go の出力内容は以下のようになり、関数ハンドラーの実行を確認できました。
app.go の出力内容
> app *** call handler event = map[string]interface {}{"name":"abc", "value":1} context = &lambdacontext.LambdaContext{AwsRequestID:"a1", InvokedFunctionArn:"sample", Identity:lambdacontext.CognitoIdentity{CognitoIdentityID:"", CognitoIdentityPoolID:""}, ClientContext:lambdacontext.ClientContext{Client:lambdacontext.ClientApplication{InstallationID:"", AppTitle:"", AppVersionCode:"", AppPackageName:""}, Env:map[string]string(nil), Custom:map[string]string(nil)}}
ランタイム API サーバー側は以下のように、app.go からのレスポンスを受け取っている事が確認できます。
server.js(ランタイム API サーバー)の出力内容2
> node server.js started: http://127.0.0.1:8080 ・・・ *** invoke: {"request-id":"a1","function-arn":"sample","body":{"name":"abc","value":1}} *** response: id = a1 sample-go *** next null
続いて、エラー時の挙動を確認するために下記を実施します。
invoke の呼び出し2
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"b2"}' {}
app.go には何も出力されず、ランタイム API サーバー側は以下のようになり、エラーの内容を受け取っています。
server.js(ランタイム API サーバー)の出力内容3
> node server.js started: http://127.0.0.1:8080 ・・・ *** invoke: {"request-id":"b2"} *** error: id = b2 { errorMessage: 'unexpected end of JSON input', errorType: 'SyntaxError' } *** next null
Node.js による関数ハンドラー
Node.js の場合、通常は関数ハンドラーそのものを実装するだけなので、Go とは違ってランタイムが別途必要となります。
Node.js 用のランタイムとしては下記が提供されており、ネイティブコードなどを使った本格的な作りとなっています。(本番利用を想定していると思われる)
今回はこれを使わず、簡易的なランタイムを node-fetch を使って自作してみました。
node_runtime/runtime.js (Node.js 用の簡易ランタイム)
const fetch = require('node-fetch') const runtimeUrl = `http://${process.env["AWS_LAMBDA_RUNTIME_API"]}/2018-06-01/runtime/invocation` const appRoot = process.cwd() const [moduleName, handlerName] = process.argv[2].split('.') const app = require(`${appRoot}/${moduleName}`) const envData = { functionVersion: process.env["AWS_LAMBDA_FUNCTION_VERSION"], functionName: process.env["AWS_LAMBDA_FUNCTION_NAME"], memoryLimitInMB: process.env["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"], logGroupName: process.env["AWS_LAMBDA_LOG_GROUP_NAME"], logStreamName: process.env["AWS_LAMBDA_LOG_STREAM_NAME"], } const run = async () => { // (a) の API を呼び出す const nextRes = await fetch(`${runtimeUrl}/next`) const deadline = parseInt(nextRes.headers.get('lambda-runtime-deadline-ms')) const context = Object.assign( { getRemainingTimeInMillis: () => deadline - Date.now(), awsRequestId: nextRes.headers.get('lambda-runtime-aws-request-id'), invokedFunctionArn: nextRes.headers.get('lambda-runtime-invoked-function-arn'), identity: {}, clientContext: {}, }, envData ) try { const event = await nextRes.json() // 関数ハンドラーの呼び出し const res = await app[handlerName](event, context) console.log(`* handler result: ${res}`) // (b) の API を呼び出す await fetch( `${runtimeUrl}/${context.awsRequestId}/response`, {method: 'POST', body: res} ) } catch (e) { // (c) の API を呼び出す await fetch( `${runtimeUrl}/${context.awsRequestId}/error`, {method: 'POST', body: JSON.stringify({type: e.type, message: e.message})} ) } } const loop = async () => { while (true) { await run() } } loop().catch(err => console.error(err))
ここでは、下記の関数ハンドラーを使う事にします。
sample_node/app.js (関数ハンドラー)
exports.handler = async (event, context) => { console.log('*** call handler') console.log(`event = ${JSON.stringify(event)}`) console.log(`context = ${JSON.stringify(context)}`) console.log(`remaining time = ${context.getRemainingTimeInMillis()}`) return 'sample-node' }
動作確認2
runtime.js を使って app.js の handler を呼び出すように実行します。
実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080 > cd sample_node > node ../node_runtime/runtime.js app.handler
ランタイム API サーバーの next が呼び出されました。
server.js(ランタイム API サーバー)の出力内容4
> node server.js started: http://127.0.0.1:8080 ・・・ *** next null
/invoke を呼び出します。
invoke の呼び出し3
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"c3", "function-arn":"sample2", "body":{"name":"def","value":2}}' {}
runtime.js + app.js の出力内容は以下のようになり、関数ハンドラーが呼び出されました。
runtime.js + app.js の出力内容
> node ../node_runtime/runtime.js app.handler *** call handler event = {"name":"def","value":2} context = {"awsRequestId":"c3","invokedFunctionArn":"sample2","identity":{},"clientContext":{}} remaining time = 4945 * handler result: sample-node
ランタイム API サーバーの出力も以下のようになり、問題は無さそうです。
server.js(ランタイム API サーバー)の出力内容5
> node server.js started: http://127.0.0.1:8080 ・・・ *** invoke: {"request-id":"c3","function-arn":"sample2","body":{"name":"def","value":2}} ・・・ *** response: id = c3 sample-node *** next null
エラーが発生するように /invoke を呼び出します。
invoke の呼び出し4
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"d4"}' {}
こちらも問題は無さそうです。
server.js(ランタイム API サーバー)の出力内容6
> node server.js started: http://127.0.0.1:8080 ・・・ *** invoke: {"request-id":"d4"} ・・・ *** error: id = d4 {"type":"invalid-json","message":"invalid json response body at http://localhost:8080/2018-06-01/runtime/invocation/next reason: Unexpected end of JSON input"} *** next null
Go言語で GraphQL - graph-gophers/graphql-go で Interface を試す
前回の graph-gophers/graphql-go を使って、GraphQL の Interface を扱ってみます。
ソースは http://github.com/fits/try_samples/tree/master/blog/20210125/
はじめに
GraphQL には Interface と Union という類似の機能が用意されており、共通のフィールドを設けるかどうかによって使い分けるようになっています。
graph-gophers/graphql-go では、具体的な型(Interface の実装型や Union の要素型)への変換メソッドを用意する事で Interface や Union を扱えるようになっています。
具体型への変換メソッド
func To<GraphQLの型名>() (<Goの型>, bool)
Go 側の実装方法はいくつか考えられるので、試しに 3通りで実装してみました。
(1) 基本形
まずは、graph-gophers/graphql-go の examples で使われている実装方法です。
Go の実装方法 | |
---|---|
GraphQL Interface | interface 埋め込み struct |
GraphQL Interface 実装型 | struct |
GraphQL 実装型への変換 | struct へのキャスト |
sample1.go
package main import ( "context" "encoding/json" graphql "github.com/graph-gophers/graphql-go" ) const ( // GraphQL スキーマ定義 schemaString = ` interface Event { id: ID! } type Created implements Event { id: ID! title: String! } type Deleted implements Event { id: ID! reason: String } type Query { events: [Event!]! } ` ) // GraphQL の Event に対応 type event interface { ID() graphql.ID } // GraphQL の Created に対応 type created struct { id string title string } func (c *created) ID() graphql.ID { return graphql.ID(c.id) } func (c *created) Title() string { return c.title } // GraphQL の Deleted に対応 type deleted struct { id string reason string } func (d *deleted) ID() graphql.ID { return graphql.ID(d.id) } func (d *deleted) Reason() *string { if d.reason == "" { return nil } return &d.reason } // GraphQL の Event に対応 type eventResolver struct { event } // GraphQL の Created 型への変換処理 func (r *eventResolver) ToCreated() (*created, bool) { c, ok := r.event.(*created) return c, ok } // GraphQL の Deleted 型への変換処理 func (r *eventResolver) ToDeleted() (*deleted, bool) { d, ok := r.event.(*deleted) return d, ok } type resolver struct{} func (r *resolver) Events() []*eventResolver { return []*eventResolver{ {&created{id: "i-1", title: "sample1"}}, {&deleted{id: "i-1"}}, {&created{id: "i-2", title: "sample2"}}, {&created{id: "i-3", title: "sample3"}}, {&deleted{id: "i-3", reason: "test"}}, } } func main() { schema := graphql.MustParseSchema(schemaString, new(resolver)) q := ` { events { __typename id ... on Created { title } ... on Deleted { reason } } } ` r := schema.Exec(context.Background(), q, "", nil) b, err := json.Marshal(r) if err != nil { panic(err) } println(string(b)) }
実行結果は以下の通りです。
実行結果
> go build sample1.go > sample1 {"data":{"events":[{"__typename":"Created","id":"i-1","title":"sample1"},{"__typename":"Deleted","id":"i-1","reason":null},{"__typename":"Created","id":"i-2","title":"sample2"},{"__typename":"Created","id":"i-3","title":"sample3"},{"__typename":"Deleted","id":"i-3","reason":"test"}]}}
Union の場合
Interface の代わりに Union を使った場合は以下のようになります。
sample1_union.go
・・・ const ( schemaString = ` union Event = Created | Deleted type Created { id: ID! title: String! } type Deleted { id: ID! reason: String } type Query { events: [Event!]! } ` ) type event interface{} ・・・ func main() { schema := graphql.MustParseSchema(schemaString, new(resolver)) q := ` { events { __typename ... on Created { id title } ... on Deleted { id reason } } } ` ・・・ }
(2) OneOf
次は、gRPC の oneof を参考にした実装です。
こちらは Interface よりも Union の実装に向いているかもしれません。
Go の実装方法 | |
---|---|
GraphQL Interface | GraphQL 実装型毎にフィールドを用意した struct |
GraphQL Interface 実装型 | struct |
GraphQL 実装型への変換 | nil では無いフィールド値を返す |
sample2.go
・・・ // GraphQL の Created に対応 type created struct { id string title string } func (c *created) ID() graphql.ID { return graphql.ID(c.id) } func (c *created) Title() string { return c.title } // GraphQL の Deleted に対応 type deleted struct { id string reason string } func (d *deleted) ID() graphql.ID { return graphql.ID(d.id) } func (d *deleted) Reason() *string { if d.reason == "" { return nil } return &d.reason } // GraphQL の Event に対応 type event struct { created *created deleted *deleted } func (e *event) ID() graphql.ID { if e.created == nil { return e.deleted.ID() } return e.created.ID() } // GraphQL の Created 型への変換処理 func (e *event) ToCreated() (*created, bool) { if e.created == nil { return nil, false } return e.created, true } // GraphQL の Deleted 型への変換処理 func (e *event) ToDeleted() (*deleted, bool) { if e.deleted == nil { return nil, false } return e.deleted, true } type resolver struct{} func (r *resolver) Events() []*event { return []*event{ {created: &created{id: "i-1", title: "sample1"}}, {deleted: &deleted{id: "i-1"}}, {created: &created{id: "i-2", title: "sample2"}}, {created: &created{id: "i-3", title: "sample3"}}, {deleted: &deleted{id: "i-3", reason: "test"}}, } } func main() { ・・・ }
実行結果
> go build sample2.go > sample2 {"data":{"events":[{"__typename":"Created","id":"i-1","title":"sample1"},{"__typename":"Deleted","id":"i-1","reason":null},{"__typename":"Created","id":"i-2","title":"sample2"},{"__typename":"Created","id":"i-3","title":"sample3"},{"__typename":"Deleted","id":"i-3","reason":"test"}]}}
(c) オールインワン
最後に、GraphQL Interface の実装型を単一の struct へ集約してみました。
実装内容が分かり難くなりそうで微妙かもしれません。
Go の実装方法 | |
---|---|
GraphQL Interface | GraphQL 実装型の全フィールドを備えた struct |
GraphQL Interface 実装型 | interface |
GraphQL 実装型への変換 | フラグやフィールド値の有無で判定して自身を返す |
sample3.go
・・・ // GraphQL の Created に対応 type created interface { ID() graphql.ID Title() string } // GraphQL の Deleted に対応 type deleted interface { ID() graphql.ID Reason() *string } // GraphQL の Event に対応 type event struct { id string title string reason string del bool // Created と Deleted の判定用 } func (e *event) ID() graphql.ID { return graphql.ID(e.id) } func (e *event) Title() string { return e.title } func (e *event) Reason() *string { if e.reason == "" { return nil } return &e.reason } // GraphQL の Created 型への変換処理 func (e *event) ToCreated() (created, bool) { if e.del { return nil, false } return e, true } // GraphQL の Deleted 型への変換処理 func (e *event) ToDeleted() (deleted, bool) { if e.del { return e, true } return nil, false } type resolver struct{} func (r *resolver) Events() []*event { return []*event{ {id: "i-1", title: "sample1"}, {id: "i-1", del: true}, {id: "i-2", title: "sample2"}, {id: "i-3", title: "sample3"}, {id: "i-3", reason: "test", del: true}, } } func main() { ・・・ }
実行結果
> go build sample3.go > sample3 {"data":{"events":[{"__typename":"Created","id":"i-1","title":"sample1"},{"__typename":"Deleted","id":"i-1","reason":null},{"__typename":"Created","id":"i-2","title":"sample2"},{"__typename":"Created","id":"i-3","title":"sample3"},{"__typename":"Deleted","id":"i-3","reason":"test"}]}}
Go言語で GraphQL - graph-gophers/graphql-go で Query, Mutation, Subscription を試す
Go言語で GraphQL を扱うライブラリはいくつかありますが、今回は下記を試しました。
文字列として定義した GraphQL スキーマを使うようになっており、それなりに使い易いと思います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210112/
Query 処理
まずは、単純な Query 処理を実装してみます。
MustParseSchema
に GraphQL スキーマ文字列(以下の schemaString
)と処理の実装(以下の resolver
)を与えて、Schema
を取得します。
Exec
に Context
、クエリ文字列、オペレーション名、クエリ用の変数を与えてクエリを実行します。
クエリの実行結果は Response
として返ってくるので json.Marshal
で JSON 文字列化して出力しています。
デフォルトで、GraphQL スキーマのフィールド(下記の id
や value
)の値は、該当するメソッドから取得するようになっています。※
※ 大文字・小文字は区別せず、 "_" を除いた名称が合致するメソッドを探している模様
sample1.go
package main import ( "context" "encoding/json" graphql "github.com/graph-gophers/graphql-go" ) const ( // GraphQL スキーマ定義 schemaString = ` type Item { id: ID! value: Int! } type Query { one: Item! } ` ) type item struct { id string value int32 } // GraphQL の id フィールドの値 func (i *item) ID() graphql.ID { return graphql.ID(i.id) } // GraphQL の value フィールドの値 func (i *item) Value() int32 { return i.value } type resolver struct{} // Query の one を実装 func (r *resolver) One() *item { return &item{"item-1", 12} } func main() { // GraphQL スキーマのパース schema := graphql.MustParseSchema(schemaString, new(resolver)) q := ` { one { id value } } ` // GraphQL クエリの実行 r := schema.Exec(context.Background(), q, "", nil) // JSON 文字列化 b, err := json.Marshal(r) if err != nil { panic(err) } println(string(b)) }
実行結果は以下の通りです。
ビルドと実行
> go build sample1.go > sample1 {"data":{"one":{"id":"item-1","value":12}}}
構造体のフィールドを使用
graph-gophers/graphql-go のソースコード internal/exec/resolvable/resolvable.go
を確認してみたところ、GraphQL フィールド値の取得先は以下のように探しているようでした。
- (1) 該当するメソッドを探す(findMethod の実施)
- (2) (1) で見つからず、UseFieldResolvers が適用されている場合は該当するフィールドを探す(findField の実施)
そこで、UseFieldResolvers
を適用し、item 構造体のフィールドから値を取得するようにしてみました。
sample2.go
・・・ type item struct { ID graphql.ID Value int32 } type resolver struct{} func (r *resolver) One() *item { return &item{graphql.ID("item-2"), 34} } func main() { // UseFieldResolvers 適用 schema := graphql.MustParseSchema(gqlSchema, new(resolver), graphql.UseFieldResolvers()) ・・・ }
実行結果は以下の通りです。
ビルドと実行
> go build sample2.go > sample1_field {"data":{"one":{"id":"item-2","value":34}}}
なお、このコードで UseFieldResolvers を適用しなかった場合、実行時に panic: *main.item does not resolve "Item": missing method for field "id"
となりました。
Mutation, Subscription 処理
最後に、GraphQL の enum や input を使って Mutation や Subscription を行う処理を実装してみました。
enum は string、input は 構造体で扱う事ができます。
Subscription は Subscribe
で実施し、その実装メソッドは受信用 channel(<-chan T
)を戻り値にします。
Exec の戻り値である Response の Data フィールドの型は json.RawMessage
となっているので、構造体や map へアンマーシャルする事が可能です。
sample3.go
package main import ( "context" "encoding/json" "log" "sync" "github.com/google/uuid" graphql "github.com/graph-gophers/graphql-go" ) const ( schemaString = ` enum Category { Standard Extra } input CreateItem { category: Category! value: Int! } type Item { id: ID! category: Category! value: Int! } type Mutation { create(input: CreateItem!): Item } type Query { find(id: ID!): Item } type Subscription { created: Item } ` ) // input type createItem struct { Category string Value int32 } type item struct { id string category string value int32 } func (i *item) ID() graphql.ID { return graphql.ID(i.id) } func (i *item) Category() string { return i.category } func (i *item) Value() int32 { return i.value } // item 管理 type store struct { sync.RWMutex items []*item } func (s *store) add(i *item) { s.Lock() defer s.Unlock() s.items = append(s.items, i) } func (s *store) get(id graphql.ID) *item { s.RLock() defer s.RUnlock() for _, i := range s.items { if i.ID() == id { return i } } return nil } // Subscription 用の channel 管理 type broker struct { sync.RWMutex subscribes []chan<- *item } func (b *broker) subscribe(ch chan<- *item) { log.Println("[INFO] subscribe") b.Lock() defer b.Unlock() b.subscribes = append(b.subscribes, ch) } func (b *broker) unsubscribe(ch chan<- *item) { log.Println("[INFO] unsubscribe") var tmp []chan<- *item b.Lock() defer b.Unlock() for _, s := range b.subscribes { if s != ch { tmp = append(tmp, s) } } b.subscribes = tmp } func (b *broker) publish(i *item) { b.RLock() defer b.RUnlock() for _, s := range b.subscribes { s <- i } } type resolver struct { store *store broker *broker } // Mutation の実装 func (r *resolver) Create(args struct{ Input createItem }) (*item, error) { id, err := uuid.NewRandom() if err != nil { return nil, err } i := item{id.String(), args.Input.Category, args.Input.Value} r.store.add(&i) go func() { r.broker.publish(&i) }() return &i, nil } // Query の実装 func (r *resolver) Find(args struct{ ID graphql.ID }) *item { return r.store.get(args.ID) } // Subscription の実装 func (r *resolver) Created(ctx context.Context) <-chan *item { ch := make(chan *item) r.broker.subscribe(ch) go func() { // Context キャンセル時 <-ctx.Done() log.Println("[INFO] context done") r.broker.unsubscribe(ch) close(ch) }() return ch } // Subscription の受信 func onCreated(ch <-chan interface{}) { for { r, ok := <-ch if !ok { log.Println("[INFO] closed channel") return } b, _ := json.Marshal(r) log.Println("[SUBSCRIPTION]", string(b)) } } func printResponse(r *graphql.Response) error { b, err := json.Marshal(r) if err != nil { return err } log.Println(string(b)) return nil } func main() { resolver := resolver{new(store), new(broker)} schema := graphql.MustParseSchema(schemaString, &resolver) ctx, cancel := context.WithCancel(context.Background()) s := ` subscription { created { id category value } } ` // Subscription の実施 ch, err := schema.Subscribe(ctx, s, "", nil) if err != nil { panic(err) } go onCreated(ch) m1 := ` mutation { create(input: { category: Standard, value: 10 }) { id } } ` mr1 := schema.Exec(context.Background(), m1, "", nil) _ = printResponse(mr1) var cr1 struct { Create struct { ID string } } // Mutation 結果の data の内容を構造体へアンマーシャル err = json.Unmarshal(mr1.Data, &cr1) if err != nil { panic(err) } q := ` query findItem($id: ID!) { find(id: $id) { id category value } } ` qr1 := schema.Exec(context.Background(), q, "", map[string]interface{}{"id": cr1.Create.ID}) _ = printResponse(qr1) m2 := ` mutation Create($p: CreateItem!) { create(input: $p) { id } } ` // GraphQL のクエリ用変数 vs := map[string]interface{}{ "p": map[string]interface{}{ "category": "Extra", "value": 123, }, } mr2 := schema.Exec(context.Background(), m2, "", vs) _ = printResponse(mr2) var cr2 map[string]map[string]interface{} // Mutation 結果の data の内容を map へアンマーシャル err = json.Unmarshal(mr2.Data, &cr2) if err != nil { panic(err) } qr2 := schema.Exec(context.Background(), q, "", cr2["create"]) _ = printResponse(qr2) // Subscription のキャンセル cancel() mr3 := schema.Exec(context.Background(), m2, "", map[string]interface{}{ "p": map[string]interface{}{ "category": "Extra", "value": 987, }, }) _ = printResponse(mr3) mr4 := schema.Exec(context.Background(), m2, "", map[string]interface{}{ "p": map[string]interface{}{ "category": "Standard", "value": 567, }, }) _ = printResponse(mr4) qr5 := schema.Exec(context.Background(), q, "", map[string]interface{}{"id": "invalid-id"}) _ = printResponse(qr5) }
ビルドと実行
> go build sample3.go > sample3 2021/01/11 21:03:40 [INFO] subscribe 2021/01/11 21:03:40 {"data":{"create":{"id":"507dae03-1f93-4b1a-a75e-3fc54b297ad4"}}} 2021/01/11 21:03:40 [SUBSCRIPTION] {"data":{"created":{"id":"507dae03-1f93-4b1a-a75e-3fc54b297ad4","category":"Standard","value":10}}} 2021/01/11 21:03:40 {"data":{"find":{"id":"507dae03-1f93-4b1a-a75e-3fc54b297ad4","category":"Standard","value":10}}} 2021/01/11 21:03:40 {"data":{"create":{"id":"b47bf46c-5c10-4a8f-892e-9ebfa83d576a"}}} 2021/01/11 21:03:40 [SUBSCRIPTION] {"data":{"created":{"id":"b47bf46c-5c10-4a8f-892e-9ebfa83d576a","category":"Extra","value":123}}} 2021/01/11 21:03:40 {"data":{"find":{"id":"b47bf46c-5c10-4a8f-892e-9ebfa83d576a","category":"Extra","value":123}}} 2021/01/11 21:03:40 [INFO] closed channel 2021/01/11 21:03:40 [INFO] context done 2021/01/11 21:03:40 [INFO] unsubscribe 2021/01/11 21:03:40 {"data":{"create":{"id":"aef942a6-3aa7-4b31-89c4-cd44f748bed6"}}} 2021/01/11 21:03:40 {"data":{"create":{"id":"fe3db2a4-5578-4d33-b54a-26a8b6e281f3"}}} 2021/01/11 21:03:40 {"data":{"find":null}}
Go言語でインターフェースのメソッドを列挙する
Go 言語のリフレクションで型情報を扱う Type
は以下で取得できます。
func TypeOf(i interface{}) Type
引数 i
には値を指定する事になりますが、インターフェースの型情報を取得したい場合はどうするのか気になって試してみたところ、以下で取得できました。
インターフェースの型情報を取得
TypeOf( (*対象インターフェース)(nil) ).Elem()
対象とするインターフェースのポインタ型の nil
を使って TypeOf
を実施した後、Elem
※ を使ってインターフェースの型情報を取得します。
※ Elem() で Array, Chan, Map, Ptr, Slice から 要素の型情報を取得できる
これを利用して、インターフェースのメソッドを列挙するとこのようになります。
sample.go
package main import ( "fmt" "reflect" ) type Counter interface { Up(v int) Down(v int) Current() int reset() end() int } // メソッド情報の出力 func printMethods(t reflect.Type) { for i := 0; i < t.NumMethod(); i++ { fmt.Println("method:", t.Method(i)) } } func main() { // Counter インターフェースの型情報を取得 t := reflect.TypeOf((*Counter)(nil)).Elem() printMethods(t) }
ビルドと実行
> go build sample.go > sample method: {Current func() int <invalid Value> 0} method: {Down func(int) <invalid Value> 1} method: {Up func(int) <invalid Value> 2} method: {end main func() int <invalid Value> 3} method: {reset main func() <invalid Value> 4}
ちなみに、上記コードで t := reflect.TypeOf((Counter)(nil))
のようにすると、t.NumMethod()
で panic: runtime error: invalid memory address or nil pointer dereference
となりました。
イベントベースで考える在庫管理モデル
従来のイベントソーシングのような手法だと、特定の State(というよりは Entity かも)を永続化するための手段として Event を用いるというような、あくまでも State 中心の発想になると思います。
そこで、ここでは下記のような Event 中心の発想に切り替えて、在庫管理(在庫数を把握するだけの単純なもの)を考えてみました。
State
は本質ではなく、Event
を解釈した結果にすぎない(解釈の仕方は様々)Event
を得たり、伝えたりするための手段としてState
を用いる
要するに、Event こそが重要で State(Entity とか)は取るに足らない存在だと(実験的に)考えてみたって事です。
従来のイベントソーシング | 本件 |
---|---|
State が目的、Event が手段 | Event が目的、State が手段 |
なお、ここでイメージしている Event は、特定のドメインに依存しないような本質的なものです。
在庫管理
本件の在庫管理は、以下を把握するだけの単純なものです。
- 何処に何の在庫(とりあえず現物のあるもの)がいくつあるか
実装コードは http://github.com/fits/try_samples/tree/master/blog/20201213/
1. 本質的なイベント
在庫管理で起こりそうなイベントを考えてみます。
在庫(数)は入庫と出庫の結果だと考えられるので、以下のようなイベントが考えられそうです。
- 入庫イベント
- 出庫イベント
また、シンプルに物が移動 ※ した結果が在庫なのだと考えると、(2地点間の)在庫の移動という形で抽象化できそうな気がします。
※ 概念的な移動も含める
そうすると、以下のようなイベントも考えられそうです。
- 在庫移動の開始イベント
- 在庫移動の終了(完了)イベント
ついでに、在庫の引当も区別して考えると ※、以下のようなイベントも考えられます。
- 引当イベント
※ 引当用の場所へ移動するという事にするのであれば区別しなくてもよさそう
まとめると、とりあえずは以下のようなイベントが考えられそうです。
- 在庫移動の開始イベント
- 在庫移動の完了イベント
- 在庫移動のキャンセルイベント
- 引当イベント
- 引当した場合の出庫イベント
- 引当しなかった場合の出庫イベント
- 入庫イベント
ついでに、引当や出庫などの成否をイベントとして明確に分けたい場合は、引当失敗イベント等の失敗イベントを別途設ければ良さそうな気がします。
2. イベント定義
これらのイベントを Rust と TypeScript で型定義してみました。
商品や在庫のロケーション(在庫の保管場所)の具体的な内容はどうでもよいので(ここで具体化する必要がない)、Generics の型変数で表現しておきます。
本質的に必要そうな最低限の情報のみを持たせ、余計な情報は取り除いておきます。※
※ 在庫移動を一意に限定する ID や日付のような メタデータ(と考えられるもの)に関しても除外しました
用語はとりあえず以下のようにしています。
- 引当: assign
- 出庫: ship
- 入庫: arrive
何(item
)をいくつ(qty
)、何処(from
)から何処(to
)へ移動する予定なのかという情報を持たせて在庫の移動を開始するようにしてみました。
入出庫等で予定とは異なる内容になっても不都合が生じないように、それぞれのイベントに必要な情報を持たせています。
また、全体的に ADT(代数的データ型)を意識した内容にしています。
Rust で型定義したイベント
models/events.rs
pub enum StockMoveEvent<Item, Location, Quantity> { // 開始 Started { item: Item, qty: Quantity, from: Location, to: Location, }, // 完了 Completed, // キャンセル Cancelled, // 引当 Assigned { item: Item, from: Location, assigned: Quantity, }, // 出庫 Shipped { item: Item, from: Location, outgoing: Quantity, }, // 引当に対する出庫 AssignShipped { item: Item, from: Location, outgoing: Quantity, assigned: Quantity, }, // 入庫 Arrived { item: Item, to: Location, incoming: Quantity, }, }
TypeScript で型定義したイベント
models/events.ts
export interface StockMoveEventStarted<Item, Location, Quantity> { tag: 'stock-move-event.started' item: Item qty: Quantity from: Location to: Location } export interface StockMoveEventCompleted { tag: 'stock-move-event.completed' } export interface StockMoveEventCancelled { tag: 'stock-move-event.cancelled' } export interface StockMoveEventAssigned<Item, Location, Quantity> { tag: 'stock-move-event.assigned' item: Item from: Location assigned: Quantity } export interface StockMoveEventShipped<Item, Location, Quantity> { tag: 'stock-move-event.shipped' item: Item from: Location outgoing: Quantity } export interface StockMoveEventAssignShipped<Item, Location, Quantity> { tag: 'stock-move-event.assign-shipped' item: Item from: Location outgoing: Quantity assigned: Quantity } export interface StockMoveEventArrived<Item, Location, Quantity> { tag: 'stock-move-event.arrived' item: Item to: Location incoming: Quantity } export type StockMoveEvent<Item, Location, Quantity> = StockMoveEventStarted<Item, Location, Quantity> | StockMoveEventCompleted | StockMoveEventCancelled | StockMoveEventAssigned<Item, Location, Quantity> | StockMoveEventShipped<Item, Location, Quantity> | StockMoveEventAssignShipped<Item, Location, Quantity> | StockMoveEventArrived<Item, Location, Quantity>
3. 在庫移動処理サンプル
上記で定義したイベントを以下のような(在庫移動の)ステートマシンで扱ってみます。※
※ 本件の考え方では、 (本質的な)イベントは特定の処理やルールになるべく依存していない事が重要なので、 このステートマシン(イベントを扱う手段の一つでしかない)に対して 特化しないように注意します
- 在庫移動の状態遷移の基本パターンは 3通り
- (a) 引当 -> 出庫 -> 入庫
- (b) 出庫 -> 入庫
- (c) 入庫
- 入庫の失敗状態は無し(0個の入庫で代用)
(c) は出庫側の状況が不明なケースで入庫の記録だけを残すような用途を想定しています。
3.1 ステートマシンの実装
Rust と TypeScript でそれぞれ実装してみます。
この辺のレイヤーまでは、外界の都合(※1)から隔離しておきたいと考え、関数言語的な作りにしています。
イベントと同様に在庫移動や在庫を ADT(代数的データ型) で表現し、下記のような関数(+ ユーティリティ関数)を提供するだけの作りにしてみました。(※2)
(※1)フレームワーク、UI、永続化、非同期処理、排他制御やその他諸々の都合 (※2)こうしておくと、WebAssembly 等でコンポーネント化して 再利用するなんて事も実現し易くなるかもしれませんし
- (1) 初期状態を返す関数
- (2) 現在の状態とアクションから次の状態とそれに伴って発生したイベントを返す関数(イメージとしては
State -> Action -> Container<(State, Event)>
) - (3) ある時点の状態とそれ以降に起きたイベントの内容から任意の状態を復元して返す関数
なお、ここでは (2) のアクションに相当する部分は関数(と引数の一部)として実装しています。
また、(2) で状態遷移が発生しなかった場合に undefined
を返すように実装していますが、実際は成功時と失敗時の両方を扱うようなコンテナ型(Rust の Result や Either モナドとか)で包むのが望ましいと思います。
ついでに、実装に際して以下のようなルールを加えています。
- 引当、入庫、出庫のロケーションは開始時に予定したものを使用
- 引当時にのみ在庫数を確認(残りの在庫をチェック)
- 在庫のタイプは 2種類
- 在庫数を管理するタイプ(引当分の在庫が余っている場合にのみ引当が成功、在庫数は入庫イベントと出庫イベントから算出)
- 在庫数を管理しないタイプ(引当は常に成功、在庫数は管理せず実質的に無限)
- 引当数や出庫数が 0 の場合は(引当や出庫の)失敗として扱う
引当はこの処理内における単なる数値上の予約であり、入出庫は実際の作業の結果を反映するような用途をとりあえず想定しています。
そのため、数値上の引当に成功しても実際の出庫が成功するとは限らず、数値上の在庫数以上の出庫が発生するようなケースも考えられるので、この処理ではそれらを許容するようにしています。※
※ 在庫の整合性等をどのように制御・調整するかは 使う側(外側のレイヤー)に任せる
Rust による実装
ここで、商品(以下の ItemCode)や在庫ロケーション(以下の LocationCode)の具体的な型を決めていますが、これより外側のレイヤーに具体型を決めさせるようにした方が望ましいかもしれません。
models/stockmove.rs
use std::slice; use super::events::StockMoveEvent; // 商品を識別するための型 pub type ItemCode = String; // 在庫ロケーションを識別するための型 pub type LocationCode = String; pub type Quantity = u32; pub trait Event<S> { type Output; fn apply_to(&self, state: S) -> Self::Output; } pub trait Restore<E> { fn restore(self, events: slice::Iter<E>) -> Self; } // 在庫の型定義 #[allow(dead_code)] #[derive(Debug, Clone)] pub enum Stock { Unmanaged { item: ItemCode, location: LocationCode }, Managed { item: ItemCode, location: LocationCode, qty: Quantity, assigned: Quantity }, } // 在庫に関する処理 #[allow(dead_code)] impl Stock { pub fn unmanaged_new(item: ItemCode, location: LocationCode) -> Self { Self::Unmanaged { item, location } } pub fn managed_new(item: ItemCode, location: LocationCode) -> Self { Self::Managed { item, location, qty: 0, assigned: 0 } } pub fn eq_id(&self, item: &ItemCode, location: &LocationCode) -> bool { match self { Self::Managed { item: it, location: loc, .. } | Self::Unmanaged { item: it, location: loc } => it == item && loc == location } } // 在庫数のチェック pub fn is_sufficient(&self, v: Quantity) -> bool { match self { Self::Managed { qty, assigned, .. } => v + assigned <= *qty, Self::Unmanaged { .. } => true, } } fn update(&self, qty: Quantity, assigned: Quantity) -> Self { match self { Self::Managed { item, location, .. } => { Self::Managed { item: item.clone(), location: location.clone(), qty, assigned, } }, Self::Unmanaged { .. } => self.clone(), } } fn update_qty(&self, qty: Quantity) -> Self { match self { Self::Managed { assigned, .. } => self.update(qty, *assigned), Self::Unmanaged { .. } => self.clone(), } } fn update_assigned(&self, assigned: Quantity) -> Self { match self { Self::Managed { qty, .. } => self.update(*qty, assigned), Self::Unmanaged { .. } => self.clone(), } } } // 在庫に対するイベントの適用 impl Event<Stock> for MoveEvent { type Output = Stock; fn apply_to(&self, state: Stock) -> Self::Output { match &state { Stock::Unmanaged { .. } => state, Stock::Managed { item: s_item, location: s_loc, qty: s_qty, assigned: s_assigned } => { match self { Self::Assigned { item, from, assigned } if s_item == item && s_loc == from => { state.update_assigned( s_assigned + assigned ) }, Self::Shipped { item, from, outgoing } if s_item == item && s_loc == from => { state.update_qty( s_qty.checked_sub(*outgoing).unwrap_or(0) ) }, Self::AssignShipped { item, from, outgoing, assigned } if s_item == item && s_loc == from => { state.update( s_qty.checked_sub(*outgoing).unwrap_or(0), s_assigned.checked_sub(*assigned).unwrap_or(0), ) }, Self::Arrived { item, to, incoming } if s_item == item && s_loc == to => { state.update_qty( s_qty + incoming ) }, _ => state, } }, } } } #[derive(Debug, Default, Clone, PartialEq)] pub struct StockMoveInfo { item: ItemCode, qty: Quantity, from: LocationCode, to: LocationCode, } // 在庫移動の型(状態)定義 #[allow(dead_code)] #[derive(Debug, Clone, PartialEq)] pub enum StockMove { Nothing, Draft { info: StockMoveInfo }, Completed { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity }, Cancelled { info: StockMoveInfo }, Assigned { info: StockMoveInfo, assigned: Quantity }, Shipped { info: StockMoveInfo, outgoing: Quantity }, Arrived { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity }, AssignFailed { info: StockMoveInfo }, ShipmentFailed { info: StockMoveInfo }, } type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity>; type MoveResult = Option<(StockMove, MoveEvent)>; // 在庫移動に関する処理 #[allow(dead_code)] impl StockMove { // 初期状態の取得 pub fn initial_state() -> Self { Self::Nothing } // 開始 pub fn start(&self, item: ItemCode, qty: Quantity, from: LocationCode, to: LocationCode) -> MoveResult { if qty < 1 { return None } let event = StockMoveEvent::Started { item: item.clone(), qty: qty, from: from.clone(), to: to.clone() }; self.apply_event(event) } // 引当 pub fn assign(&self, stock: &Stock) -> MoveResult { if let Some(info) = self.info() { if stock.eq_id(&info.item, &info.from) { let assigned = if stock.is_sufficient(info.qty) { info.qty } else { 0 }; return self.apply_event( StockMoveEvent::Assigned { item: info.item.clone(), from: info.from.clone(), assigned, } ) } } None } // 出庫 pub fn ship(&self, outgoing: Quantity) -> MoveResult { let ev = match self { Self::Assigned { info, assigned } => { Some(StockMoveEvent::AssignShipped { item: info.item.clone(), from: info.from.clone(), outgoing, assigned: assigned.clone(), }) }, _ => { self.info() .map(|i| StockMoveEvent::Shipped { item: i.item.clone(), from: i.from.clone(), outgoing, } ) }, }; ev.and_then(|e| self.apply_event(e)) } // 入庫 pub fn arrive(&self, incoming: Quantity) -> MoveResult { self.info() .and_then(|i| self.apply_event(StockMoveEvent::Arrived { item: i.item.clone(), to: i.to.clone(), incoming, }) ) } pub fn complete(&self) -> MoveResult { self.apply_event(StockMoveEvent::Completed) } pub fn cancel(&self) -> MoveResult { self.apply_event(StockMoveEvent::Cancelled) } fn info(&self) -> Option<StockMoveInfo> { match self { Self::Draft { info } | Self::Completed { info, .. } | ・・・ Self::Arrived { info, .. } => { Some(info.clone()) }, Self::Nothing => None, } } fn apply_event(&self, event: MoveEvent) -> MoveResult { let new_state = event.apply_to(self.clone()); Some((new_state, event)) .filter(|r| r.0 != *self) } } // 在庫移動に対するイベントの適用 impl Event<StockMove> for MoveEvent { type Output = StockMove; fn apply_to(&self, state: StockMove) -> Self::Output { match self { Self::Started { item, qty, from, to } => { if state == StockMove::Nothing { StockMove::Draft { info: StockMoveInfo { item: item.clone(), qty: qty.clone(), from: from.clone(), to: to.clone(), } } } else { state } }, Self::Completed => { if let StockMove::Arrived { info, outgoing, incoming } = state { StockMove::Completed { info: info.clone(), outgoing, incoming } } else { state } }, Self::Cancelled => { if let StockMove::Draft { info } = state { StockMove::Cancelled { info: info.clone() } } else { state } }, Self::Assigned { item, from, assigned } => { match state { StockMove::Draft { info } if info.item == *item && info.from == *from => { if *assigned > 0 { StockMove::Assigned { info: info.clone(), assigned: assigned.clone(), } } else { StockMove::AssignFailed { info: info.clone() } } }, _ => state, } }, Self::Shipped { item, from, outgoing } => { match state { StockMove::Draft { info } if info.item == *item && info.from == *from => { if *outgoing > 0 { StockMove::Shipped { info: info.clone(), outgoing: outgoing.clone(), } } else { StockMove::ShipmentFailed { info: info.clone() } } }, _ => state, } }, Self::AssignShipped { item, from, outgoing, .. } => { match state { StockMove::Assigned { info, .. } if info.item == *item && info.from == *from => { if *outgoing > 0 { StockMove::Shipped { info: info.clone(), outgoing: outgoing.clone(), } } else { StockMove::ShipmentFailed { info: info.clone() } } }, _ => state, } }, Self::Arrived { item, to, incoming } => { match state { StockMove::Draft { info } if info.item == *item && info.to == *to => { StockMove::Arrived { info: info.clone(), outgoing: 0, incoming: *incoming, } }, StockMove::Shipped { info, outgoing } if info.item == *item && info.to == *to => { StockMove::Arrived { info: info.clone(), outgoing, incoming: *incoming, } }, _ => state, } }, } } } // 在庫や在庫移動の状態復元 impl<S, E> Restore<&E> for S where Self: Clone, E: Event<Self, Output = Self>, { fn restore(self, events: slice::Iter<&E>) -> Self { events.fold(self, |acc, ev| ev.apply_to(acc.clone())) } }
TypeScript による実装
実装の仕方が多少違っていますが、Rust 版の処理内容と概ね同じ(にしたつもり)です。
models/stockmove.ts
import { StockMoveEvent, StockMoveEventShipped, StockMoveEventAssignShipped } from './events' export type ItemCode = string export type LocationCode = string export type Quantity = number export type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity> type ShippedMoveEvent = StockMoveEventShipped<ItemCode, LocationCode, Quantity> type AssignShippedMoveEvent = StockMoveEventAssignShipped<ItemCode, LocationCode, Quantity> interface StockUnmanaged { tag: 'stock.unmanaged' item: ItemCode location: LocationCode } interface StockManaged { tag: 'stock.managed' item: ItemCode location: LocationCode qty: Quantity assigned: Quantity } // 在庫の型定義 export type Stock = StockUnmanaged | StockManaged // 在庫に関する処理 export class StockAction { static newUnmanaged(item: ItemCode, location: LocationCode): Stock { return { tag: 'stock.unmanaged', item, location } } static newManaged(item: ItemCode, location: LocationCode): Stock { return { tag: 'stock.managed', item, location, qty: 0, assigned: 0 } } // 在庫数のチェック static isSufficient(stock: Stock, qty: Quantity): boolean { switch (stock.tag) { case 'stock.unmanaged': return true case 'stock.managed': return qty + Math.max(0, stock.assigned) <= Math.max(0, stock.qty) } } } // 在庫の復元処理 export class StockRestore { static restore(state: Stock, events: MoveEvent[]): Stock { return events.reduce(StockRestore.applyTo, state) } // 在庫に対するイベントの適用 private static applyTo(state: Stock, event: MoveEvent): Stock { if (state.tag == 'stock.managed') { switch (event.tag) { case 'stock-move-event.assigned': if (state.item == event.item && state.location == event.from) { return StockRestore.updateAssigned( state, state.assigned + event.assigned ) } break case 'stock-move-event.assign-shipped': if (state.item == event.item && state.location == event.from) { return StockRestore.updateStock( state, state.qty - event.outgoing, state.assigned - event.assigned ) } break ・・・ } } return state } private static updateStock(stock: Stock, qty: Quantity, assigned: Quantity): Stock { switch (stock.tag) { case 'stock.unmanaged': return stock case 'stock.managed': return { tag: stock.tag, item: stock.item, location: stock.location, qty, assigned } } } ・・・ } interface StockMoveInfo { item: ItemCode qty: Quantity from: LocationCode to: LocationCode } interface StockMoveNothing { tag: 'stock-move.nothing' } interface StockMoveDraft { tag: 'stock-move.draft' info: StockMoveInfo } ・・・ // 在庫移動の型(状態)定義 export type StockMove = StockMoveNothing | StockMoveDraft | StockMoveCompleted | StockMoveCancelled | StockMoveAssigned | StockMoveShipped | StockMoveArrived | StockMoveAssignFailed | StockMoveShipmentFailed export type StockMoveResult = [StockMove, MoveEvent] | undefined // 在庫移動に関する処理 export class StockMoveAction { // 初期状態を取得 static initialState(): StockMove { return { tag: 'stock-move.nothing' } } // 開始 static start(state: StockMove, item: ItemCode, qty: Quantity, from: LocationCode, to: LocationCode): StockMoveResult { if (qty < 1) { return undefined } const event: MoveEvent = { tag: 'stock-move-event.started', item, qty, from, to } return StockMoveAction.applyTo(state, event) } // 引当 static assign(state: StockMove, stock: Stock): StockMoveResult { const info = StockMoveAction.info(state) if (info && info.item == stock.item && info.from == stock.location) { const assigned = (stock && StockAction.isSufficient(stock, info.qty)) ? info.qty : 0 const event: MoveEvent = { tag: 'stock-move-event.assigned', item: info.item, from: info.from, assigned } return StockMoveAction.applyTo(state, event) } return undefined } // 出庫 static ship(state: StockMove, outgoing: Quantity): StockMoveResult { if (outgoing < 0) { return undefined } const event = StockMoveAction.toShippedEvent(state, outgoing) return event ? StockMoveAction.applyTo(state, event) : undefined } // 入庫 static arrive(state: StockMove, incoming: Quantity): StockMoveResult { if (incoming < 0) { return undefined } const info = StockMoveAction.info(state) if (info) { const event: MoveEvent = { tag: 'stock-move-event.arrived', item: info.item, to: info.to, incoming } return StockMoveAction.applyTo(state, event) } return undefined } ・・・ static info(state: StockMove) { if (state.tag != 'stock-move.nothing') { return state.info } return undefined } private static applyTo(state: StockMove, event: MoveEvent): StockMoveResult { const nextState = StockMoveRestore.restore(state, [event]) return (nextState != state) ? [nextState, event] : undefined } private static toShippedEvent(state: StockMove, outgoing: number): MoveEvent | undefined { const info = StockMoveAction.info(state) if (info) { if (state.tag == 'stock-move.assigned') { return { tag: 'stock-move-event.assign-shipped', item: info.item, from: info.from, assigned: state.assigned, outgoing } } else { return { tag: 'stock-move-event.shipped', item: info.item, from: info.from, outgoing } } } return undefined } } // 在庫移動の復元処理 export class StockMoveRestore { static restore(state: StockMove, events: MoveEvent[]): StockMove { return events.reduce(StockMoveRestore.applyTo, state) } // 在庫移動に対するイベントの適用 private static applyTo(state: StockMove, event: MoveEvent): StockMove { switch (state.tag) { case 'stock-move.nothing': if (event.tag == 'stock-move-event.started') { return { tag: 'stock-move.draft', info: { item: event.item, qty: event.qty, from: event.from, to: event.to } } } break case 'stock-move.draft': return StockMoveRestore.applyEventToDraft(state, event) case 'stock-move.assigned': if (event.tag == 'stock-move-event.assign-shipped') { return StockMoveRestore.applyShipped(state, event) } break case 'stock-move.shipped': if (event.tag == 'stock-move-event.arrived' && state.info.item == event.item && state.info.to == event.to) { return { tag: 'stock-move.arrived', info: state.info, outgoing: state.outgoing, incoming: event.incoming } } break case 'stock-move.arrived': if (event.tag == 'stock-move-event.completed') { return { tag: 'stock-move.completed', info: state.info, outgoing: state.outgoing, incoming: state.incoming } } break case 'stock-move.completed': case 'stock-move.cancelled': case 'stock-move.assign-failed': case 'stock-move.shipment-failed': break } return state } private static applyShipped(state: StockMoveDraft | StockMoveAssigned, event: ShippedMoveEvent | AssignShippedMoveEvent): StockMove { if (state.info.item == event.item && state.info.from == event.from) { if (event.outgoing > 0) { return { tag: 'stock-move.shipped', info: state.info, outgoing: event.outgoing } } else { return { tag: 'stock-move.shipment-failed', info: state.info } } } return state } private static applyEventToDraft(state: StockMoveDraft, event: MoveEvent): StockMove { switch (event.tag) { case 'stock-move-event.cancelled': return { tag: 'stock-move.cancelled', info: state.info } case 'stock-move-event.assigned': if (state.info.item == event.item && state.info.from == event.from) { if (event.assigned > 0) { return { tag: 'stock-move.assigned', info: state.info, assigned: event.assigned } } else { return { tag: 'stock-move.assign-failed', info: state.info } } } break case 'stock-move-event.shipped': return StockMoveRestore.applyShipped(state, event) case 'stock-move-event.arrived': if (state.info.item == event.item && state.info.to == event.to) { return { tag: 'stock-move.arrived', info: state.info, outgoing: 0, incoming: Math.max(event.incoming, 0) } } break } return state } }
3.2 GraphQL 化 + MongoDB へ永続化
ついでに、前述のステートマシン(TypeScript 実装版)を Apollo Server で GraphQL 化し、MongoDB へ永続化するようにしてみました。
index.ts
import { ApolloServer, gql } from 'apollo-server' import { v4 as uuidv4 } from 'uuid' import { MongoClient, Collection } from 'mongodb' import { ItemCode, LocationCode, MoveEvent, StockMoveAction, StockMoveRestore, StockMove, StockMoveResult, StockAction, StockRestore, Stock } from './models' const mongoUrl = 'mongodb://localhost' const dbName = 'stockmoves' const colName = 'events' const stocksColName = 'stocks' type MoveId = string type Revision = number // MongoDB へ保存するイベント内容 interface StoredEvent { move_id: MoveId revision: Revision item: ItemCode from: LocationCode to: LocationCode event: MoveEvent } interface RestoredStockMove { state: StockMove revision: Revision } // MongoDB への永続化処理 class Store { ・・・ async loadStock(item: ItemCode, location: LocationCode): Promise<Stock | undefined> { const id = this.stockId(item, location) const stock = await this.stocksCol.findOne({ _id: id }) if (!stock) { return undefined } const query = { '$and': [ { item }, { '$or': [ { from: location }, { to: location } ]} ] } const events = await this.eventsCol .find(query) .map(r => r.event) .toArray() return StockRestore.restore(stock, events) } async saveStock(stock: Stock): Promise<void> { const id = this.stockId(stock.item, stock.location) const res = await this.stocksCol.updateOne( { _id: id }, { '$setOnInsert': stock }, { upsert: true } ) if (res.upsertedCount == 0) { return Promise.reject('conflict stock') } } async loadMove(moveId: MoveId): Promise<RestoredStockMove | undefined> { const events: StoredEvent[] = await this.eventsCol .find({ move_id: moveId }) .sort({ revision: 1 }) .toArray() const state = StockMoveAction.initialState() const revision = events.reduce((acc, e) => Math.max(acc, e.revision), 0) const res = StockMoveRestore.restore(state, events.map(e => e.event)) return (res == state) ? undefined : { state: res, revision } } async saveEvent(event: StoredEvent): Promise<void> { const res = await this.eventsCol.updateOne( { move_id: event.move_id, revision: event.revision }, { '$setOnInsert': event }, { upsert: true } ) if (res.upsertedCount == 0) { return Promise.reject(`conflict event revision=${event.revision}`) } } private stockId(item: ItemCode, location: LocationCode): string { return `${item}/${location}` } } // GraphQL スキーマ定義 const typeDefs = gql(` type StockMoveInfo { item: ID! qty: Int! from: ID! to: ID! } interface StockMove { id: ID! info: StockMoveInfo! } type DraftStockMove implements StockMove { id: ID! info: StockMoveInfo! } type CompletedStockMove implements StockMove { id: ID! info: StockMoveInfo! outgoing: Int! incoming: Int! } ・・・ interface Stock { item: ID! location: ID! } type UnmanagedStock implements Stock { item: ID! location: ID! } type ManagedStock implements Stock { item: ID! location: ID! qty: Int! assigned: Int! } input CreateStockInput { item: ID! location: ID! } input StartMoveInput { item: ID! qty: Int! from: ID! to: ID! } type Query { findStock(item: ID!, location: ID!): Stock findMove(id: ID!): StockMove } type Mutation { createManaged(input: CreateStockInput!): ManagedStock createUnmanaged(input: CreateStockInput!): UnmanagedStock start(input: StartMoveInput!): StockMove assign(id: ID!): StockMove ship(id: ID!, outgoing: Int!): StockMove arrive(id: ID!, incoming: Int!): StockMove complete(id: ID!): StockMove cancel(id: ID!): StockMove } `) const toStockMoveForGql = (id: MoveId, state: StockMove | undefined) => { if (state) { return { id, ...state } } return undefined } type MoveAction = (state: StockMove) => StockMoveResult const doMoveAction = async (store: Store, rs: RestoredStockMove | undefined, id: MoveId, action: MoveAction) => { if (rs) { const res = action(rs.state) if (res) { const [mv, ev] = res const info = StockMoveAction.info(mv) if (info) { const event = { move_id: id, revision: rs.revision + 1, item: info.item, from: info.from, to: info.to, event: ev } await store.saveEvent(event) return toStockMoveForGql(id, mv) } } } return undefined } // GraphQL 処理の実装 const resolvers = { Stock: { __resolveType: (obj, ctx, info) => { if (obj.tag == 'stock.managed') { return 'ManagedStock' } return 'UnmanagedStock' } }, StockMove: { __resolveType: (obj: StockMove, ctx, info) => { switch (obj.tag) { case 'stock-move.draft': return 'DraftStockMove' case 'stock-move.completed': return 'CompletedStockMove' ・・・ case 'stock-move.shipment-failed': return 'ShipmentFailedStockMove' } return undefined } }, Query: { findStock: async (parent, { item, location }, { store }, info) => { return store.loadStock(item, location) }, findMove: async (parent, { id }, { store }, info) => { const res = await store.loadMove(id) return toStockMoveForGql(id, res?.state) } }, Mutation: { createManaged: async (parent, { input: { item, location } }, { store }, info) => { const s = StockAction.newManaged(item, location) await store.saveStock(s) return s }, ・・・ start: async (parent, { input: { item, qty, from, to } }, { store }, info) => { const rs = { state: StockMoveAction.initialState(), revision: 0 } const id = `move-${uuidv4()}` return doMoveAction( store, rs, id, s => StockMoveAction.start(s, item, qty, from, to) ) }, assign: async(parent, { id }, { store }, info) => { const rs = await store.loadMove(id) if (rs) { const info = StockMoveAction.info(rs.state) if (info) { const stock = await store.loadStock(info.item, info.from) return doMoveAction( store, rs, id, s => StockMoveAction.assign(s, stock) ) } } return undefined }, ship: async(parent, { id, outgoing }, { store }, info) => { const rs = await store.loadMove(id) return doMoveAction( store, rs, id, s => StockMoveAction.ship(s, outgoing) ) }, ・・・ } } const run = async () => { const mongo = await MongoClient.connect(mongoUrl, { useUnifiedTopology: true }) const eventsCol = mongo.db(dbName).collection(colName) const stocksCol = mongo.db(dbName).collection(stocksColName) const store = new Store(eventsCol, stocksCol) const server = new ApolloServer({ typeDefs, resolvers, context: { store } }) const res = await server.listen() console.log(res.url) } run().catch(err => console.error(err))
クライアント実装例
以下のように GraphQL クエリを送信する事で操作できます。
client/create_stock.ts (在庫の作成)
import { request, gql } from 'graphql-request' const endpoint = 'http://localhost:4000' const item = process.argv[2] const location = process.argv[3] const q1 = gql` mutation CreateUnmanaged($item: ID!, $location: ID!) { createUnmanaged(input: { item: $item, location: $location }) { __typename item location } } ` const q2 = gql` mutation CreateManaged($item: ID!, $location: ID!) { createManaged(input: { item: $item, location: $location }) { __typename item location } } ` const query = process.argv.length > 4 ? q1 : q2 request(endpoint, query, { item, location }) .then(r => console.log(r)) .catch(err => console.error(err))
create_stock.ts 実行例
> ts-node create_stock.ts item-1 store-A { createManaged: { __typename: 'ManagedStock', item: 'item-1', location: 'store-A' } }
client/start_move.ts (在庫移動の開始)
・・・ const item = process.argv[2] const qty = parseInt(process.argv[3]) const from = process.argv[4] const to = process.argv[5] const query = gql` mutation { start(input: { item: "${item}", qty: ${qty}, from: "${from}", to: "${to}" }) { __typename id info { item qty from to } } } ` request(endpoint, query) .then(r => console.log(r)) .catch(err => console.error(err))
start_move.ts 実行例
> ts-node start_move.ts item-1 5 store-A store-B { start: { __typename: 'DraftStockMove', id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3', info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' } } }
client/find_move.ts (在庫移動の取得)
・・・ const id = process.argv[2] const query = gql` { findMove(id: "${id}") { __typename id info { item qty from to } ... on AssignedStockMove { assigned } ... on ShippedStockMove { outgoing } ... on ArrivedStockMove { outgoing incoming } ... on CompletedStockMove { outgoing incoming } } } ` request(endpoint, query) .then(r => console.log(r)) .catch(err => console.error(err))
find_move.ts 実行例
> ts-node find_move.ts move-cfa1fc9c-b599-4854-8385-207cbb77e8a3 { findMove: { __typename: 'CompletedStockMove', id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3', info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' }, outgoing: 5, incoming: 5 } }