CDK と LocalStack でローカルに Lambda と DynamoDB の実行環境を構築

AWS CDK (Cloud Development Kit) を使って、ローカル環境の LocalStack に Lambda 関数と DynamoDB のテーブルを構築してみました。

下記のようなツールを使用し、CDK によるスタックと Lambda 関数ハンドラーは TypeScript で実装しました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210524/

1. はじめに

今回は cdk init を使わずに CDK のコードを自前で構築する事にします。

まず、TypeScript で実装するために下記モジュールを

  • typescript
  • ts-node
  • @types/node

CDK を使って Lambda と DynamoDB を定義するために下記モジュールを

TypeScript のビルド用に下記モジュールを

  • esbuild

Lambda から DynamoDB へ接続するために下記モジュールを

そして、aws-cdk を使って LocalStack に対してデプロイ等を実施するために下記モジュールを

  • aws-cdk-local

これらをインストールして package.json は以下のようになりました。

package.json
{
  "name": "cdk_localstack_sample",
  "version": "1.0.0",
  "description": "",
  "devDependencies": {
    "@types/node": "^14.17.0",
    "aws-cdk": "^1.105.0",
    "aws-cdk-local": "^1.65.4",
    "esbuild": "^0.12.1",
    "ts-node": "^9.1.1",
    "typescript": "^4.2.4"
  },
  "dependencies": {
    "@aws-cdk/aws-dynamodb": "^1.105.0",
    "@aws-cdk/aws-lambda": "^1.105.0",
    "@aws-cdk/aws-lambda-nodejs": "^1.105.0",
    "@aws-sdk/client-dynamodb": "^3.16.0"
  }
}

CDK 用の設定ファイル cdk.json はシンプルに以下のようにしました。

cdk.json
{
    "app": "npx ts-node --prefer-ts-exts app.ts"
}

2. 実装

DynamoDB のテーブルと Lambda 関数を構築するだけの簡単なスタックを定義しました。

app.ts (スタック定義)
import { App, Construct, Stack, StackProps, CfnOutput } from '@aws-cdk/core'
import * as dynamodb from '@aws-cdk/aws-dynamodb'
import * as lambda from '@aws-cdk/aws-lambda'
import { NodejsFunction } from '@aws-cdk/aws-lambda-nodejs'

class SampleStack extends Stack {
    constructor(scope: Construct, id: string, props?: StackProps) {
        super(scope, id, props)

        const table = new dynamodb.Table(this, 'Items', {
            partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING}
        })

        const func = new NodejsFunction(this, 'SampleFunc', {
            runtime: lambda.Runtime.NODEJS_14_X,
            entry: './src/handler.ts',
            environment: {
                'TABLE_NAME': table.tableName
            }
        })

        // テーブルへの書き込み権限を Lambda に付与
        table.grantWriteData(func)

        // Lambda 関数名の出力
        new CfnOutput(this, 'functionName', {
            value: func.functionName
        })
    }
}

const app = new App()

new SampleStack(app, 'SampleStack')

続いて Lambda 関数ハンドラーです。

LocalStack で実行している場合に、接続先の DynamoDB を LocalStack のものに切り替える必要があります。

また、LocalStack では Node.js ランタイムの Lambda 関数ハンドラーを docker で実行するようになっており、その際に LocalStack のホスト名は LOCALSTACK_HOSTNAME 環境変数で、ポート番号は EDGE_PORT 環境変数でそれぞれ渡されるようになっていました。

そこで、今回はこれらの環境変数の値を利用して DynamoDB の接続先を変えるようにしてみました。

src/handler.ts(Lambda 関数ハンドラー)
import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb'

const tableName = process.env.TABLE_NAME

const config = {}

if (process.env.LOCALSTACK_HOSTNAME) {
    // LocalStack の DynamoDB へ接続するための設定
    config['endpoint'] = `http://${process.env.LOCALSTACK_HOSTNAME}:${process.env.EDGE_PORT}`
}

const client = new DynamoDBClient(config)

export interface Input {
    id: string
}

export const handler = async (event: Input) => {
    const res = await client.send(new PutItemCommand({
        TableName: tableName,
        Item: {
            id: { S: event.id }
        }
    }))

    console.log(`dynamodb put-item: ${JSON.stringify(res)}`)

    return {
        statusCode: 201,
        body: {
            id: event.id
        }
    }
}

3. デプロイと動作確認

それでは、LocalStack を実行してデプロイを実施してみます。

LocalStack 実行

docker コマンドを使って LocalStack を実行します。

Node.js ランタイムの Lambda 関数を実行するために LAMBDA_EXECUTOR 環境変数の値を docker にして、/var/run/docker.sockマッピングしています。

docker で LocalStack 実行
$ docker run --rm -it -p 4566:4566 -e LAMBDA_EXECUTOR=docker -v /var/run/docker.sock:/var/run/docker.sock localstack/localstack

デプロイ

aws-cdk-local モジュールには、cdklocal コマンドという LocalStack に対してデプロイ等を実施する cdk コマンドのラッパーが用意されているので、このコマンドを使います。

cdk コマンドと同様に初回時は bootstrap を実行します。

ブートストラップ処理
$ npx cdklocal bootstrap

 ⏳  Bootstrapping environment aws://000000000000/ap-northeast-1...
CDKToolkit: creating CloudFormation changeset...
7:38:43 AM | UPDATE_IN_PROGRESS   | AWS::CloudFormation::Stack | UsePublicAccessBlockConfiguration

 ✅  Environment aws://000000000000/ap-northeast-1 bootstrapped.

続いてデプロイを実施します。

デプロイ処理
$ npx cdklocal deploy --require-approval never

Bundling asset SampleStack/SampleFunc/Code/Stage...
  ・・・
SampleStack: deploying...
・・・
SampleStack: creating CloudFormation changeset...
・・・


 ✅  SampleStack

Outputs:
SampleStack.functionName = SampleStack-lambda-c75c6ee1

Stack ARN:
arn:aws:cloudformation:us-east-1:000000000000:stack/SampleStack/c83384b9

これで DynamoDB のテーブルと Lambda 関数が作られ、Lambda の関数名は SampleStack-lambda-c75c6ee1 となりました。

動作確認

ここからは AWS CLI v2 を使って動作確認してみます。

LocalStack へ接続するには --endpoint-urlhttp://localhost:4566 を設定して aws コマンドを使います。

それでは、lambda invoke を使って SampleStack-lambda-c75c6ee1 を実行してみます。

Lambda の実行
$ aws --endpoint-url=http://localhost:4566 lambda invoke --function-name SampleStack-lambda-c75c6ee1  --payload '{"id":"id1"}' --cli-binary-format raw-in-base64-out output.json

{
    "StatusCode": 200,
    "LogResult": "",
    "ExecutedVersion": "$LATEST"
}

output.json の内容は以下のようになり、正常に実行できているようです。

output.json
{"body":{"id":"id1"},"statusCode":201}

次に、この Lambda 関数が出力した CloudWatch のログを確認してみます。 こちらも特に問題は無さそうです。

CloudWatch のログ確認
$ aws --endpoint-url=http://localhost:4566 logs tail "/aws/lambda/SampleStack-lambda-c75c6ee1"

2021-05-23T22:42:30.401000+00:00 2021/05/23/[LATEST]24101d6b START RequestId: ced21627-9622-15e9-45cd-e1c1c93c39a9 Version: $LATEST
2021-05-23T22:42:30.419000+00:00 2021/05/23/[LATEST]24101d6b 
2021-05-23T22:42:30.437000+00:00 2021/05/23/[LATEST]24101d6b 2021-05-23T22:42:28.141Z   ced21627-9622-15e9-45cd-e1c1c93c39a9    INFO    dynamodb put-item: {"$metadata":{"httpStatusCode":200,"requestId":"0f5854a3-1110-42e7-bd95-3ff8bd8bb003","attempts":1,"totalRetryDelay":0},"ConsumedCapacity":{"CapacityUnits":1,"TableName":"SampleStack-Items5C12978B-b5cec818"}}
2021-05-23T22:42:30.473000+00:00 2021/05/23/[LATEST]24101d6b END RequestId: ced21627-9622-15e9-45cd-e1c1c93c39a9
・・・

