Node.js で gRPC を試す

gRPC Server Reflection のクライアント処理」では Node.js で gRPC クライアントを実装しましたが、今回はサーバー側も実装してみます。

サンプルコードは http://github.com/fits/try_samples/tree/master/blog/20201115/

はじめに

gRPC on Node.js では、以下の 2通りの手法が用意されており、それぞれ使用するパッケージが異なります。

  • (a) 動的コード生成 (@grpc/proto-loader パッケージを使用)
  • (b) 静的コード生成 (grpc-tools パッケージを使用)

更に、gRPC の実装ライブラリとして以下の 2種類が用意されており、どちらかを使う事になります。

  • C-based Client and Server (grpc パッケージを使用)
  • Pure JavaScript Client (@grpc/grpc-js パッケージを使用)

@grpc/grpc-js は現時点で Pure JavaScript Client と表現されていますが、クライアントだけではなくサーバーの実装にも使えます。

ここでは、Pure JavaScript 実装の @grpc/grpc-js を使って、(a) と (b) の両方を試してみます。

サービス定義(proto ファイル)

gRPC のサービス定義として下記ファイルを使用します。

Unary RPC(1リクエスト / 1レスポンス)と Server streaming RPC(1リクエスト / 多レスポンス)、message の oneofgoogle.protobuf.Empty の扱い等を確認するような内容にしてみました。

proto/item.proto
syntax = "proto3";

import "google/protobuf/empty.proto";

package item;

message AddItemRequest {
    string item_id = 1;
    uint64 price = 2;
}

message ItemRequest {
    string item_id = 1;
}

message Item {
    string item_id = 1;
    uint64 price = 2;
}

message ItemSubscribeRequest {
}

message AddedItem {
    string item_id = 1;
    uint64 price = 2;
}

message RemovedItem {
    string item_id = 1;
}

message ItemEvent {
    oneof event {
        AddedItem added = 1;
        RemovedItem removed = 2;
    }
}

service ItemManage {
    rpc AddItem(AddItemRequest) returns (google.protobuf.Empty);
    rpc RemoveItem(ItemRequest) returns (google.protobuf.Empty);
    rpc GetItem(ItemRequest) returns (Item);

    rpc Subscribe(ItemSubscribeRequest) returns (stream ItemEvent);
}

(a) 動的コード生成(@grpc/proto-loader)

まずは、@grpc/proto-loader を使った動的コード生成を試します。

インストール

@grpc/proto-loader@grpc/grpc-js をインストールしておきます。

> npm install --save @grpc/proto-loader @grpc/grpc-js

サーバー実装

proto-loader の loadSync 関数 ※ で proto ファイルをロードした結果を grpc-js の loadPackageDefinition で処理する事で型定義などを動的に生成します。

 ※ 非同期版の load 関数も用意されています

addService で gRPC のサービス定義と処理をマッピングし、bindAsync 後に start を呼び出す事でサーバー処理を開始します。

proto ファイルで定義したメッセージ型と同じフィールドを持つ JavaScript オブジェクトを gRPC のリクエストやレスポンスで使う事ができるようです。

Unary RPC の場合は、第二引数の callback へ失敗時の値と成功時の値をそれぞれ渡す事で処理結果を返します。

任意のエラーを返したい場合は、code で gRPC のステータスコードを、details でエラー内容を指定します。

google.protobuf.Empty の箇所は null もしくは undefined で代用できるようです。

Server streaming RPC の場合は、第一引数(下記コードでは call)の write を呼び出す事で処理結果を返す事ができます。

クライアントが途中で切断したりすると cancelled が発生するようになっており、cancelled 発生後に write を呼び出してもエラー等は発生しないようになっていました。

server.js
const protoLoader = require('@grpc/proto-loader')
const grpc = require('@grpc/grpc-js')

const protoFile = './proto/item.proto'
// proto ファイルのロード
const pd = protoLoader.loadSync(protoFile)
// gPRC 用の動的な型定義生成
const proto = grpc.loadPackageDefinition(pd)

let store = []
let subscribeList = []

const findItem = itemId => store.find(i => i.itemId == itemId)

const addItem = (itemId, price) => {
    if (findItem(itemId)) {
        return undefined
    }

    const item = { itemId, price }

    store.push(item)

    return item
}

const removeItem = itemId => {
    const item = findItem(itemId)

    if (item) {
        store = store.filter(i => i.itemId != item.itemId)
    }

    return item
}
// ItemEvent の配信
const publishEvent = event => {
    console.log(`*** publish event: ${JSON.stringify(event)}`)
    subscribeList.forEach(s => s.write(event))
}

