AWS Lambda のランタイム API サーバーを自作して関数ハンドラーをローカル実行

AWS Lambda では、(Lambda 関数の)ランタイムがランタイム API(ランタイムインターフェース)から呼び出しイベントを受け取って、関数ハンドラーを実行し、その結果をランタイム API へ返すような流れで処理が実施されているようです。

そこで、自作のランタイム API サーバーを使って、Go と Node.js でそれぞれ実装した AWS Lambda の関数ハンドラーを(Docker 等を使わずに)ローカル実行してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20210211/

ランタイム API サーバーの実装

以下のようなオープンソースのランタイム API サーバーが提供されており、自作する必要は無かったりするのですが。

今回は、fastify を使ってランタイム API サーバーを実装してみました。

AWS Lambda ランタイム API のページから OpenAPI 仕様(2018-06-01)を入手できるので、このスキーマ定義を参考に必要最小限の処理だけを実装します。

まず、ランタイム API として下記を実装する事になります。(ただし、今回の用途では (d) を使いません)

  • (a) 次の呼び出し : GET /runtime/invocation/next
  • (b) 呼び出しレスポンス : POST /runtime/invocation/{AwsRequestId}/response
  • (c) 呼び出しエラー : POST /runtime/invocation/{AwsRequestId}/error
  • (d) 初期化エラー : POST /runtime/init/error

(b) ~ (d) は成功時に 202 ステータスコードを返す事になります。

(b) や (c) における AwsRequestId の値は、(a) のレスポンスヘッダーで通知する事になり、(a) のレスポンスヘッダーで最低限必要な項目は下記でした。(aws-lambda-go の場合)

  • Lambda-Runtime-Aws-Request-Id (AwsRequestId の値)
  • Lambda-Runtime-Deadline-Ms (実行期限)

一方で、ランタイム側は以下のような手順でランタイム API を呼び出すようになっているようです。

  • (1) (a) を GET してレスポンスが返ってくるのを待つ
  • (2) (a) からレスポンス(呼び出しイベント)が返ってくると関数ハンドラーを呼び出す
  • (3) 関数ハンドラーの呼び出し結果を (b) へ POST、エラーが発生した場合は (c) へ POST
  • (4) (1) へ戻る

つまり、(a) で固定の内容をすぐに返すような実装にしてしまうと、(1) ~ (4) が繰り返されてしまい不都合が生じます。

そこで、下記では /invoke を呼び出した際に (a) のレスポンスを返すようにしています。

また、Go と Node.js の関数ハンドラーを同時に扱うために、(a) が呼び出された際の reply を replies へ溜めておいて、forEach で処理するようにしています。

runtime-api-server/server.js (ランタイム API サーバー)
const logger = false
const fastify = require('fastify')({ logger })

const RUNTIME_PATH = '/2018-06-01/runtime'
const TIMEOUT = 5 * 1000

const port = process.env['SERVER_PORT'] ? 
    parseInt(process.env['SERVER_PORT']) : 8080

const replies = []

fastify.post('/invoke', (req, reply) => {
    const data = req.body

    console.log(`*** invoke: ${JSON.stringify(data)}`)

    replies.splice(0).forEach(r => {
        const deadline = Date.now() + TIMEOUT
        // (a) のレスポンスを返す
        r.code(200)
            .header('Lambda-Runtime-Aws-Request-Id', data['request-id'])
            .header('Lambda-Runtime-Deadline-Ms', deadline.toString())
            .header('Lambda-Runtime-Trace-Id', data['trace-id'])
            .header('Lambda-Runtime-Invoked-Function-Arn', data['function-arn'])
            .send(data.body)
    })

    reply.code(200).send({})
})

// Runtime API の実装