最後に、SampleStack-Items5C12978B-b5cec818 テーブルの内容も出力してみました。

DynamoDB テーブルの検索結果
$ aws --endpoint-url=http://localhost:4566 dynamodb scan --table-name SampleStack-Items5C12978B-b5cec818

{
    "Items": [
        {
            "id": {
                "S": "id1"
            }
        }
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": null
}

CDK で作った CloudFormation テンプレートをプログラム内からデプロイする

AWS CDK (Cloud Development Kit) では通常 cdk deploy コマンドを使ってデプロイ処理を実施します。

これを cdk コマンドを使わずにプログラム内から実施できないか、以下の 2通りで試してみました。

  • (a) AWS CDK の API を利用
  • (b) AWS SDK の CloudFormation API を利用

なお、実際のところ (a) の場合でも、内部的には CloudFormation の API が呼び出されています。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210418/

はじめに

今回は DynamoDB のテーブルを作るだけの単純なスタック(以下)を使用します。

a_sample/stack.ts
import { Construct, Stack, StackProps, CfnOutput } from '@aws-cdk/core'
import * as dynamodb from '@aws-cdk/aws-dynamodb'

export class SampleStack extends Stack {
    constructor(scope: Construct, id: string, props?: StackProps) {
        super(scope, id, props)

        const table = new dynamodb.Table(this, 'Items', {
            partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING }
        })

        new CfnOutput(this, 'TableName', {
            value: table.tableName
        })
    }
}

このスタックの CloudFormation テンプレートは、以下のような処理で文字列として取得できます。

a_sample/sample.ts
import { App } from '@aws-cdk/core'
import { SampleStack } from './stack'

const app = new App()
new SampleStack(app, 'SampleStack')
// CloudFormation テンプレートの文字列化
const tpl = JSON.stringify(app.synth().stacks[0].template)

console.log(tpl)
実行結果例
> cd a_sample

> npx ts-node sample.ts
{"Resources":{"Items5C12978B":{"Type":"AWS::DynamoDB::Table","Properties":{"KeySchema":[{"AttributeName":"id","KeyType":"HASH"}],"AttributeDefinitions":[{"AttributeName":"id","AttributeType":"S"}],"ProvisionedThroughput":{"ReadCapacityUnits":5,"WriteCapacityUnits":5}},"UpdateReplacePolicy":"Retain","DeletionPolicy":"Retain"}},"Outputs":{"TableName":{"Value":{"Ref":"Items5C12978B"}}}}

(a) AWS CDK の API を利用

cdk deploy コマンド処理内で使われている CloudFormationDeployments を直接使うのが簡単そうだったので、今回はこれを使います。

デフォルトでは cdk deploy 時と同じように進捗状況が出力されますが、これは quiettrue にする事で抑制できました。

a_sample/deploy_a.ts
import { App } from '@aws-cdk/core'
import { SdkProvider } from 'aws-cdk/lib/api/aws-auth'
import { CloudFormationDeployments } from 'aws-cdk/lib/api/cloudformation-deployments'

import { SampleStack } from './stack'

const app = new App()
new SampleStack(app, 'Sample1Stack')

const run = async () => {
    const deployer = new CloudFormationDeployments({
        sdkProvider: await SdkProvider.withAwsCliCompatibleDefaults()
    })
    // デプロイ処理
    const r = await deployer.deployStack({
        stack: app.synth().stacks[0],
        quiet: true
    })

    console.log(r)
}

run().catch(err => console.error(err))
パッケージインストール例
> cd a_sample
> npm install -D ts-node typescript @types/node
・・・
> npm install @aws-cdk/aws-dynamodb aws-cdk
・・・

実行結果は以下の通りです。 CloudFormation のスタックが登録され、DynamoDB のテーブルが問題なく作成されました。

実行結果
> npx ts-node deploy_a.ts

Sample1Stack: creating CloudFormation changeset...
{
  noOp: false,
  outputs: { TableName: 'Sample1Stack-Items・・・' },
  ・・・
  stackArtifact: CloudFormationStackArtifact {
    assembly: CloudAssembly {
      ・・・
    },
    id: 'Sample1Stack',
    manifest: {
      type: 'aws:cloudformation:stack',
      environment: 'aws://unknown-account/unknown-region',
      properties: [Object],
      metadata: [Object]
    },
    messages: [],
    _dependencyIDs: [],
    environment: {
      account: 'unknown-account',
      region: 'unknown-region',
      name: 'aws://unknown-account/unknown-region'
    },
    templateFile: 'Sample1Stack.template.json',
    parameters: {},
    tags: {},
    assumeRoleArn: undefined,
    cloudFormationExecutionRoleArn: undefined,
    stackTemplateAssetObjectUrl: undefined,
    requiresBootstrapStackVersion: undefined,
    bootstrapStackVersionSsmParameter: undefined,
    terminationProtection: undefined,
    stackName: 'Sample1Stack',
    assets: [],
    displayName: 'Sample1Stack',
    name: 'Sample1Stack',
    originalName: 'Sample1Stack',
    _deps: [],
    _template: { Resources: [Object], Outputs: [Object] }
  }
}

(b) AWS SDK の CloudFormation API を利用

今回は、AWS SDK for JavaScript v3 を使います。

実質的に AWS CLIcloudformation deploy コマンドと同じ事をするだけですが、AWS SDK にはこれに相当する API が用意されていないようなので、下記ソースコードを参考に自作してみました。

なお、CDK の下記ソースコードでも同じような処理になっていました。

簡単にまとめると、以下のような処理を実装する事になります。

  • (1) DescribeStacks で Stack の有無を確認して、ChangeSetType(CREATEUPDATE)を決定
  • (2) CreateChangeSet で変更セットを作成
  • (3) 処理が完了するまで DescribeChangeSet をポーリング
  • (4) ExecuteChangeSet で変更セットを実行
  • (5) 処理が完了するまで DescribeStacks をポーリング

(1) の DescribeStacks では該当するスタックが存在しない場合にもエラーとなってしまうため、他のエラーと区別するためにエラーメッセージが指定の文字列に合致するかどうかで判定しています。(AWS CLI や CDK と同様)

(3) では変更セットのステータスが CREATE_PENDINGCREATE_IN_PROGRESS 以外になるまでポーリングし、(5) ではスタックのステータスが xxx_IN_PROGRESS 以外になるまでポーリングすれば良さそうです。

また、(3) におけるポーリングの間隔は AWS CLI と CDK 共に 5秒毎となっていましたが、(5) の方は AWS CLI が 30秒で CDK は 5秒となっているようでした。

b_sample/deployer.ts(CloudFormation の API を使ったデプロイ処理の実装)
import { CloudFormationStackArtifact } from '@aws-cdk/cx-api'

import { 
    CloudFormationClient, 
    CreateChangeSetCommand, ExecuteChangeSetCommand, 
    DescribeChangeSetCommand, DescribeStacksCommand,
    Change, Stack
} from '@aws-sdk/client-cloudformation'

const MAX_RETRY = 100
const WAIT_TIME_CREATE = 5000
const WAIT_TIME_EXECUTE = 10000

const sleep = (ms: number) =>
    new Promise(resolve => setTimeout(resolve, ms))

const cf = new CloudFormationClient({})

type ChangeSetType = 'CREATE' | 'UPDATE'

