gRPC Server Reflection のクライアント処理
gRPC Server Reflection を呼び出す処理を Node.js で実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20191008/
事前準備(サーバー実装)
まずは、gRPC Server Reflection を有効にしたサーバー処理を用意します。 Node.js での実装は無理そうだったので(未実装らしい) Go 言語で実装します。
protoc コマンドをインストールした後、Go 言語用の protoc プラグインである protoc-gen-go
をインストールします。
protoc-gen-go インストール
> go get -u github.com/golang/protobuf/protoc-gen-go
google.golang.org/grpc
等のライブラリをビルド時に自動ダウンロードするように、Go のモジュールファイル go.mod
を用意しておきます。
go.mod の作成
> go mod init sample
proto ファイルを作成してインターフェースを定義します。
今回は、go_package
を使って Go 用のパッケージを別途定義してみました。
proto/item/item.proto
syntax = "proto3"; import "google/protobuf/empty.proto"; option go_package = "sample/proto/item"; package item; message ItemRequest { string item_id = 1; } message AddItemRequest { string item_id = 1; uint64 price = 2; } message Item { string item_id = 1; uint64 price = 2; } service ItemService { rpc GetItem(ItemRequest) returns (Item); rpc AddItem(AddItemRequest) returns (google.protobuf.Empty); }
protoc コマンドで Go 用のコードを生成します。
protoc によるコード生成
> protoc -I=proto --go_out=plugins=grpc,paths=source_relative:./proto proto/item/item.proto
サーバー処理を実装します。
Server Reflection を有効化するには google.golang.org/grpc/reflection
を import して reflection.Register
を適用するだけなので簡単です。
server/main.go
package main import ( "context" "fmt" "net" "log" "google.golang.org/grpc" "google.golang.org/grpc/reflection" empty "github.com/golang/protobuf/ptypes/empty" pb "sample/proto/item" ) type Server struct { Items map[string]pb.Item } func (s *Server) GetItem(ctx context.Context, req *pb.ItemRequest) (*pb.Item, error) { log.Println("call GetItem: ", req) item, ok := s.Items[req.GetItemId()] if !ok { return nil, fmt.Errorf("item not found: %s", req.GetItemId()) } return &item, nil } func (s *Server) AddItem(ctx context.Context, req *pb.AddItemRequest) (*empty.Empty, error) { log.Println("call AddItem: ", req) s.Items[req.GetItemId()] = pb.Item{ItemId: req.GetItemId(), Price: req.GetPrice()} return &empty.Empty{}, nil } func main() { address := ":50051" listen, err := net.Listen("tcp", address) if err != nil { log.Fatalf("error: %v", err) } s := grpc.NewServer() pb.RegisterItemServiceServer(s, &Server{Items: make(map[string]pb.Item)}) // gRPC Server Reflection 有効化 reflection.Register(s) log.Println("server start:", address) if err := s.Serve(listen); err != nil { log.Fatalf("failed serve: %v", err) } }
上記を実行しておきます。(go run でビルドも実施されます)
初回実行時は google.golang.org/grpc
等の依存ライブラリが自動的にダウンロードされます。
実行
> go run server/main.go ・・・ 2019/10/06 22:00:00 server start: :50051
サーバー側のファイル構成は以下のようになっています。
ファイル構成
- go.mod
- go.sum
- proto
- item
- item.proto
- item.pb.go
- item
- server
- main.go
gRPC Server Reflection クライアント処理
それでは、本題の gRPC Server Reflection を呼び出す処理を実装していきます。
準備
どのように実装すべきか分からなかったので、とりあえず今回は https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto を使う事にしました。
まずは、この proto ファイルをローカルにダウンロードしておきます。
proto ファイルのダウンロード
> curl -O https://raw.githubusercontent.com/grpc/grpc/master/src/proto/grpc/reflection/v1alpha/reflection.proto
Node.js で gRPC 処理を実装するため grpc
と proto-loader
をインストールしておきます。
grpc と proto-loader のインストール
> npm install grpc @grpc/proto-loader
(a) サービスのリスト取得
はじめに、サービス名をリストアップする処理を実装してみます。
reflection.proto
を見てみると、以下のように ServerReflectionInfo
メソッドの引数である ServerReflectionRequest
の message_request
にどのフィールドを指定するかで取得内容が変わるようになっています。
サービスのリストを取得するには list_services
フィールドを使えば良さそうです。
reflection.proto の該当箇所
・・・ package grpc.reflection.v1alpha; service ServerReflection { rpc ServerReflectionInfo(stream ServerReflectionRequest) returns (stream ServerReflectionResponse); } message ServerReflectionRequest { string host = 1; oneof message_request { string file_by_filename = 3; string file_containing_symbol = 4; ExtensionRequest file_containing_extension = 5; string all_extension_numbers_of_type = 6; string list_services = 7; } } ・・・
ServerReflectionInfo
メソッドは引数と戻り値の両方に stream
が指定されている双方向ストリーミング RPC となっているため、以下のように write
でリクエストメッセージを送信して on('data', ・・・)
でレスポンスメッセージを取得する事になります。
また、end
でストリーミング処理を終了します。
list_services.js
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const pd = protoLoader.loadSync('reflection.proto', { keepCase: true, defaults: true }) const proto = grpc.loadPackageDefinition(pd).grpc.reflection.v1alpha const client = new proto.ServerReflection( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const call = client.ServerReflectionInfo() call.on('error', err => { // ストリーミングの終了 call.end() console.error(err) }) // レスポンスメッセージの受信 call.on('data', res => { // ストリーミングの終了 call.end() // サービス名の出力 res.list_services_response.service.forEach(s => { console.log(s.name) }) }) // リクエストメッセージの送信 call.write({host: 'localhost', list_services: ''})
実行結果は以下の通り、サービス名を出力できました。
実行結果
> node list_services.js grpc.reflection.v1alpha.ServerReflection item.ItemService
(b) サービスのインターフェース定義取得
次に、サービスの内容(インターフェース定義)を取得してみます。
こちらは、リクエストメッセージの file_containing_symbol
フィールドにサービス名を指定する事で取得できます。
ただ、レスポンスメッセージの該当フィールドの内容が reflection.proto のコメントにあるように FileDescriptorProto
をシリアライズした結果 ※ (の配列)となっている点に注意が必要です。
※ bytes 型は Node.js では Buffer として扱われる
reflection.proto の該当箇所
・・・ message FileDescriptorResponse { // Serialized FileDescriptorProto messages. We avoid taking a dependency on // descriptor.proto, which uses proto2 only features, by making them opaque // bytes instead. repeated bytes file_descriptor_proto = 1; } ・・・
FileDescriptorProto へのデシリアライズに関しては試行錯誤しましたが、最も手軽そうな protobufjs/ext/descriptor
の FileDescriptorProto.decode
を使う事にしました。
load_symbol.js
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const descriptor = require('protobufjs/ext/descriptor') const serviceName = process.argv[2] ・・・ const call = client.ServerReflectionInfo() call.on('error', err => { call.end() console.error(err) }) call.on('data', res => { call.end() res.file_descriptor_response.file_descriptor_proto // FileDescriptorProto デシリアライズ .map(buf => descriptor.FileDescriptorProto.decode(buf)) .forEach(d => { // JSON 化して出力 console.log(JSON.stringify(d, null, 2)) }) }) call.write({host: 'localhost', file_containing_symbol: serviceName})
item.ItemService のサービス内容を取得してみた結果です。 go_package の内容も含め、問題なく取得できているようです。
実行結果
> node load_symbol.js item.ItemService { "name": "item/item.proto", "package": "item", "dependency": [ "google/protobuf/empty.proto" ], "messageType": [ { "name": "ItemRequest", "field": [ { "name": "item_id", "number": 1, "label": "LABEL_OPTIONAL", "type": "TYPE_STRING", "jsonName": "itemId" } ] }, { "name": "AddItemRequest", "field": [ { "name": "item_id", "number": 1, "label": "LABEL_OPTIONAL", "type": "TYPE_STRING", "jsonName": "itemId" }, { "name": "price", "number": 2, "label": "LABEL_OPTIONAL", "type": "TYPE_UINT64", "jsonName": "price" } ] }, { "name": "Item", "field": [ { "name": "item_id", "number": 1, "label": "LABEL_OPTIONAL", "type": "TYPE_STRING", "jsonName": "itemId" }, { "name": "price", "number": 2, "label": "LABEL_OPTIONAL", "type": "TYPE_UINT64", "jsonName": "price" } ] } ], "service": [ { "name": "ItemService", "method": [ { "name": "GetItem", "inputType": ".item.ItemRequest", "outputType": ".item.Item" }, { "name": "AddItem", "inputType": ".item.AddItemRequest", "outputType": ".google.protobuf.Empty" } ] } ], "options": { "goPackage": "sample/proto/item" }, "syntax": "proto3" }
(c) 全サービスのインターフェース定義取得
最後に、全サービスのインターフェース定義を取得する処理を実装してみます。
双方向ストリーミング RPC を活用して、サービスのリスト取得とサービス内容の取得を同じストリーミング上で行ってみました。
load_symbols.js
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const descriptor = require('protobufjs/ext/descriptor') ・・・ const call = client.ServerReflectionInfo() call.on('error', err => { call.end() console.error(err) }) let count = 0 call.on('data', res => { // サービスのリスト取得時の処理 if (res.list_services_response) { const names = res.list_services_response.service.map(s => s.name) count = names.length names.forEach(name => call.write({host: 'localhost', file_containing_symbol: name}) ) } // サービスのインターフェース定義取得時の処理 else if (res.file_descriptor_response) { if (--count == 0) { // インターフェース定義を全て取得したら終了 call.end() } res.file_descriptor_response.file_descriptor_proto .map(buf => descriptor.FileDescriptorProto.decode(buf)) .forEach(d => { console.log(JSON.stringify(d, null, 2)) }) } else { console.log(res) call.end() } }) call.write({host: 'localhost', list_services: ''})
実行結果は以下の通りです。
実行結果
> node load_symbols.js { "name": "grpc_reflection_v1alpha/reflection.proto", "package": "grpc.reflection.v1alpha", ・・・ "service": [ { "name": "ServerReflection", "method": [ { "name": "ServerReflectionInfo", "inputType": ".grpc.reflection.v1alpha.ServerReflectionRequest", "outputType": ".grpc.reflection.v1alpha.ServerReflectionResponse", "clientStreaming": true, "serverStreaming": true } ] } ], "syntax": "proto3" } { "name": "item/item.proto", "package": "item", "dependency": [ "google/protobuf/empty.proto" ], ・・・ "service": [ { "name": "ItemService", "method": [ { "name": "GetItem", "inputType": ".item.ItemRequest", "outputType": ".item.Item" }, { "name": "AddItem", "inputType": ".item.AddItemRequest", "outputType": ".google.protobuf.Empty" } ] } ], "options": { "goPackage": "sample/proto/item" }, "syntax": "proto3" }
備考
別の実装例として、処理毎に個別のストリーミングで処理するようにして Promise 化してみました。
load_symbols2.js
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const descriptor = require('protobufjs/ext/descriptor') ・・・ const merge = a => b => Object.fromEntries([a, b].map(Object.entries).flat()) const serverReflectionInfo = (f, g) => new Promise((resolve, revoke) => { const call = client.ServerReflectionInfo() call.on('error', err => { call.end() revoke(err) }) call.on('data', res => { call.end() resolve( g(res) ) }) call.write( f({host: 'localhost'}) ) }) // サービスのリスト取得 const listServices = () => serverReflectionInfo( merge({list_services: ''}), res => res.list_services_response.service.map(s => s.name) ) // サービスのインターフェース定義取得 const loadSymbol = name => serverReflectionInfo( merge({file_containing_symbol: name}), res => res.file_descriptor_response.file_descriptor_proto .map(buf => descriptor.FileDescriptorProto.decode(buf)) ) listServices() .then(names => Promise.all(names.map(loadSymbol)) ) .then(ds => ds.flat()) .then(ds => ds.forEach(d => console.log(JSON.stringify(d, null, 2)))) .catch(err => console.error(err))
実行結果
> node load_symbols2.js { "name": "grpc_reflection_v1alpha/reflection.proto", "package": "grpc.reflection.v1alpha", ・・・ "service": [ { "name": "ServerReflection", "method": [ { "name": "ServerReflectionInfo", "inputType": ".grpc.reflection.v1alpha.ServerReflectionRequest", "outputType": ".grpc.reflection.v1alpha.ServerReflectionResponse", "clientStreaming": true, "serverStreaming": true } ] } ], "syntax": "proto3" } { "name": "item/item.proto", "package": "item", "dependency": [ "google/protobuf/empty.proto" ], ・・・ "service": [ { "name": "ItemService", "method": [ { "name": "GetItem", "inputType": ".item.ItemRequest", "outputType": ".item.Item" }, { "name": "AddItem", "inputType": ".item.AddItemRequest", "outputType": ".google.protobuf.Empty" } ] } ], "options": { "goPackage": "sample/proto/item" }, "syntax": "proto3" }