// (a) 次の呼び出し
fastify.get(`${RUNTIME_PATH}/invocation/next`, (req, reply) => {
    console.log('*** next')
    console.log(req.body)

    replies.push(reply)
})
// (b) 呼び出しレスポンス
fastify.post(`${RUNTIME_PATH}/invocation/:id/response`, (req, reply) => {
    console.log(`*** response: id = ${req.params['id']}`)
    console.log(req.body)

    reply.code(202).send({ status: '' })
})
// (c) 呼び出しエラー
fastify.post(`${RUNTIME_PATH}/invocation/:id/error`, (req, reply) => {
    console.log(`*** error: id = ${req.params['id']}`)
    console.log(req.body)

    reply.code(202).send({ status: '' })
})
// (d) 初期化エラー
fastify.post(`${RUNTIME_PATH}/init/error`, (req, reply) => {
    console.log('*** init error')
    console.log(req.body)

    reply.code(202).send({ status: '' })
})

fastify.listen(port)
    .then(r => console.log(`started: ${r}`))
    .catch(err => console.error(err))

実行

動作確認のために実行しておきます。

> cd runtime-api-server
> node server.js
started: http://127.0.0.1:8080

Go による関数ハンドラー

Go の場合、aws-lambda-go にランタイム API とやり取りを行うランタイムの処理が実装されています。

そのため、aws-lambda-go を使って実装した関数ハンドラーをローカル環境でビルドして実行するだけです。

接続するランタイム API サーバーのアドレスは AWS_LAMBDA_RUNTIME_API 環境変数で設定します。

ここでは、以下の関数ハンドラーを使用します。

sample_go/app.go (関数ハンドラー)
package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-lambda-go/lambdacontext"
)

func handler(ctx context.Context, event interface{}) (string, error) {
    c, _ := lambdacontext.FromContext(ctx)

    fmt.Println("*** call handler")
    fmt.Printf("event = %#v\n", event)
    fmt.Printf("context = %#v\n", c)

    return "sample-go", nil
}

func main() {
    lambda.Start(handler)
}

動作確認1

AWS_LAMBDA_RUNTIME_API 環境変数にランタイム API サーバーのアドレスを設定し、app.go をビルドして実行します。

ビルドと実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080
> cd sample_go
> go build app.go
> app

ランタイム API サーバー側の出力内容は下記のようになり、app.go から (a) の API が呼び出されている事を確認できます。

server.js(ランタイム API サーバー)の出力内容1
> node server.js
started: http://127.0.0.1:8080
*** next
null

/invoke を呼び出して、関数ハンドラーを実行してみます。

invoke の呼び出し1
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"a1", "function-arn":"sample", "body":{"name":"abc","value":1}}'
{}

app.go の出力内容は以下のようになり、関数ハンドラーの実行を確認できました。

app.go の出力内容
> app
*** call handler
event = map[string]interface {}{"name":"abc", "value":1}
context = &lambdacontext.LambdaContext{AwsRequestID:"a1", InvokedFunctionArn:"sample", Identity:lambdacontext.CognitoIdentity{CognitoIdentityID:"", CognitoIdentityPoolID:""}, ClientContext:lambdacontext.ClientContext{Client:lambdacontext.ClientApplication{InstallationID:"", AppTitle:"", AppVersionCode:"", AppPackageName:""}, Env:map[string]string(nil), Custom:map[string]string(nil)}}

ランタイム API サーバー側は以下のように、app.go からのレスポンスを受け取っている事が確認できます。

server.js(ランタイム API サーバー)の出力内容2
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"a1","function-arn":"sample","body":{"name":"abc","value":1}}
*** response: id = a1
sample-go
*** next
null

続いて、エラー時の挙動を確認するために下記を実施します。

invoke の呼び出し2
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"b2"}'
{}

app.go には何も出力されず、ランタイム API サーバー側は以下のようになり、エラーの内容を受け取っています。

server.js(ランタイム API サーバー)の出力内容3
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"b2"}
*** error: id = b2
{
  errorMessage: 'unexpected end of JSON input',
  errorType: 'SyntaxError'
}
*** next
null

Node.js による関数ハンドラー

Node.js の場合、通常は関数ハンドラーそのものを実装するだけなので、Go とは違ってランタイムが別途必要となります。

Node.js 用のランタイムとしては下記が提供されており、ネイティブコードなどを使った本格的な作りとなっています。(本番利用を想定していると思われる)

今回はこれを使わず、簡易的なランタイムを node-fetch を使って自作してみました。

node_runtime/runtime.js (Node.js 用の簡易ランタイム)
const fetch = require('node-fetch')