export class CfDeployer {
    static async deploy(stackArtifact: CloudFormationStackArtifact): Promise<Stack | undefined> {
        const stackName = stackArtifact.stackName

        // (1)
        const changeSetType = await CfDeployer.selectChangeSetType(stackName)
        // (2)
        const r1 = await cf.send(new CreateChangeSetCommand({
            ChangeSetName: `ChangeSet-${Date.now()}`,
            StackName: stackName,
            TemplateBody: JSON.stringify(stackArtifact.template),
            ChangeSetType: changeSetType
        }))

        console.log(r1)

        const changeSetId = r1.Id

        if (!changeSetId) {
            throw new Error(`failed create ChangeSet: StackName=${stackName}`)
        }

        // (3)
        const cs = await CfDeployer.waitForCreate(changeSetId, stackName)

        if (cs.length < 1) {
            console.log('*** NO CHANGE')
            return CfDeployer.getStack(stackName)
        }
        // (4)
        const r2 = await cf.send(new ExecuteChangeSetCommand({
            ChangeSetName: changeSetId,
            StackName: stackName
        }))

        console.log(r2)
        // (5)
        return CfDeployer.waitForExecute(stackName)
    }

    static async getStackStatus(stackName: string): Promise<string> {
        const stack = await CfDeployer.getStack(stackName)
        return stack?.StackStatus ?? 'NONE'
    }

    static async getStack(stackName: string): Promise<Stack | undefined> {
        try {
            const r = await cf.send(new DescribeStacksCommand({
                StackName: stackName
            }))

            return r.Stacks?.[0]
        } catch(e) {
            // スタックが存在しなかった場合のエラー以外はそのまま throw
            if (e.Code !== 'ValidationError' || 
                e.message !== `Stack with id ${stackName} does not exist`) {
                throw e
            }
        }

        return undefined
    }
    // (3)
    private static async waitForCreate(changeSetId: string, stackName: string): Promise<Change[]> {

        for (let i = 0; i < MAX_RETRY; i++) {
            await sleep(WAIT_TIME_CREATE)

            const r = await cf.send(new DescribeChangeSetCommand({
                ChangeSetName: changeSetId,
                StackName: stackName
            }))

            if (r.Status === 'CREATE_PENDING' || 
                r.Status === 'CREATE_IN_PROGRESS') {

                continue
            }

            if (r.Status == 'CREATE_COMPLETE') {
                return r.Changes ?? []
            }

            if (r.Status == 'FAILED') {
                console.log(`*** failed: ${JSON.stringify(r)}`)
                return []
            }

            throw new Error(`failed create ChangeSet: ChangeSetId=${changeSetId}, StackName=${stackName}`)
        }

        throw new Error(`create ChangeSet timeout: ChangeSetId=${changeSetId}, StackName=${stackName}`)
    }
    // (5)
    private static async waitForExecute(stackName: string): Promise<Stack> {
        for (let i = 0; i < MAX_RETRY; i++) {
            await sleep(WAIT_TIME_EXECUTE)

            const stack = await CfDeployer.getStack(stackName)

            if (!stack) {
                throw new Error('not found')
            }

            if (!stack.StackStatus?.endsWith('_IN_PROGRESS')) {
                return stack
            }
        }

        throw new Error(`execute ChangeSet timeout: StackName=${stackName}`)
    }

    private static async selectChangeSetType(stackName: string): Promise<ChangeSetType> {

        const status = await CfDeployer.getStackStatus(stackName)

        return (status == 'NONE' || status == 'REVIEW_IN_PROGRESS') ? 
            'CREATE' : 'UPDATE'
    }
}

上記を使ってスタックをデプロイする処理は以下のようになります。

b_sample/deploy_b.ts
import { App } from '@aws-cdk/core'

import { CfDeployer } from './deployer'
import { SampleStack } from './stack'

const app = new App()
new SampleStack(app, 'Sample2Stack')

CfDeployer.deploy(app.synth().stacks[0])
    .then(r => console.log(r))
    .catch(err => console.error(err))
パッケージインストール例
> cd b_sample
> npm install -D ts-node typescript @types/node
・・・
> npm install @aws-cdk/aws-dynamodb @aws-sdk/client-cloudformation
・・・

実行結果は以下の通りです。 こちらも CloudFormation のスタックが登録され、DynamoDB のテーブルが問題なく作成されました。

実行結果
> npx ts-node deploy_b.ts

{
  '$metadata': {
    httpStatusCode: 200,
    requestId: '・・・',
    extendedRequestId: undefined,
    cfId: undefined,
    attempts: 1,
    totalRetryDelay: 0
  },
  Id: 'arn:aws:cloudformation:・・・:changeSet/ChangeSet-・・・',
  StackId: 'arn:aws:cloudformation:・・・:stack/Sample2Stack/・・・'
}
{
  '$metadata': {
    httpStatusCode: 200,
    requestId: '・・・',
    extendedRequestId: undefined,
    cfId: undefined,
    attempts: 1,
    totalRetryDelay: 0
  }
}
{
  StackId: 'arn:aws:cloudformation:・・・',
  StackName: 'Sample2Stack',
  ChangeSetId: 'arn:aws:cloudformation:・・・',
  Description: undefined,
  Parameters: undefined,
  ・・・
  StackStatus: 'CREATE_COMPLETE',
  StackStatusReason: undefined,
  DisableRollback: false,
  NotificationARNs: [],
  TimeoutInMinutes: undefined,
  Capabilities: undefined,
  Outputs: [
    {
      OutputKey: 'TableName',
      OutputValue: 'Sample2Stack-Items・・・',
      Description: undefined,
      ExportName: undefined
    }
  ],
  RoleARN: undefined,
  Tags: [],
  EnableTerminationProtection: false,
  ParentId: undefined,
  RootId: undefined,
  DriftInformation: { StackDriftStatus: 'NOT_CHECKED', LastCheckTimestamp: undefined }
}

Amplify AppSync Simulator を直接使ってマッピングテンプレートを検証

Amplify AppSync Simulator は、AWS Amplify CLI に含まれているモジュールで、AppSync をローカル環境で動作確認するためのものです。(AppSync の GraphQL を処理する Web サーバーが起動するようになっている)

ソースコードを見てみたところ、AppSync 用の処理を適用した GraphQL.jsGraphQLSchema を作り、これを graphql 関数で実行するようになっていました。

そこで試しに、Amplify AppSync Simulator の API を直接使う事で、Amplify CLI を使わず Web サーバーも起動せずに、リクエスマッピングテンプレートの処理結果を出力する Node.js スクリプトを作ってみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210315/

準備

amplify-appsync-simulator モジュールをインストールしておきます。

インストール例
> npm install amplify-appsync-simulator

Lambda 用マッピングテンプレートの処理

今回は、Lambda 用のマッピングテンプレートを処理してみます。

AppSync では下記を設定する事で GraphQL の処理を定義するようになっています。

  • (1) GraphQL スキーマから GraphQL API を作成
  • (2) データソース(実際の処理を行う部分)を追加
  • (3) リゾルバー ※ を作成
 ※ リゾルバーは、GraphQL API とデータソースとの紐づけ、
    入出力データの加工をマッピングテンプレートとして設定するようになっています

マッピングテンプレートは、VTL(Apache Velocity Template Language)の形式でリクエストやレスポンスの内容を加工して JSON 文字列を作ります。

Amplify AppSync Simulator では AmplifyAppSyncSimulator クラスの init メソッドに対して、上記 (1) ~ (3) と defaultAuthenticationType を設定した AmplifyAppSyncSimulatorConfig を渡す事で、AppSync 用の処理を適用した GraphQLSchema を内部的に作るようになっており、それを schema で取得する事が可能です。

defaultAuthenticationType の設定は、今回のように GraphQLSchema を直接利用するケースでは使われませんが、設定自体は必須のようです。

