Node.js で gRPC を試す
「gRPC Server Reflection のクライアント処理」では Node.js で gRPC クライアントを実装しましたが、今回はサーバー側も実装してみます。
サンプルコードは http://github.com/fits/try_samples/tree/master/blog/20201115/
はじめに
gRPC on Node.js では、以下の 2通りの手法が用意されており、それぞれ使用するパッケージが異なります。
- (a) 動的コード生成 (
@grpc/proto-loader
パッケージを使用) - (b) 静的コード生成 (
grpc-tools
パッケージを使用)
更に、gRPC の実装ライブラリとして以下の 2種類が用意されており、どちらかを使う事になります。
- C-based Client and Server (
grpc
パッケージを使用) - Pure JavaScript Client (
@grpc/grpc-js
パッケージを使用)
@grpc/grpc-js
は現時点で Pure JavaScript Client と表現されていますが、クライアントだけではなくサーバーの実装にも使えます。
ここでは、Pure JavaScript 実装の @grpc/grpc-js
を使って、(a) と (b) の両方を試してみます。
サービス定義(proto ファイル)
gRPC のサービス定義として下記ファイルを使用します。
Unary RPC(1リクエスト / 1レスポンス)と Server streaming RPC(1リクエスト / 多レスポンス)、message の oneof
、google.protobuf.Empty
の扱い等を確認するような内容にしてみました。
proto/item.proto
syntax = "proto3"; import "google/protobuf/empty.proto"; package item; message AddItemRequest { string item_id = 1; uint64 price = 2; } message ItemRequest { string item_id = 1; } message Item { string item_id = 1; uint64 price = 2; } message ItemSubscribeRequest { } message AddedItem { string item_id = 1; uint64 price = 2; } message RemovedItem { string item_id = 1; } message ItemEvent { oneof event { AddedItem added = 1; RemovedItem removed = 2; } } service ItemManage { rpc AddItem(AddItemRequest) returns (google.protobuf.Empty); rpc RemoveItem(ItemRequest) returns (google.protobuf.Empty); rpc GetItem(ItemRequest) returns (Item); rpc Subscribe(ItemSubscribeRequest) returns (stream ItemEvent); }
(a) 動的コード生成(@grpc/proto-loader)
まずは、@grpc/proto-loader
を使った動的コード生成を試します。
インストール
@grpc/proto-loader
と @grpc/grpc-js
をインストールしておきます。
> npm install --save @grpc/proto-loader @grpc/grpc-js
サーバー実装
proto-loader の loadSync
関数 ※ で proto ファイルをロードした結果を grpc-js の loadPackageDefinition
で処理する事で型定義などを動的に生成します。
※ 非同期版の load 関数も用意されています
addService
で gRPC のサービス定義と処理をマッピングし、bindAsync
後に start
を呼び出す事でサーバー処理を開始します。
proto ファイルで定義したメッセージ型と同じフィールドを持つ JavaScript オブジェクトを gRPC のリクエストやレスポンスで使う事ができるようです。
Unary RPC の場合は、第二引数の callback
へ失敗時の値と成功時の値をそれぞれ渡す事で処理結果を返します。
任意のエラーを返したい場合は、code
で gRPC のステータスコードを、details
でエラー内容を指定します。
google.protobuf.Empty の箇所は null
もしくは undefined
で代用できるようです。
Server streaming RPC の場合は、第一引数(下記コードでは call
)の write
を呼び出す事で処理結果を返す事ができます。
クライアントが途中で切断したりすると cancelled
が発生するようになっており、cancelled 発生後に write
を呼び出してもエラー等は発生しないようになっていました。
server.js
const protoLoader = require('@grpc/proto-loader') const grpc = require('@grpc/grpc-js') const protoFile = './proto/item.proto' // proto ファイルのロード const pd = protoLoader.loadSync(protoFile) // gPRC 用の動的な型定義生成 const proto = grpc.loadPackageDefinition(pd) let store = [] let subscribeList = [] const findItem = itemId => store.find(i => i.itemId == itemId) const addItem = (itemId, price) => { if (findItem(itemId)) { return undefined } const item = { itemId, price } store.push(item) return item } const removeItem = itemId => { const item = findItem(itemId) if (item) { store = store.filter(i => i.itemId != item.itemId) } return item } // ItemEvent の配信 const publishEvent = event => { console.log(`*** publish event: ${JSON.stringify(event)}`) subscribeList.forEach(s => s.write(event)) } const server = new grpc.Server() // サービス定義と処理のマッピング server.addService(proto.item.ItemManage.service, { AddItem(call, callback) { const itemId = call.request.itemId const price = call.request.price const item = addItem(itemId, price) if (item) { callback() publishEvent({ added: { itemId, price }}) } else { const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' } callback(err) } }, RemoveItem(call, callback) { const itemId = call.request.itemId if (removeItem(itemId)) { callback() publishEvent({ removed: { itemId }}) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, GetItem(call, callback) { const itemId = call.request.itemId const item = findItem(itemId) if (item) { callback(null, item) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, Subscribe(call) { console.log('*** subscribed') subscribeList.push(call) // クライアント切断時の処理 call.on('cancelled', () => { console.log('*** unsubscribed') subscribeList = subscribeList.filter(s => s != call) }) } }) server.bindAsync( '127.0.0.1:50051', grpc.ServerCredentials.createInsecure(), (err, port) => { if (err) { console.error(err) return } console.log(`start server: ${port}`) // 開始 server.start() } )
クライアント実装1
まずは、Unary RPC の API のみ(Subscribe 以外)を呼び出すクライアントを実装してみます。
loadPackageDefinition を実施するところまではサーバーと同じです。
Unary RPC はコールバック関数を伴ったメソッドとして用意されますが、このメソッドに Node.js の util.promisify
を直接適用すると不都合が生じたため、Promise 化は自前の関数(下記の promisify
)で実施するようにしました。
client.js
const protoLoader = require('@grpc/proto-loader') const grpc = require('@grpc/grpc-js') const protoFile = './proto/item.proto' const pd = protoLoader.loadSync(protoFile) const proto = grpc.loadPackageDefinition(pd) const id = process.argv[2] const client = new proto.item.ItemManage( '127.0.0.1:50051', grpc.credentials.createInsecure() ) // Unary RPC の Promise 化 const promisify = (obj, methodName) => args => new Promise((resolve, reject) => { obj[methodName](args, (err, res) => { if (err) { reject(err) } else { resolve(res) } }) }) const addItem = promisify(client, 'AddItem') const removeItem = promisify(client, 'RemoveItem') const getItem = promisify(client, 'GetItem') const printItem = item => { console.log(`id = ${item.itemId}, price = ${item.price}`) } const run = async () => { await addItem({ itemId: `${id}_item-1`, price: 100 }) const item1 = await getItem({ itemId: `${id}_item-1` }) printItem(item1) await addItem({ itemId: `${id}_item-2`, price: 20 }) const item2 = await getItem({ itemId: `${id}_item-2` }) printItem(item2) await addItem({ itemId: `${id}_item-1`, price: 50 }) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem({ itemId: `${id}_item-1` }) await getItem({ itemId: `${id}_item-1` }) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem({ itemId: `${id}_item-2` }) } run().catch(err => console.error(err))
クライアント実装2
次は Server streaming RPC のクライアント実装です。
client_subscribe.js
const protoLoader = require('@grpc/proto-loader') const grpc = require('@grpc/grpc-js') const protoFile = './proto/item.proto' const pd = protoLoader.loadSync(protoFile) const proto = grpc.loadPackageDefinition(pd) const client = new proto.item.ItemManage( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const stream = client.Subscribe({}) // メッセージ受信時 stream.on('data', event => { console.log(`*** received event = ${JSON.stringify(event)}`) }) // サーバー終了時 stream.on('end', () => console.log('*** stream end')) stream.on('error', err => console.log(`*** stream error: ${err}`))
動作確認
server.js の実行後、client_subscribe.js を 2つ起動した後に client.js を実行してみます。
Server 実行
> node server.js start server: 50051
Client2-1 実行
> node client_subscribe.js
Client2-2 実行
> node client_subscribe.js
Client1 実行
> node client.js a1 id = a1_item-1, price = 100 id = a1_item-2, price = 20 *** ERROR = 6 ALREADY_EXISTS: exists item *** ERROR = 5 NOT_FOUND: item not found
この時点で出力内容は以下のようになりました。
Server 出力内容
> node server.js start server: 50051 *** subscribed *** subscribed *** publish event: {"added":{"itemId":"a1_item-1","price":100}} *** publish event: {"added":{"itemId":"a1_item-2","price":20}} *** publish event: {"removed":{"itemId":"a1_item-1"}} *** publish event: {"removed":{"itemId":"a1_item-2"}}
Client2-1、Client2-2 出力内容
> node client_subscribe.js *** received event = {"added":{"itemId":"a1_item-1","price":{"low":100,"high":0,"unsigned":true}}} *** received event = {"added":{"itemId":"a1_item-2","price":{"low":20,"high":0,"unsigned":true}}} *** received event = {"removed":{"itemId":"a1_item-1"}} *** received event = {"removed":{"itemId":"a1_item-2"}}
Client2-2 を Ctrl + c で終了後に、Server も Ctrl + c で終了すると以下のようになり、Client2-1 のプロセスは終了しました。
Server 出力内容
> node server.js start server: 50051 ・・・ *** publish event: {"removed":{"itemId":"a1_item-2"}} *** unsubscribed ^C
Client2-1 出力内容
> node client_subscribe.js ・・・ *** received event = {"removed":{"itemId":"a1_item-2"}} *** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error) *** stream end
特に問題はなく、正常に動作しているようです。
(b) 静的コード生成(grpc-tools)
grpc-tools
を使った静的コード生成を試します。
インストールとコード生成
grpc-tools
、@grpc/grpc-js
、google-protobuf
をインストールしておきます。
> npm install --save-dev grpc-tools ・・・ > npm install --save @grpc/grpc-js google-protobuf ・・・
grpc-tools をインストールする事で使えるようになる grpc_tools_node_protoc
コマンドで proto ファイルからコードを生成します。
grpc_tools_node_protoc コマンドは内部的に protoc
コマンドを grpc_node_plugin
プラグインを伴って呼び出すようになっています。
--grpc_out
でサービス定義用のファイル xxx_grpc_pb.js が生成され、--js_out
でメッセージ定義用のファイルが生成されます。
サービス定義 xxx_grpc_pb.js は --js_out で import_style=commonjs
オプションを指定する事を前提としたコードになっています。※
※ import_style=commonjs オプションを指定した際に生成される xxx_pb.js を参照するようになっている
また、--grpc_out はデフォルトで grpc
パッケージ用のコードを生成するため、ここでは grpc_js
オプションを指定して @grpc/grpc-js
用のコードを生成するようにしています。
静的コード生成例(grpc_tools_node_protoc コマンド)
> mkdir generated > grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto ・・・
サーバー実装
(a) の場合と処理内容に大きな違いはありませんが、リクエストやレスポンスでは生成された型を使います。
アクセサメソッド(getter、setter)で値の取得や設定ができるようになっており、new 時に配列として全フィールドの値を指定する事もできるようです。
JavaScript オブジェクトへ変換したい場合は toObject
メソッドを使用します。
addService でマッピングする際のメソッド名の一文字目が小文字になっています。
proto ファイルで定義したサービス名の後に Service
を付けた型(ここでは ItemManageService
)がサーバー処理用、Client
を付けた型がクライアント処理用の型定義となるようです。
server.js
const grpc = require('@grpc/grpc-js') const { Item, AddedItem, RemovedItem, ItemEvent } = require('./generated/proto/item_pb') const { ItemManageService } = require('./generated/proto/item_grpc_pb') const { Empty } = require('google-protobuf/google/protobuf/empty_pb') let store = [] let subscribeList = [] const findItem = itemId => store.find(i => i.getItemId() == itemId) const addItem = (itemId, price) => { if (findItem(itemId)) { return undefined } const item = new Item([itemId, price]) store.push(item) return item } const removeItem = itemId => { const item = findItem(itemId) if (item) { store = store.filter(i => i.getItemId() != item.getItemId()) } return item } const createAddedEvent = (itemId, price) => { const event = new ItemEvent() event.setAdded(new AddedItem([itemId, price])) return event } const createRemovedEvent = itemId => { const event = new ItemEvent() event.setRemoved(new RemovedItem([itemId])) return event } const publishEvent = event => { // toObject で JavaScript オブジェクトへ変換 console.log(`*** publish event: ${JSON.stringify(event.toObject())}`) subscribeList.forEach(s => s.write(event)) } const server = new grpc.Server() server.addService(ItemManageService, { addItem(call, callback) { const itemId = call.request.getItemId() const price = call.request.getPrice() const item = addItem(itemId, price) if (item) { callback(null, new Empty()) publishEvent(createAddedEvent(itemId, price)) } else { const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' } callback(err) } }, removeItem(call, callback) { const itemId = call.request.getItemId() if (removeItem(itemId)) { callback(null, new Empty()) publishEvent(createRemovedEvent(itemId)) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, getItem(call, callback) { const itemId = call.request.getItemId() const item = findItem(itemId) if (item) { callback(null, item) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, subscribe(call) { console.log('*** subscribed') subscribeList.push(call) call.on('cancelled', () => { console.log('*** unsubscribed') subscribeList = subscribeList.filter(s => s != call) }) } }) server.bindAsync( ・・・ )
クライアント実装1
生成された型を使う点とメソッド名の先頭が小文字になっている点を除くと、基本的に (a) と同じです。
client.js
const grpc = require('@grpc/grpc-js') const { AddItemRequest, ItemRequest } = require('./generated/proto/item_pb') const { ItemManageClient } = require('./generated/proto/item_grpc_pb') const id = process.argv[2] const client = new ItemManageClient( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const promisify = (obj, methodName) => args => new Promise((resolve, reject) => { ・・・ }) const addItem = promisify(client, 'addItem') const removeItem = promisify(client, 'removeItem') const getItem = promisify(client, 'getItem') const printItem = item => { console.log(`id = ${item.getItemId()}, price = ${item.getPrice()}`) } const run = async () => { await addItem(new AddItemRequest([`${id}_item-1`, 100])) const item1 = await getItem(new ItemRequest([`${id}_item-1`])) printItem(item1) await addItem(new AddItemRequest([`${id}_item-2`, 20])) const item2 = await getItem(new ItemRequest([`${id}_item-2`])) printItem(item2) await addItem(new AddItemRequest([`${id}_item-1`, 50])) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem(new ItemRequest([`${id}_item-1`])) await getItem(new ItemRequest([`${id}_item-1`])) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem(new ItemRequest([`${id}_item-2`])) } run().catch(err => console.error(err))
クライアント実装2
こちらも同様です。
client_subscribe.js
const grpc = require('@grpc/grpc-js') const { ItemSubscribeRequest } = require('./generated/proto/item_pb') const { ItemManageClient } = require('./generated/proto/item_grpc_pb') const client = new ItemManageClient( ・・・ ) const stream = client.subscribe(new ItemSubscribeRequest()) stream.on('data', event => { // toObject で JavaScript オブジェクトへ変換 console.log(`*** received event = ${JSON.stringify(event.toObject())}`) }) ・・・
動作確認
(a) と同じ操作を行った結果は以下のようになりました。
Server 出力内容
> node server.js start server: 50051 *** subscribed *** subscribed *** publish event: {"added":{"itemId":"a1_item-1","price":100}} *** publish event: {"added":{"itemId":"a1_item-2","price":20}} *** publish event: {"removed":{"itemId":"a1_item-1"}} *** publish event: {"removed":{"itemId":"a1_item-2"}} *** unsubscribed ^C
Client1 出力内容
> node client.js a1 id = a1_item-1, price = 100 id = a1_item-2, price = 20 *** ERROR = 6 ALREADY_EXISTS: exists item *** ERROR = 5 NOT_FOUND: item not found
Client2-1 出力内容
> node client_subscribe.js *** received event = {"added":{"itemId":"a1_item-1","price":100}} *** received event = {"added":{"itemId":"a1_item-2","price":20}} *** received event = {"removed":{"itemId":"a1_item-1"}} *** received event = {"removed":{"itemId":"a1_item-2"}} *** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error) *** stream end
Client2-2 出力内容
> node client_subscribe.js *** received event = {"added":{"itemId":"a1_item-1","price":100}} *** received event = {"added":{"itemId":"a1_item-2","price":20}} *** received event = {"removed":{"itemId":"a1_item-1"}} *** received event = {"removed":{"itemId":"a1_item-2"}} ^C
(a) と (b) は同一の gRPC サービス(proto ファイル)を実装したものなので当然ですが、(a) と (b) を相互接続しても特に問題はありませんでした。