const server = new grpc.Server()
// サービス定義と処理のマッピング
server.addService(proto.item.ItemManage.service, {
    AddItem(call, callback) {
        const itemId = call.request.itemId
        const price = call.request.price

        const item = addItem(itemId, price)

        if (item) {
            callback()
            publishEvent({ added: { itemId, price }})
        }
        else {
            const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' }
            callback(err)
        }
    },
    RemoveItem(call, callback) {
        const itemId = call.request.itemId

        if (removeItem(itemId)) {
            callback()
            publishEvent({ removed: { itemId }})
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    GetItem(call, callback) {
        const itemId = call.request.itemId
        const item = findItem(itemId)

        if (item) {
            callback(null, item)
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    Subscribe(call) {
        console.log('*** subscribed')
        subscribeList.push(call)
        // クライアント切断時の処理
        call.on('cancelled', () => {
            console.log('*** unsubscribed')
            subscribeList = subscribeList.filter(s => s != call)
        })
    }
})

server.bindAsync(
    '127.0.0.1:50051',
    grpc.ServerCredentials.createInsecure(),
    (err, port) => {
        if (err) {
            console.error(err)
            return
        }

        console.log(`start server: ${port}`)
        // 開始
        server.start()
    }
)

クライアント実装1

まずは、Unary RPC の API のみ(Subscribe 以外)を呼び出すクライアントを実装してみます。

loadPackageDefinition を実施するところまではサーバーと同じです。

Unary RPC はコールバック関数を伴ったメソッドとして用意されますが、このメソッドに Node.js の util.promisify を直接適用すると不都合が生じたため、Promise 化は自前の関数(下記の promisify)で実施するようにしました。

client.js
const protoLoader = require('@grpc/proto-loader')
const grpc = require('@grpc/grpc-js')

const protoFile = './proto/item.proto'

const pd = protoLoader.loadSync(protoFile)
const proto = grpc.loadPackageDefinition(pd)

const id = process.argv[2]

const client = new proto.item.ItemManage(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)
// Unary RPC の Promise 化
const promisify = (obj, methodName) => args => 
    new Promise((resolve, reject) => {
        obj[methodName](args, (err, res) => {
            if (err) {
                reject(err)
            }
            else {
                resolve(res)
            }
        })
    })

const addItem = promisify(client, 'AddItem')
const removeItem = promisify(client, 'RemoveItem')
const getItem = promisify(client, 'GetItem')

const printItem = item => {
    console.log(`id = ${item.itemId}, price = ${item.price}`)
}

const run = async () => {
    await addItem({ itemId: `${id}_item-1`, price: 100 })

    const item1 = await getItem({ itemId: `${id}_item-1` })
    printItem(item1)

    await addItem({ itemId: `${id}_item-2`, price: 20 })

    const item2 = await getItem({ itemId: `${id}_item-2` })
    printItem(item2)

    await addItem({ itemId: `${id}_item-1`, price: 50 })
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem({ itemId: `${id}_item-1` })

    await getItem({ itemId: `${id}_item-1` })
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem({ itemId: `${id}_item-2` })
}

run().catch(err => console.error(err))

クライアント実装2

次は Server streaming RPC のクライアント実装です。

client_subscribe.js
const protoLoader = require('@grpc/proto-loader')
const grpc = require('@grpc/grpc-js')

const protoFile = './proto/item.proto'

const pd = protoLoader.loadSync(protoFile)
const proto = grpc.loadPackageDefinition(pd)

const client = new proto.item.ItemManage(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const stream = client.Subscribe({})
// メッセージ受信時
stream.on('data', event => {
    console.log(`*** received event = ${JSON.stringify(event)}`)
})

// サーバー終了時
stream.on('end', () => console.log('*** stream end'))
stream.on('error', err => console.log(`*** stream error: ${err}`))

動作確認

server.js の実行後、client_subscribe.js を 2つ起動した後に client.js を実行してみます。

Server 実行
> node server.js
start server: 50051
Client2-1 実行
> node client_subscribe.js
Client2-2 実行
> node client_subscribe.js
Client1 実行
> node client.js a1
id = a1_item-1, price = 100
id = a1_item-2, price = 20
*** ERROR = 6 ALREADY_EXISTS: exists item
*** ERROR = 5 NOT_FOUND: item not found

この時点で出力内容は以下のようになりました。

Server 出力内容
> node server.js
start server: 50051
*** subscribed
*** subscribed
*** publish event: {"added":{"itemId":"a1_item-1","price":100}}
*** publish event: {"added":{"itemId":"a1_item-2","price":20}}
*** publish event: {"removed":{"itemId":"a1_item-1"}}
*** publish event: {"removed":{"itemId":"a1_item-2"}}
Client2-1、Client2-2 出力内容
> node client_subscribe.js
*** received event = {"added":{"itemId":"a1_item-1","price":{"low":100,"high":0,"unsigned":true}}}
*** received event = {"added":{"itemId":"a1_item-2","price":{"low":20,"high":0,"unsigned":true}}}
*** received event = {"removed":{"itemId":"a1_item-1"}}
*** received event = {"removed":{"itemId":"a1_item-2"}}

Client2-2 を Ctrl + c で終了後に、Server も Ctrl + c で終了すると以下のようになり、Client2-1 のプロセスは終了しました。

Server 出力内容
> node server.js
start server: 50051
・・・
*** publish event: {"removed":{"itemId":"a1_item-2"}}
*** unsubscribed
^C
Client2-1 出力内容
> node client_subscribe.js
・・・
*** received event = {"removed":{"itemId":"a1_item-2"}}
*** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error)
*** stream end

特に問題はなく、正常に動作しているようです。

(b) 静的コード生成(grpc-tools)

grpc-tools を使った静的コード生成を試します。

インストールとコード生成

grpc-tools@grpc/grpc-jsgoogle-protobuf をインストールしておきます。

> npm install --save-dev grpc-tools
・・・
> npm install --save @grpc/grpc-js google-protobuf
・・・

grpc-tools をインストールする事で使えるようになる grpc_tools_node_protoc コマンドで proto ファイルからコードを生成します。

grpc_tools_node_protoc コマンドは内部的に protoc コマンドを grpc_node_plugin プラグインを伴って呼び出すようになっています。

--grpc_out でサービス定義用のファイル xxx_grpc_pb.js が生成され、--js_out でメッセージ定義用のファイルが生成されます。

サービス定義 xxx_grpc_pb.js は --js_out で import_style=commonjs オプションを指定する事を前提としたコードになっています。※

 ※ import_style=commonjs オプションを指定した際に生成される
    xxx_pb.js を参照するようになっている

また、--grpc_out はデフォルトで grpc パッケージ用のコードを生成するため、ここでは grpc_js オプションを指定して @grpc/grpc-js 用のコードを生成するようにしています。

静的コード生成例(grpc_tools_node_protoc コマンド)
> mkdir generated

> grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto
・・・

サーバー実装

(a) の場合と処理内容に大きな違いはありませんが、リクエストやレスポンスでは生成された型を使います。

アクセサメソッド(getter、setter)で値の取得や設定ができるようになっており、new 時に配列として全フィールドの値を指定する事もできるようです。 JavaScript オブジェクトへ変換したい場合は toObject メソッドを使用します。

addService でマッピングする際のメソッド名の一文字目が小文字になっています。

proto ファイルで定義したサービス名の後に Service を付けた型(ここでは ItemManageService)がサーバー処理用、Client を付けた型がクライアント処理用の型定義となるようです。

server.js
const grpc = require('@grpc/grpc-js')

const { Item, AddedItem, RemovedItem, ItemEvent } = require('./generated/proto/item_pb')
const { ItemManageService } = require('./generated/proto/item_grpc_pb')
const { Empty } = require('google-protobuf/google/protobuf/empty_pb')

let store = []
let subscribeList = []

const findItem = itemId => store.find(i => i.getItemId() == itemId)

const addItem = (itemId, price) => {
    if (findItem(itemId)) {
        return undefined
    }

    const item = new Item([itemId, price])

    store.push(item)

    return item
}

const removeItem = itemId => {
    const item = findItem(itemId)

    if (item) {
        store = store.filter(i => i.getItemId() != item.getItemId())
    }

    return item
}

const createAddedEvent = (itemId, price) => {
    const event = new ItemEvent()
    event.setAdded(new AddedItem([itemId, price]))

    return event
}

const createRemovedEvent = itemId => {
    const event = new ItemEvent()
    event.setRemoved(new RemovedItem([itemId]))

    return event
}

const publishEvent = event => {
    // toObject で JavaScript オブジェクトへ変換
    console.log(`*** publish event: ${JSON.stringify(event.toObject())}`)
    subscribeList.forEach(s => s.write(event))
}

const server = new grpc.Server()

server.addService(ItemManageService, {
    addItem(call, callback) {
        const itemId = call.request.getItemId()
        const price = call.request.getPrice()

        const item = addItem(itemId, price)

        if (item) {
            callback(null, new Empty())
            publishEvent(createAddedEvent(itemId, price))
        }
        else {
            const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' }
            callback(err)
        }
    },
    removeItem(call, callback) {
        const itemId = call.request.getItemId()

        if (removeItem(itemId)) {
            callback(null, new Empty())
            publishEvent(createRemovedEvent(itemId))
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    getItem(call, callback) {
        const itemId = call.request.getItemId()
        const item = findItem(itemId)

        if (item) {
            callback(null, item)
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    subscribe(call) {
        console.log('*** subscribed')
        subscribeList.push(call)

        call.on('cancelled', () => {
            console.log('*** unsubscribed')
            subscribeList = subscribeList.filter(s => s != call)
        })
    }
})

server.bindAsync(
    ・・・
)

クライアント実装1

生成された型を使う点とメソッド名の先頭が小文字になっている点を除くと、基本的に (a) と同じです。

client.js
const grpc = require('@grpc/grpc-js')

const { AddItemRequest, ItemRequest } = require('./generated/proto/item_pb')
const { ItemManageClient } = require('./generated/proto/item_grpc_pb')

const id = process.argv[2]

const client = new ItemManageClient(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const promisify = (obj, methodName) => args => 
    new Promise((resolve, reject) => {
        ・・・
    })

const addItem = promisify(client, 'addItem')
const removeItem = promisify(client, 'removeItem')
const getItem = promisify(client, 'getItem')

const printItem = item => {
    console.log(`id = ${item.getItemId()}, price = ${item.getPrice()}`)
}

const run = async () => {
    await addItem(new AddItemRequest([`${id}_item-1`, 100]))

    const item1 = await getItem(new ItemRequest([`${id}_item-1`]))
    printItem(item1)

    await addItem(new AddItemRequest([`${id}_item-2`, 20]))

    const item2 = await getItem(new ItemRequest([`${id}_item-2`]))
    printItem(item2)

    await addItem(new AddItemRequest([`${id}_item-1`, 50]))
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem(new ItemRequest([`${id}_item-1`]))

    await getItem(new ItemRequest([`${id}_item-1`]))
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem(new ItemRequest([`${id}_item-2`]))
}

run().catch(err => console.error(err))

クライアント実装2

こちらも同様です。

client_subscribe.js
const grpc = require('@grpc/grpc-js')

const { ItemSubscribeRequest } = require('./generated/proto/item_pb')
const { ItemManageClient } = require('./generated/proto/item_grpc_pb')

const client = new ItemManageClient(
    ・・・
)

const stream = client.subscribe(new ItemSubscribeRequest())

stream.on('data', event => {
    // toObject で JavaScript オブジェクトへ変換
    console.log(`*** received event = ${JSON.stringify(event.toObject())}`)
})

・・・

動作確認

(a) と同じ操作を行った結果は以下のようになりました。

Server 出力内容
> node server.js
start server: 50051
*** subscribed
*** subscribed
*** publish event: {"added":{"itemId":"a1_item-1","price":100}}
*** publish event: {"added":{"itemId":"a1_item-2","price":20}}
*** publish event: {"removed":{"itemId":"a1_item-1"}}
*** publish event: {"removed":{"itemId":"a1_item-2"}}
*** unsubscribed
^C
Client1 出力内容
> node client.js a1
id = a1_item-1, price = 100
id = a1_item-2, price = 20
*** ERROR = 6 ALREADY_EXISTS: exists item
*** ERROR = 5 NOT_FOUND: item not found
Client2-1 出力内容
> node client_subscribe.js
*** received event = {"added":{"itemId":"a1_item-1","price":100}}
*** received event = {"added":{"itemId":"a1_item-2","price":20}}
*** received event = {"removed":{"itemId":"a1_item-1"}}
*** received event = {"removed":{"itemId":"a1_item-2"}}
*** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error)
*** stream end
Client2-2 出力内容
> node client_subscribe.js
*** received event = {"added":{"itemId":"a1_item-1","price":100}}
*** received event = {"added":{"itemId":"a1_item-2","price":20}}
*** received event = {"removed":{"itemId":"a1_item-1"}}
*** received event = {"removed":{"itemId":"a1_item-2"}}
^C

(a) と (b) は同一の gRPC サービス(proto ファイル)を実装したものなので当然ですが、(a) と (b) を相互接続しても特に問題はありませんでした。