また、Amplify AppSync Simulator が対応しているデータソースは今のところ下記 3タイプのようで、タイプ毎に用意されている DataLoader が処理を担うようになっています。

各 DataLoader の load メソッドの第一引数にリクエスマッピングテンプレートの処理結果がそのまま渡ってくるようになっています。

NONE データソースの利用

NONE タイプ用 NoneDataLoader の load メソッドは、単純に payload の値を処理結果として返すような実装になっています。

そこで、この load メソッドを上書きする事でリクエスマッピングテンプレートの結果を出力するようにしてみました。

なお、AmplifyAppSyncSimulator が作成した GraphQLSchema を graphql 関数で処理する際のコンテキストに requestAuthorizationModeappsyncErrors の項目が最低限必要でした。

defaultAuthenticationType は必須のようなので API_KEY を設定していますが、実際には使われないので API_KEY の値を設定したりする必要はありませんでした。

sample1.js
const { AmplifyAppSyncSimulator } = require('amplify-appsync-simulator')
const { graphql } = require('graphql')
// GraphQL スキーマ定義
const schema = `
    type Item {
        id: ID!
        value: Int!
    }

    input FindInput {
        id: ID!
    }

    type Query {
        find(input: FindInput!): Item
    }
`
// データソースの設定
const dataSources = [
    { type: 'NONE', name: 'ItemFunc' }
]
// リゾルバーの設定
const resolvers = [
    {
        kind: 'UNIT', 
        // GraphQL の Query で定義した find フィールドと ItemFunc データソースとのマッピング
        typeName: 'Query', fieldName: 'find',
        dataSourceName: 'ItemFunc'
        // リクエストマッピングテンプレートの設定(GraphQL クエリの input をそのまま payload に設定)
        requestMappingTemplate: `
            {
                "version": "2018-05-29",
                "operation": "Invoke",
                "payload": $utils.toJson($ctx.args.input)
            }
        `,
        // レスポンスマッピングテンプレートの設定
        responseMappingTemplate: '$utils.toJson($context.result)'
    }
]
// 設定
const config = {
    schema: { content: schema },
    resolvers,
    dataSources,
    appSync: {
        // 下記の設定は必須
        defaultAuthenticationType: {
            authenticationType: 'API_KEY'
        }
    }
}

const simulator = new AmplifyAppSyncSimulator()

simulator.init(config)

// ItemFunc(NoneDataLoader)の load メソッド上書き
simulator.getDataLoader('ItemFunc').load = (req) => {
    console.log(`*** load: ${JSON.stringify(req)}`)

    return { id: req.payload.id, value: 123 }
}
// AmplifyAppSyncSimulator用 GraphQL の実行コンテキスト
const ctx = {
    requestAuthorizationMode: 'API_KEY',
    appsyncErrors: []
}
// GraphQL クエリ
const q = `
    query FindItem($id: ID!) {
        find(input: {id: $id}) {
            id
            value
        }
    }
`

const run = async () => {
    // GraphQL クエリ実行
    const r = await graphql(simulator.schema, q, null, ctx, {id: 'id1'})
    console.log(JSON.stringify(r))
}

run().catch(err => console.error(err))

実行結果は以下の通り。 リクエスマッピングテンプレートの処理結果が出力されています。

実行結果
> node sample1.js
*** load: {"version":"2018-05-29","operation":"Invoke","payload":{"id":"id1"}}
{"data":{"find":{"id":"id1","value":123}}}

AWS_LAMBDA データソースの利用

AWS_LAMBDA タイプの場合は、データソース設定の invoke へ設定した関数が呼び出されるようになっているので ※、これを使って Lambda の関数ハンドラーを直接実行する事もできます。

 ※ リクエストマッピングテンプレート処理結果の
    payload の値が引数として与えられるようになっています
lambda_func.js(関数ハンドラー)
exports.handler = async (event) => {
    console.log(`*** handler: ${JSON.stringify(event)}`)

    return { id: event.id, value: 234 }
}
sample2.js
const { AmplifyAppSyncSimulator } = require('amplify-appsync-simulator')
const { graphql } = require('graphql')

const schema = `
    ・・・
`

const dataSources = [
    // invoke へ lambda_func.js の handler を設定
    { type: 'AWS_LAMBDA', name: 'ItemFunc', invoke: require('./lambda_func.js').handler }
]

const resolvers = [
    ・・・
]

const config = {
    schema: { content: schema },
    resolvers,
    dataSources,
    appSync: {
        defaultAuthenticationType: {
            authenticationType: 'API_KEY'
        }
    }
}

const simulator = new AmplifyAppSyncSimulator()

simulator.init(config)

const ctx = {
    requestAuthorizationMode: 'API_KEY',
    appsyncErrors: []
}

const q = `
    query FindItem($id: ID!) {
        find(input: {id: $id}) {
            id
            value
        }
    }
`

const run = async () => {
    const r = await graphql(simulator.schema, q, null, ctx, {id: 'id2'})
    console.log(JSON.stringify(r))
}

run().catch(err => console.error(err))
実行結果
> node sample2.js
*** handler: {"id":"id2"}
{"data":{"find":{"id":"id2","value":234}}}

AWS Lambda のランタイム API サーバーを自作して関数ハンドラーをローカル実行

AWS Lambda では、(Lambda 関数の)ランタイムがランタイム API(ランタイムインターフェース)から呼び出しイベントを受け取って、関数ハンドラーを実行し、その結果をランタイム API へ返すような流れで処理が実施されているようです。

そこで、自作のランタイム API サーバーを使って、Go と Node.js でそれぞれ実装した AWS Lambda の関数ハンドラーを(Docker 等を使わずに)ローカル実行してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20210211/

ランタイム API サーバーの実装

以下のようなオープンソースのランタイム API サーバーが提供されており、自作する必要は無かったりするのですが。

今回は、fastify を使ってランタイム API サーバーを実装してみました。

AWS Lambda ランタイム API のページから OpenAPI 仕様(2018-06-01)を入手できるので、このスキーマ定義を参考に必要最小限の処理だけを実装します。

まず、ランタイム API として下記を実装する事になります。(ただし、今回の用途では (d) を使いません)

  • (a) 次の呼び出し : GET /runtime/invocation/next
  • (b) 呼び出しレスポンス : POST /runtime/invocation/{AwsRequestId}/response
  • (c) 呼び出しエラー : POST /runtime/invocation/{AwsRequestId}/error
  • (d) 初期化エラー : POST /runtime/init/error

(b) ~ (d) は成功時に 202 ステータスコードを返す事になります。

(b) や (c) における AwsRequestId の値は、(a) のレスポンスヘッダーで通知する事になり、(a) のレスポンスヘッダーで最低限必要な項目は下記でした。(aws-lambda-go の場合)

  • Lambda-Runtime-Aws-Request-Id (AwsRequestId の値)
  • Lambda-Runtime-Deadline-Ms (実行期限)

一方で、ランタイム側は以下のような手順でランタイム API を呼び出すようになっているようです。

  • (1) (a) を GET してレスポンスが返ってくるのを待つ
  • (2) (a) からレスポンス(呼び出しイベント)が返ってくると関数ハンドラーを呼び出す
  • (3) 関数ハンドラーの呼び出し結果を (b) へ POST、エラーが発生した場合は (c) へ POST
  • (4) (1) へ戻る

つまり、(a) で固定の内容をすぐに返すような実装にしてしまうと、(1) ~ (4) が繰り返されてしまい不都合が生じます。

そこで、下記では /invoke を呼び出した際に (a) のレスポンスを返すようにしています。

また、Go と Node.js の関数ハンドラーを同時に扱うために、(a) が呼び出された際の reply を replies へ溜めておいて、forEach で処理するようにしています。