const runtimeUrl = `http://${process.env["AWS_LAMBDA_RUNTIME_API"]}/2018-06-01/runtime/invocation`

const appRoot = process.cwd()
const [moduleName, handlerName] = process.argv[2].split('.')

const app = require(`${appRoot}/${moduleName}`)

const envData = {
    functionVersion: process.env["AWS_LAMBDA_FUNCTION_VERSION"],
    functionName: process.env["AWS_LAMBDA_FUNCTION_NAME"],
    memoryLimitInMB: process.env["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"],
    logGroupName: process.env["AWS_LAMBDA_LOG_GROUP_NAME"],
    logStreamName: process.env["AWS_LAMBDA_LOG_STREAM_NAME"],
}

const run = async () => {
    // (a) の API を呼び出す
    const nextRes = await fetch(`${runtimeUrl}/next`)

    const deadline = parseInt(nextRes.headers.get('lambda-runtime-deadline-ms'))

    const context = Object.assign(
        {
            getRemainingTimeInMillis: () => deadline - Date.now(),
            awsRequestId: nextRes.headers.get('lambda-runtime-aws-request-id'),
            invokedFunctionArn: 
                nextRes.headers.get('lambda-runtime-invoked-function-arn'),
            identity: {},
            clientContext: {},
        }, 
        envData
    )

    try {
        const event = await nextRes.json()
        // 関数ハンドラーの呼び出し
        const res = await app[handlerName](event, context)
        console.log(`* handler result: ${res}`)
        // (b) の API を呼び出す
        await fetch(
            `${runtimeUrl}/${context.awsRequestId}/response`, 
            {method: 'POST', body: res}
        )

    } catch (e) {
        // (c) の API を呼び出す
        await fetch(
            `${runtimeUrl}/${context.awsRequestId}/error`, 
            {method: 'POST', body: JSON.stringify({type: e.type, message: e.message})}
        )
    }
}

const loop = async () => {
    while (true) {
        await run()
    }
}

loop().catch(err => console.error(err))

ここでは、下記の関数ハンドラーを使う事にします。

sample_node/app.js (関数ハンドラー)
exports.handler = async (event, context) => {
    console.log('*** call handler')
    console.log(`event = ${JSON.stringify(event)}`)
    console.log(`context = ${JSON.stringify(context)}`)
    console.log(`remaining time = ${context.getRemainingTimeInMillis()}`)

    return 'sample-node'
}

動作確認2

runtime.js を使って app.js の handler を呼び出すように実行します。

実行
> set AWS_LAMBDA_RUNTIME_API=localhost:8080
> cd sample_node
> node ../node_runtime/runtime.js app.handler

ランタイム API サーバーの next が呼び出されました。

server.js(ランタイム API サーバー)の出力内容4
> node server.js
started: http://127.0.0.1:8080
・・・
*** next
null

/invoke を呼び出します。

invoke の呼び出し3
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"c3", "function-arn":"sample2", "body":{"name":"def","value":2}}'
{}

runtime.js + app.js の出力内容は以下のようになり、関数ハンドラーが呼び出されました。

runtime.js + app.js の出力内容
> node ../node_runtime/runtime.js app.handler
*** call handler
event = {"name":"def","value":2}
context = {"awsRequestId":"c3","invokedFunctionArn":"sample2","identity":{},"clientContext":{}}
remaining time = 4945
* handler result: sample-node

ランタイム API サーバーの出力も以下のようになり、問題は無さそうです。

server.js(ランタイム API サーバー)の出力内容5
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"c3","function-arn":"sample2","body":{"name":"def","value":2}}
・・・
*** response: id = c3
sample-node
*** next
null

エラーが発生するように /invoke を呼び出します。

invoke の呼び出し4
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:8080/invoke -d '{"request-id":"d4"}'
{}

こちらも問題は無さそうです。

server.js(ランタイム API サーバー)の出力内容6
> node server.js
started: http://127.0.0.1:8080
・・・
*** invoke: {"request-id":"d4"}
・・・
*** error: id = d4
{"type":"invalid-json","message":"invalid json response body at http://localhost:8080/2018-06-01/runtime/invocation/next reason: Unexpected end of JSON input"}
*** next
null