runtime-api-server/server.js (ランタイム API サーバー)
const logger = false
const fastify = require('fastify')({ logger })

const RUNTIME_PATH = '/2018-06-01/runtime'
const TIMEOUT = 5 * 1000

const port = process.env['SERVER_PORT'] ? 
    parseInt(process.env['SERVER_PORT']) : 8080

const replies = []

fastify.post('/invoke', (req, reply) => {
    const data = req.body

    console.log(`*** invoke: ${JSON.stringify(data)}`)

    replies.splice(0).forEach(r => {
        const deadline = Date.now() + TIMEOUT
        // (a) のレスポンスを返す
        r.code(200)
            .header('Lambda-Runtime-Aws-Request-Id', data['request-id'])
            .header('Lambda-Runtime-Deadline-Ms', deadline.toString())
            .header('Lambda-Runtime-Trace-Id', data['trace-id'])
            .header('Lambda-Runtime-Invoked-Function-Arn', data['function-arn'])
            .send(data.body)
    })

    reply.code(200).send({})
})

// Runtime API の実装

// (a) 次の呼び出し
fastify.get(`${RUNTIME_PATH}/invocation/next`, (req, reply) => {
    console.log('*** next')
    console.log(req.body)

    replies.push(reply)
})
// (b) 呼び出しレスポンス
fastify.post(`${RUNTIME_PATH}/invocation/:id/response`, (req, reply) => {
    console.log(`*** response: id = ${req.params['id']}`)
    console.log(req.body)

    reply.code(202).send({ status: '' })
})
// (c) 呼び出しエラー
fastify.post(`${RUNTIME_PATH}/invocation/:id/error`, (req, reply) => {
    console.log(`*** error: id = ${req.params['id']}`)
    console.log(req.body)

    reply.code(202).send({ status: '' })
})
// (d) 初期化エラー
fastify.post(`${RUNTIME_PATH}/init/error`, (req, reply) => {
    console.log('*** init error')
    console.log(req.body)

    reply.code(202).send({ status: '' })
})

fastify.listen(port)
    .then(r => console.log(`started: ${r}`))
    .catch(err => console.error(err))

実行

動作確認のために実行しておきます。

> cd runtime-api-server
> node server.js
started: http://127.0.0.1:8080

Go による関数ハンドラー

Go の場合、aws-lambda-go にランタイム API とやり取りを行うランタイムの処理が実装されています。

そのため、aws-lambda-go を使って実装した関数ハンドラーをローカル環境でビルドして実行するだけです。

接続するランタイム API サーバーのアドレスは AWS_LAMBDA_RUNTIME_API 環境変数で設定します。

ここでは、以下の関数ハンドラーを使用します。

sample_go/app.go (関数ハンドラー)
package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-lambda-go/lambdacontext"
)

func handler(ctx context.Context, event interface{}) (string, error) {
    c, _ := lambdacontext.FromContext(ctx)

    fmt.Println("*** call handler")
    fmt.Printf("event = %#v\n", event)
    fmt.Printf("context = %#v\n", c)

    return "sample-go", nil
}

func main() {
    lambda.Start(handler)
}

動作確認1

AWS_LAMBDA_RUNTIME_API 環境変数にランタイム API サーバーのアドレスを設定し、app.go をビルドして実行します。

ビルドと実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080
> cd sample_go
> go build app.go
> app

ランタイム API サーバー側の出力内容は下記のようになり、app.go から (a) の API が呼び出されている事を確認できます。

server.js(ランタイム API サーバー)の出力内容1
> node server.js
started: http://127.0.0.1:8080
*** next
null

/invoke を呼び出して、関数ハンドラーを実行してみます。

invoke の呼び出し1
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"a1", "function-arn":"sample", "body":{"name":"abc","value":1}}'
{}

app.go の出力内容は以下のようになり、関数ハンドラーの実行を確認できました。

app.go の出力内容
> app
*** call handler
event = map[string]interface {}{"name":"abc", "value":1}
context = &lambdacontext.LambdaContext{AwsRequestID:"a1", InvokedFunctionArn:"sample", Identity:lambdacontext.CognitoIdentity{CognitoIdentityID:"", CognitoIdentityPoolID:""}, ClientContext:lambdacontext.ClientContext{Client:lambdacontext.ClientApplication{InstallationID:"", AppTitle:"", AppVersionCode:"", AppPackageName:""}, Env:map[string]string(nil), Custom:map[string]string(nil)}}

ランタイム API サーバー側は以下のように、app.go からのレスポンスを受け取っている事が確認できます。

server.js(ランタイム API サーバー)の出力内容2
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"a1","function-arn":"sample","body":{"name":"abc","value":1}}
*** response: id = a1
sample-go
*** next
null

続いて、エラー時の挙動を確認するために下記を実施します。

invoke の呼び出し2
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"b2"}'
{}

app.go には何も出力されず、ランタイム API サーバー側は以下のようになり、エラーの内容を受け取っています。

server.js(ランタイム API サーバー)の出力内容3
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"b2"}
*** error: id = b2
{
  errorMessage: 'unexpected end of JSON input',
  errorType: 'SyntaxError'
}
*** next
null

Node.js による関数ハンドラー

Node.js の場合、通常は関数ハンドラーそのものを実装するだけなので、Go とは違ってランタイムが別途必要となります。

Node.js 用のランタイムとしては下記が提供されており、ネイティブコードなどを使った本格的な作りとなっています。(本番利用を想定していると思われる)

今回はこれを使わず、簡易的なランタイムを node-fetch を使って自作してみました。

node_runtime/runtime.js (Node.js 用の簡易ランタイム)
const fetch = require('node-fetch')

const runtimeUrl = `http://${process.env["AWS_LAMBDA_RUNTIME_API"]}/2018-06-01/runtime/invocation`

const appRoot = process.cwd()
const [moduleName, handlerName] = process.argv[2].split('.')

const app = require(`${appRoot}/${moduleName}`)

const envData = {
    functionVersion: process.env["AWS_LAMBDA_FUNCTION_VERSION"],
    functionName: process.env["AWS_LAMBDA_FUNCTION_NAME"],
    memoryLimitInMB: process.env["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"],
    logGroupName: process.env["AWS_LAMBDA_LOG_GROUP_NAME"],
    logStreamName: process.env["AWS_LAMBDA_LOG_STREAM_NAME"],
}

const run = async () => {
    // (a) の API を呼び出す
    const nextRes = await fetch(`${runtimeUrl}/next`)

    const deadline = parseInt(nextRes.headers.get('lambda-runtime-deadline-ms'))

    const context = Object.assign(
        {
            getRemainingTimeInMillis: () => deadline - Date.now(),
            awsRequestId: nextRes.headers.get('lambda-runtime-aws-request-id'),
            invokedFunctionArn: 
                nextRes.headers.get('lambda-runtime-invoked-function-arn'),
            identity: {},
            clientContext: {},
        }, 
        envData
    )

    try {
        const event = await nextRes.json()
        // 関数ハンドラーの呼び出し
        const res = await app[handlerName](event, context)
        console.log(`* handler result: ${res}`)
        // (b) の API を呼び出す
        await fetch(
            `${runtimeUrl}/${context.awsRequestId}/response`, 
            {method: 'POST', body: res}
        )

    } catch (e) {
        // (c) の API を呼び出す
        await fetch(
            `${runtimeUrl}/${context.awsRequestId}/error`, 
            {method: 'POST', body: JSON.stringify({type: e.type, message: e.message})}
        )
    }
}

const loop = async () => {
    while (true) {
        await run()
    }
}

loop().catch(err => console.error(err))

ここでは、下記の関数ハンドラーを使う事にします。

sample_node/app.js (関数ハンドラー)
exports.handler = async (event, context) => {
    console.log('*** call handler')
    console.log(`event = ${JSON.stringify(event)}`)
    console.log(`context = ${JSON.stringify(context)}`)
    console.log(`remaining time = ${context.getRemainingTimeInMillis()}`)

    return 'sample-node'
}

動作確認2

runtime.js を使って app.js の handler を呼び出すように実行します。

実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080
> cd sample_node
> node ../node_runtime/runtime.js app.handler

ランタイム API サーバーの next が呼び出されました。

server.js(ランタイム API サーバー)の出力内容4
> node server.js
started: http://127.0.0.1:8080
・・・
*** next
null

/invoke を呼び出します。

invoke の呼び出し3
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"c3", "function-arn":"sample2", "body":{"name":"def","value":2}}'
{}

runtime.js + app.js の出力内容は以下のようになり、関数ハンドラーが呼び出されました。

runtime.js + app.js の出力内容
> node ../node_runtime/runtime.js app.handler
*** call handler
event = {"name":"def","value":2}
context = {"awsRequestId":"c3","invokedFunctionArn":"sample2","identity":{},"clientContext":{}}
remaining time = 4945
* handler result: sample-node

ランタイム API サーバーの出力も以下のようになり、問題は無さそうです。

server.js(ランタイム API サーバー)の出力内容5
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"c3","function-arn":"sample2","body":{"name":"def","value":2}}
・・・
*** response: id = c3
sample-node
*** next
null

エラーが発生するように /invoke を呼び出します。

invoke の呼び出し4
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"d4"}'
{}

こちらも問題は無さそうです。

server.js(ランタイム API サーバー)の出力内容6
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"d4"}
・・・
*** error: id = d4
{"type":"invalid-json","message":"invalid json response body at http://localhost:8080/2018-06-01/runtime/invocation/next reason: Unexpected end of JSON input"}
*** next
null

Go言語で GraphQL - graph-gophers/graphql-go で Interface を試す

前回graph-gophers/graphql-go を使って、GraphQL の Interface を扱ってみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20210125/

はじめに

GraphQL には Interface と Union という類似の機能が用意されており、共通のフィールドを設けるかどうかによって使い分けるようになっています。

graph-gophers/graphql-go では、具体的な型(Interface の実装型や Union の要素型)への変換メソッドを用意する事で Interface や Union を扱えるようになっています。

具体型への変換メソッド
func To<GraphQLの型名>() (<Goの型>, bool)

Go 側の実装方法はいくつか考えられるので、試しに 3通りで実装してみました。

(1) 基本形

まずは、graph-gophers/graphql-go の examples で使われている実装方法です。

Go の実装方法
GraphQL Interface interface 埋め込み struct
GraphQL Interface 実装型 struct
GraphQL 実装型への変換 struct へのキャスト
sample1.go
package main

import (
    "context"
    "encoding/json"

    graphql "github.com/graph-gophers/graphql-go"
)

const (
    // GraphQL スキーマ定義
    schemaString = `
      interface Event {
          id: ID!
      }

      type Created implements Event {
          id: ID!
          title: String!
      }

      type Deleted implements Event {
          id: ID!
          reason: String
      }

      type Query {
          events: [Event!]!
      }
  `
)
// GraphQL の Event に対応
type event interface {
    ID() graphql.ID
}
// GraphQL の Created に対応
type created struct {
    id    string
    title string
}

func (c *created) ID() graphql.ID {
    return graphql.ID(c.id)
}

func (c *created) Title() string {
    return c.title
}

// GraphQL の Deleted に対応
type deleted struct {
    id     string
    reason string
}

func (d *deleted) ID() graphql.ID {
    return graphql.ID(d.id)
}

func (d *deleted) Reason() *string {
    if d.reason == "" {
        return nil
    }

    return &d.reason
}
// GraphQL の Event に対応
type eventResolver struct {
    event
}
// GraphQL の Created 型への変換処理
func (r *eventResolver) ToCreated() (*created, bool) {
    c, ok := r.event.(*created)
    return c, ok
}
// GraphQL の Deleted 型への変換処理
func (r *eventResolver) ToDeleted() (*deleted, bool) {
    d, ok := r.event.(*deleted)
    return d, ok
}

type resolver struct{}

func (r *resolver) Events() []*eventResolver {
    return []*eventResolver{
        {&created{id: "i-1", title: "sample1"}},
        {&deleted{id: "i-1"}},
        {&created{id: "i-2", title: "sample2"}},
        {&created{id: "i-3", title: "sample3"}},
        {&deleted{id: "i-3", reason: "test"}},
    }
}

func main() {
    schema := graphql.MustParseSchema(schemaString, new(resolver))

    q := `
      {
          events {
              __typename
              id
              ... on Created {
                  title
              }
              ... on Deleted {
                  reason
              }
          }
      }
  `

    r := schema.Exec(context.Background(), q, "", nil)
    b, err := json.Marshal(r)

    if err != nil {
        panic(err)
    }

    println(string(b))
}

実行結果は以下の通りです。

実行結果
> go build sample1.go

> sample1
{"data":{"events":[{"__typename":"Created","id":"i-1","title":"sample1"},{"__typename":"Deleted","id":"i-1","reason":null},{"__typename":"Created","id":"i-2","title":"sample2"},{"__typename":"Created","id":"i-3","title":"sample3"},{"__typename":"Deleted","id":"i-3","reason":"test"}]}}

Union の場合

Interface の代わりに Union を使った場合は以下のようになります。

sample1_union.go
・・・

const (
    schemaString = `
      union Event = Created | Deleted

      type Created {
          id: ID!
          title: String!
      }

      type Deleted {
          id: ID!
          reason: String
      }

      type Query {
          events: [Event!]!
      }
  `
)

type event interface{}

・・・

func main() {
    schema := graphql.MustParseSchema(schemaString, new(resolver))

    q := `
      {
          events {
              __typename
              ... on Created {
                  id
                  title
              }
              ... on Deleted {
                  id
                  reason
              }
          }
      }
  `

    ・・・
}

(2) OneOf

次は、gRPC の oneof を参考にした実装です。

こちらは Interface よりも Union の実装に向いているかもしれません。

Go の実装方法
GraphQL Interface GraphQL 実装型毎にフィールドを用意した struct
GraphQL Interface 実装型 struct
GraphQL 実装型への変換 nil では無いフィールド値を返す
sample2.go
・・・

// GraphQL の Created に対応
type created struct {
    id    string
    title string
}

func (c *created) ID() graphql.ID {
    return graphql.ID(c.id)
}

func (c *created) Title() string {
    return c.title
}

// GraphQL の Deleted に対応
type deleted struct {
    id     string
    reason string
}

func (d *deleted) ID() graphql.ID {
    return graphql.ID(d.id)
}

func (d *deleted) Reason() *string {
    if d.reason == "" {
        return nil
    }

    return &d.reason
}
// GraphQL の Event に対応
type event struct {
    created *created
    deleted *deleted
}

func (e *event) ID() graphql.ID {
    if e.created == nil {
        return e.deleted.ID()
    }

    return e.created.ID()
}
// GraphQL の Created 型への変換処理
func (e *event) ToCreated() (*created, bool) {
    if e.created == nil {
        return nil, false
    }

    return e.created, true
}
// GraphQL の Deleted 型への変換処理
func (e *event) ToDeleted() (*deleted, bool) {
    if e.deleted == nil {
        return nil, false
    }

    return e.deleted, true
}

type resolver struct{}

func (r *resolver) Events() []*event {
    return []*event{
        {created: &created{id: "i-1", title: "sample1"}},
        {deleted: &deleted{id: "i-1"}},
        {created: &created{id: "i-2", title: "sample2"}},
        {created: &created{id: "i-3", title: "sample3"}},
        {deleted: &deleted{id: "i-3", reason: "test"}},
    }
}

func main() {
    ・・・
}
実行結果
> go build sample2.go

> sample2
{"data":{"events":[{"__typename":"Created","id":"i-1","title":"sample1"},{"__typename":"Deleted","id":"i-1","reason":null},{"__typename":"Created","id":"i-2","title":"sample2"},{"__typename":"Created","id":"i-3","title":"sample3"},{"__typename":"Deleted","id":"i-3","reason":"test"}]}}

(c) オールインワン

最後に、GraphQL Interface の実装型を単一の struct へ集約してみました。

実装内容が分かり難くなりそうで微妙かもしれません。

Go の実装方法
GraphQL Interface GraphQL 実装型の全フィールドを備えた struct
GraphQL Interface 実装型 interface
GraphQL 実装型への変換 フラグやフィールド値の有無で判定して自身を返す
sample3.go
・・・

// GraphQL の Created に対応
type created interface {
    ID() graphql.ID
    Title() string
}
// GraphQL の Deleted に対応
type deleted interface {
    ID() graphql.ID
    Reason() *string
}
// GraphQL の Event に対応
type event struct {
    id     string
    title  string
    reason string
    del    bool   // Created と Deleted の判定用
}

func (e *event) ID() graphql.ID {
    return graphql.ID(e.id)
}

func (e *event) Title() string {
    return e.title
}

func (e *event) Reason() *string {
    if e.reason == "" {
        return nil
    }

    return &e.reason
}
// GraphQL の Created 型への変換処理
func (e *event) ToCreated() (created, bool) {
    if e.del {
        return nil, false
    }

    return e, true
}
// GraphQL の Deleted 型への変換処理
func (e *event) ToDeleted() (deleted, bool) {
    if e.del {
        return e, true
    }

    return nil, false
}

type resolver struct{}

func (r *resolver) Events() []*event {
    return []*event{
        {id: "i-1", title: "sample1"},
        {id: "i-1", del: true},
        {id: "i-2", title: "sample2"},
        {id: "i-3", title: "sample3"},
        {id: "i-3", reason: "test", del: true},
    }
}

func main() {
    ・・・
}
実行結果
> go build sample3.go

> sample3
{"data":{"events":[{"__typename":"Created","id":"i-1","title":"sample1"},{"__typename":"Deleted","id":"i-1","reason":null},{"__typename":"Created","id":"i-2","title":"sample2"},{"__typename":"Created","id":"i-3","title":"sample3"},{"__typename":"Deleted","id":"i-3","reason":"test"}]}}

Go言語で GraphQL - graph-gophers/graphql-go で Query, Mutation, Subscription を試す

Go言語で GraphQL を扱うライブラリはいくつかありますが、今回は下記を試しました。

文字列として定義した GraphQL スキーマを使うようになっており、それなりに使い易いと思います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210112/

Query 処理

まずは、単純な Query 処理を実装してみます。

MustParseSchema に GraphQL スキーマ文字列(以下の schemaString)と処理の実装(以下の resolver)を与えて、Schema を取得します。

ExecContext、クエリ文字列、オペレーション名、クエリ用の変数を与えてクエリを実行します。

クエリの実行結果は Response として返ってくるので json.MarshalJSON 文字列化して出力しています。

デフォルトで、GraphQL スキーマのフィールド(下記の idvalue)の値は、該当するメソッドから取得するようになっています。※

 ※ 大文字・小文字は区別せず、
    "_" を除いた名称が合致するメソッドを探している模様
sample1.go
package main

import (
    "context"
    "encoding/json"

    graphql "github.com/graph-gophers/graphql-go"
)

const (
    // GraphQL スキーマ定義
    schemaString = `
      type Item {
          id: ID!
          value: Int!
      }

      type Query {
          one: Item!
      }
  `
)

type item struct {
    id    string
    value int32
}
// GraphQL の id フィールドの値
func (i *item) ID() graphql.ID {
    return graphql.ID(i.id)
}
// GraphQL の value フィールドの値
func (i *item) Value() int32 {
    return i.value
}

type resolver struct{}
// Query の one を実装
func (r *resolver) One() *item {
    return &item{"item-1", 12}
}

func main() {
    // GraphQL スキーマのパース
    schema := graphql.MustParseSchema(schemaString, new(resolver))

    q := `
      {
          one {
              id
              value
          }
      }
  `

    // GraphQL クエリの実行
    r := schema.Exec(context.Background(), q, "", nil)
    // JSON 文字列化
    b, err := json.Marshal(r)

    if err != nil {
        panic(err)
    }

    println(string(b))
}

実行結果は以下の通りです。

ビルドと実行
> go build sample1.go

> sample1
{"data":{"one":{"id":"item-1","value":12}}}

構造体のフィールドを使用

graph-gophers/graphql-go のソースコード internal/exec/resolvable/resolvable.go を確認してみたところ、GraphQL フィールド値の取得先は以下のように探しているようでした。

  • (1) 該当するメソッドを探す(findMethod の実施)
  • (2) (1) で見つからず、UseFieldResolvers が適用されている場合は該当するフィールドを探す(findField の実施)

そこで、UseFieldResolvers を適用し、item 構造体のフィールドから値を取得するようにしてみました。

sample2.go
・・・

type item struct {
    ID    graphql.ID
    Value int32
}

type resolver struct{}

func (r *resolver) One() *item {
    return &item{graphql.ID("item-2"), 34}
}

func main() {
    // UseFieldResolvers 適用
    schema := graphql.MustParseSchema(gqlSchema, new(resolver), graphql.UseFieldResolvers())

    ・・・
}

実行結果は以下の通りです。

ビルドと実行
> go build sample2.go

> sample1_field
{"data":{"one":{"id":"item-2","value":34}}}

なお、このコードで UseFieldResolvers を適用しなかった場合、実行時に panic: *main.item does not resolve "Item": missing method for field "id" となりました。

Mutation, Subscription 処理

最後に、GraphQL の enum や input を使って Mutation や Subscription を行う処理を実装してみました。

enum は string、input は 構造体で扱う事ができます。

Subscription は Subscribe で実施し、その実装メソッドは受信用 channel(<-chan T)を戻り値にします。

Exec の戻り値である Response の Data フィールドの型は json.RawMessage となっているので、構造体や map へアンマーシャルする事が可能です。

sample3.go
package main

import (
    "context"
    "encoding/json"
    "log"
    "sync"

    "github.com/google/uuid"
    graphql "github.com/graph-gophers/graphql-go"
)

const (
    schemaString = `
      enum Category {
          Standard
          Extra
      }

      input CreateItem {
          category: Category!
          value: Int!
      }

      type Item {
          id: ID!
          category: Category!
          value: Int!
      }

      type Mutation {
          create(input: CreateItem!): Item
      }

      type Query {
          find(id: ID!): Item
      }

      type Subscription {
          created: Item
      }
  `
)
// input
type createItem struct {
    Category string
    Value    int32
}

type item struct {
    id       string
    category string
    value    int32
}

func (i *item) ID() graphql.ID {
    return graphql.ID(i.id)
}

func (i *item) Category() string {
    return i.category
}

func (i *item) Value() int32 {
    return i.value
}
// item 管理
type store struct {
    sync.RWMutex
    items []*item
}

func (s *store) add(i *item) {
    s.Lock()
    defer s.Unlock()

    s.items = append(s.items, i)
}

func (s *store) get(id graphql.ID) *item {
    s.RLock()
    defer s.RUnlock()

    for _, i := range s.items {
        if i.ID() == id {
            return i
        }
    }

    return nil
}
// Subscription 用の channel 管理
type broker struct {
    sync.RWMutex
    subscribes []chan<- *item
}

func (b *broker) subscribe(ch chan<- *item) {
    log.Println("[INFO] subscribe")

    b.Lock()
    defer b.Unlock()

    b.subscribes = append(b.subscribes, ch)
}

func (b *broker) unsubscribe(ch chan<- *item) {
    log.Println("[INFO] unsubscribe")

    var tmp []chan<- *item

    b.Lock()
    defer b.Unlock()

    for _, s := range b.subscribes {
        if s != ch {
            tmp = append(tmp, s)
        }
    }

    b.subscribes = tmp
}

func (b *broker) publish(i *item) {
    b.RLock()
    defer b.RUnlock()

    for _, s := range b.subscribes {
        s <- i
    }
}

type resolver struct {
    store  *store
    broker *broker
}
// Mutation の実装
func (r *resolver) Create(args struct{ Input createItem }) (*item, error) {
    id, err := uuid.NewRandom()

    if err != nil {
        return nil, err
    }

    i := item{id.String(), args.Input.Category, args.Input.Value}

    r.store.add(&i)

    go func() {
        r.broker.publish(&i)
    }()

    return &i, nil
}
// Query の実装
func (r *resolver) Find(args struct{ ID graphql.ID }) *item {
    return r.store.get(args.ID)
}
// Subscription の実装
func (r *resolver) Created(ctx context.Context) <-chan *item {
    ch := make(chan *item)
    r.broker.subscribe(ch)

    go func() {
        // Context キャンセル時
        <-ctx.Done()
        log.Println("[INFO] context done")

        r.broker.unsubscribe(ch)
        close(ch)
    }()

    return ch
}
// Subscription の受信
func onCreated(ch <-chan interface{}) {
    for {
        r, ok := <-ch

        if !ok {
            log.Println("[INFO] closed channel")
            return
        }

        b, _ := json.Marshal(r)

        log.Println("[SUBSCRIPTION]", string(b))
    }
}

func printResponse(r *graphql.Response) error {
    b, err := json.Marshal(r)

    if err != nil {
        return err
    }

    log.Println(string(b))

    return nil
}

func main() {
    resolver := resolver{new(store), new(broker)}
    schema := graphql.MustParseSchema(schemaString, &resolver)

    ctx, cancel := context.WithCancel(context.Background())

    s := `
      subscription {
          created {
              id
              category
              value
          }
      }
  `
    // Subscription の実施
    ch, err := schema.Subscribe(ctx, s, "", nil)

    if err != nil {
        panic(err)
    }

    go onCreated(ch)

    m1 := `
      mutation {
          create(input: { category: Standard, value: 10 }) {
              id
          }
      }
  `

    mr1 := schema.Exec(context.Background(), m1, "", nil)
    _ = printResponse(mr1)

    var cr1 struct {
        Create struct {
            ID string
        }
    }
    // Mutation 結果の data の内容を構造体へアンマーシャル
    err = json.Unmarshal(mr1.Data, &cr1)

    if err != nil {
        panic(err)
    }

    q := `
      query findItem($id: ID!) {
          find(id: $id) {
              id
              category
              value
          }
      }
  `

    qr1 := schema.Exec(context.Background(), q, "",
        map[string]interface{}{"id": cr1.Create.ID})

    _ = printResponse(qr1)

    m2 := `
      mutation Create($p: CreateItem!) {
          create(input: $p) {
              id
          }
      }
  `
    // GraphQL のクエリ用変数
    vs := map[string]interface{}{
        "p": map[string]interface{}{
            "category": "Extra",
            "value":    123,
        },
    }

    mr2 := schema.Exec(context.Background(), m2, "", vs)
    _ = printResponse(mr2)

    var cr2 map[string]map[string]interface{}

    // Mutation 結果の data の内容を map へアンマーシャル
    err = json.Unmarshal(mr2.Data, &cr2)

    if err != nil {
        panic(err)
    }

    qr2 := schema.Exec(context.Background(), q, "", cr2["create"])
    _ = printResponse(qr2)

    // Subscription のキャンセル
    cancel()

    mr3 := schema.Exec(context.Background(), m2, "", map[string]interface{}{
        "p": map[string]interface{}{
            "category": "Extra",
            "value":    987,
        },
    })
    _ = printResponse(mr3)

    mr4 := schema.Exec(context.Background(), m2, "", map[string]interface{}{
        "p": map[string]interface{}{
            "category": "Standard",
            "value":    567,
        },
    })
    _ = printResponse(mr4)

    qr5 := schema.Exec(context.Background(), q, "",
        map[string]interface{}{"id": "invalid-id"})

    _ = printResponse(qr5)
}
ビルドと実行
> go build sample3.go

> sample3
2021/01/11 21:03:40 [INFO] subscribe
2021/01/11 21:03:40 {"data":{"create":{"id":"507dae03-1f93-4b1a-a75e-3fc54b297ad4"}}}
2021/01/11 21:03:40 [SUBSCRIPTION] {"data":{"created":{"id":"507dae03-1f93-4b1a-a75e-3fc54b297ad4","category":"Standard","value":10}}}
2021/01/11 21:03:40 {"data":{"find":{"id":"507dae03-1f93-4b1a-a75e-3fc54b297ad4","category":"Standard","value":10}}}
2021/01/11 21:03:40 {"data":{"create":{"id":"b47bf46c-5c10-4a8f-892e-9ebfa83d576a"}}}
2021/01/11 21:03:40 [SUBSCRIPTION] {"data":{"created":{"id":"b47bf46c-5c10-4a8f-892e-9ebfa83d576a","category":"Extra","value":123}}}
2021/01/11 21:03:40 {"data":{"find":{"id":"b47bf46c-5c10-4a8f-892e-9ebfa83d576a","category":"Extra","value":123}}}
2021/01/11 21:03:40 [INFO] closed channel
2021/01/11 21:03:40 [INFO] context done
2021/01/11 21:03:40 [INFO] unsubscribe
2021/01/11 21:03:40 {"data":{"create":{"id":"aef942a6-3aa7-4b31-89c4-cd44f748bed6"}}}
2021/01/11 21:03:40 {"data":{"create":{"id":"fe3db2a4-5578-4d33-b54a-26a8b6e281f3"}}}
2021/01/11 21:03:40 {"data":{"find":null}}

Go言語でインターフェースのメソッドを列挙する

Go 言語のリフレクションで型情報を扱う Type は以下で取得できます。

func TypeOf(i interface{}) Type

引数 i には値を指定する事になりますが、インターフェースの型情報を取得したい場合はどうするのか気になって試してみたところ、以下で取得できました。

インターフェースの型情報を取得
TypeOf( (*対象インターフェース)(nil) ).Elem()

対象とするインターフェースのポインタ型の nil を使って TypeOf を実施した後、Elem ※ を使ってインターフェースの型情報を取得します。

 ※ Elem() で Array, Chan, Map, Ptr, Slice から
    要素の型情報を取得できる

これを利用して、インターフェースのメソッドを列挙するとこのようになります。

sample.go
package main

import (
    "fmt"
    "reflect"
)

type Counter interface {
    Up(v int)
    Down(v int)
    Current() int
    reset()
    end() int
}
// メソッド情報の出力
func printMethods(t reflect.Type) {
    for i := 0; i < t.NumMethod(); i++ {
        fmt.Println("method:", t.Method(i))
    }
}

func main() {
    // Counter インターフェースの型情報を取得
    t := reflect.TypeOf((*Counter)(nil)).Elem()

    printMethods(t)
}
ビルドと実行
> go build sample.go

> sample
method: {Current  func() int <invalid Value> 0}
method: {Down  func(int) <invalid Value> 1}
method: {Up  func(int) <invalid Value> 2}
method: {end main func() int <invalid Value> 3}
method: {reset main func() <invalid Value> 4}

ちなみに、上記コードで t := reflect.TypeOf((Counter)(nil)) のようにすると、t.NumMethod()panic: runtime error: invalid memory address or nil pointer dereference となりました。