gRPC Server Reflection のクライアント処理

gRPC Server Reflection を呼び出す処理を Node.js で実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20191008/

事前準備(サーバー実装)

まずは、gRPC Server Reflection を有効にしたサーバー処理を用意します。 Node.js での実装は無理そうだったので(未実装らしい) Go 言語で実装します。

protoc コマンドをインストールした後、Go 言語用の protoc プラグインである protoc-gen-go をインストールします。

protoc-gen-go インストール
> go get -u github.com/golang/protobuf/protoc-gen-go

google.golang.org/grpc 等のライブラリをビルド時に自動ダウンロードするように、Go のモジュールファイル go.mod を用意しておきます。

go.mod の作成
> go mod init sample

proto ファイルを作成してインターフェースを定義します。 今回は、go_package を使って Go 用のパッケージを別途定義してみました。

proto/item/item.proto
syntax = "proto3";

import "google/protobuf/empty.proto";

option go_package = "sample/proto/item";

package item;

message ItemRequest {
    string item_id = 1;
}

message AddItemRequest {
    string item_id = 1;
    uint64 price = 2;
}

message Item {
    string item_id = 1;
    uint64 price = 2;
}

service ItemService {
    rpc GetItem(ItemRequest) returns (Item);
    rpc AddItem(AddItemRequest) returns (google.protobuf.Empty);
}

protoc コマンドで Go 用のコードを生成します。

protoc によるコード生成
> protoc -I=proto --go_out=plugins=grpc,paths=source_relative:./proto proto/item/item.proto

サーバー処理を実装します。

Server Reflection を有効化するには google.golang.org/grpc/reflection を import して reflection.Register を適用するだけなので簡単です。

server/main.go
package main

import (
    "context"
    "fmt"
    "net"
    "log"
    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
    empty "github.com/golang/protobuf/ptypes/empty"
    pb "sample/proto/item"
)

type Server struct {
    Items map[string]pb.Item
}

func (s *Server) GetItem(ctx context.Context, req *pb.ItemRequest) (*pb.Item, error) {
    log.Println("call GetItem: ", req)

    item, ok := s.Items[req.GetItemId()]

    if !ok {
        return nil, fmt.Errorf("item not found: %s", req.GetItemId())
    }

    return &item, nil
}

func (s *Server) AddItem(ctx context.Context, req *pb.AddItemRequest) (*empty.Empty, error) {
    log.Println("call AddItem: ", req)

    s.Items[req.GetItemId()] = pb.Item{ItemId: req.GetItemId(), Price: req.GetPrice()}

    return &empty.Empty{}, nil
}

func main() {
    address := ":50051"

    listen, err := net.Listen("tcp", address)

    if err != nil {
        log.Fatalf("error: %v", err)
    }

    s := grpc.NewServer()

    pb.RegisterItemServiceServer(s, &Server{Items: make(map[string]pb.Item)})
    // gRPC Server Reflection 有効化
    reflection.Register(s)

    log.Println("server start:", address)

    if err := s.Serve(listen); err != nil {
        log.Fatalf("failed serve: %v", err)
    }
}

上記を実行しておきます。(go run でビルドも実施されます)

初回実行時は google.golang.org/grpc 等の依存ライブラリが自動的にダウンロードされます。

実行
> go run server/main.go

・・・
2019/10/06 22:00:00 server start: :50051

サーバー側のファイル構成は以下のようになっています。

ファイル構成
  • go.mod
  • go.sum
  • proto
    • item
      • item.proto
      • item.pb.go
  • server
    • main.go

gRPC Server Reflection クライアント処理

それでは、本題の gRPC Server Reflection を呼び出す処理を実装していきます。

準備

どのように実装すべきか分からなかったので、とりあえず今回は https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto を使う事にしました。

まずは、この proto ファイルをローカルにダウンロードしておきます。

proto ファイルのダウンロード
> curl -O https://raw.githubusercontent.com/grpc/grpc/master/src/proto/grpc/reflection/v1alpha/reflection.proto

Node.js で gRPC 処理を実装するため grpcproto-loader をインストールしておきます。

grpc と proto-loader のインストール
> npm install grpc @grpc/proto-loader

(a) サービスのリスト取得

はじめに、サービス名をリストアップする処理を実装してみます。

reflection.proto を見てみると、以下のように ServerReflectionInfo メソッドの引数である ServerReflectionRequestmessage_request にどのフィールドを指定するかで取得内容が変わるようになっています。

サービスのリストを取得するには list_services フィールドを使えば良さそうです。

reflection.proto の該当箇所
・・・
package grpc.reflection.v1alpha;

service ServerReflection {
  rpc ServerReflectionInfo(stream ServerReflectionRequest)
      returns (stream ServerReflectionResponse);
}

message ServerReflectionRequest {
  string host = 1;

  oneof message_request {
    string file_by_filename = 3;
    string file_containing_symbol = 4;
    ExtensionRequest file_containing_extension = 5;
    string all_extension_numbers_of_type = 6;
    string list_services = 7;
  }
}
・・・

ServerReflectionInfo メソッドは引数と戻り値の両方に stream が指定されている双方向ストリーミング RPC となっているため、以下のように write でリクエストメッセージを送信して on('data', ・・・) でレスポンスメッセージを取得する事になります。

また、end でストリーミング処理を終了します。

list_services.js
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')

const pd = protoLoader.loadSync('reflection.proto', {
    keepCase: true,
    defaults: true
})

const proto = grpc.loadPackageDefinition(pd).grpc.reflection.v1alpha

const client = new proto.ServerReflection(
    '127.0.0.1:50051', 
    grpc.credentials.createInsecure()
)

const call = client.ServerReflectionInfo()

call.on('error', err => {
    // ストリーミングの終了
    call.end()
    console.error(err)
})
// レスポンスメッセージの受信
call.on('data', res => {
    // ストリーミングの終了
    call.end()
    
    // サービス名の出力
    res.list_services_response.service.forEach(s => {
        console.log(s.name)
    })
})
// リクエストメッセージの送信
call.write({host: 'localhost', list_services: ''})

実行結果は以下の通り、サービス名を出力できました。

実行結果
> node list_services.js

grpc.reflection.v1alpha.ServerReflection
item.ItemService

(b) サービスのインターフェース定義取得

次に、サービスの内容(インターフェース定義)を取得してみます。

こちらは、リクエストメッセージの file_containing_symbol フィールドにサービス名を指定する事で取得できます。

ただ、レスポンスメッセージの該当フィールドの内容が reflection.proto のコメントにあるように FileDescriptorProtoシリアライズした結果 ※ (の配列)となっている点に注意が必要です。

 ※ bytes 型は Node.js では Buffer として扱われる
reflection.proto の該当箇所
・・・
message FileDescriptorResponse {
  // Serialized FileDescriptorProto messages. We avoid taking a dependency on
  // descriptor.proto, which uses proto2 only features, by making them opaque
  // bytes instead.
  repeated bytes file_descriptor_proto = 1;
}
・・・

FileDescriptorProto へのデシリアライズに関しては試行錯誤しましたが、最も手軽そうな protobufjs/ext/descriptorFileDescriptorProto.decode を使う事にしました。

load_symbol.js
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')
const descriptor = require('protobufjs/ext/descriptor')

const serviceName = process.argv[2]

・・・

const call = client.ServerReflectionInfo()

call.on('error', err => {
    call.end()
    console.error(err)
})

call.on('data', res => {
    call.end()
    
    res.file_descriptor_response.file_descriptor_proto
        // FileDescriptorProto デシリアライズ
        .map(buf => descriptor.FileDescriptorProto.decode(buf))
        .forEach(d => {
            // JSON 化して出力
            console.log(JSON.stringify(d, null, 2))
        })
})

call.write({host: 'localhost', file_containing_symbol: serviceName})

item.ItemService のサービス内容を取得してみた結果です。 go_package の内容も含め、問題なく取得できているようです。

実行結果
> node load_symbol.js item.ItemService

{
  "name": "item/item.proto",
  "package": "item",
  "dependency": [
    "google/protobuf/empty.proto"
  ],
  "messageType": [
    {
      "name": "ItemRequest",
      "field": [
        {
          "name": "item_id",
          "number": 1,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_STRING",
          "jsonName": "itemId"
        }
      ]
    },
    {
      "name": "AddItemRequest",
      "field": [
        {
          "name": "item_id",
          "number": 1,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_STRING",
          "jsonName": "itemId"
        },
        {
          "name": "price",
          "number": 2,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_UINT64",
          "jsonName": "price"
        }
      ]
    },
    {
      "name": "Item",
      "field": [
        {
          "name": "item_id",
          "number": 1,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_STRING",
          "jsonName": "itemId"
        },
        {
          "name": "price",
          "number": 2,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_UINT64",
          "jsonName": "price"
        }
      ]
    }
  ],
  "service": [
    {
      "name": "ItemService",
      "method": [
        {
          "name": "GetItem",
          "inputType": ".item.ItemRequest",
          "outputType": ".item.Item"
        },
        {
          "name": "AddItem",
          "inputType": ".item.AddItemRequest",
          "outputType": ".google.protobuf.Empty"
        }
      ]
    }
  ],
  "options": {
    "goPackage": "sample/proto/item"
  },
  "syntax": "proto3"
}

(c) 全サービスのインターフェース定義取得

最後に、全サービスのインターフェース定義を取得する処理を実装してみます。

双方向ストリーミング RPC を活用して、サービスのリスト取得とサービス内容の取得を同じストリーミング上で行ってみました。

load_symbols.js
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')
const descriptor = require('protobufjs/ext/descriptor')

・・・

const call = client.ServerReflectionInfo()

call.on('error', err => {
    call.end()
    console.error(err)
})

let count = 0

call.on('data', res => {
    // サービスのリスト取得時の処理
    if (res.list_services_response) {
        const names = res.list_services_response.service.map(s => s.name)

        count = names.length

        names.forEach(name => 
            call.write({host: 'localhost', file_containing_symbol: name})
        )
    }
    // サービスのインターフェース定義取得時の処理
    else if (res.file_descriptor_response) {
        if (--count == 0) {
            // インターフェース定義を全て取得したら終了
            call.end()
        }

        res.file_descriptor_response.file_descriptor_proto
            .map(buf => descriptor.FileDescriptorProto.decode(buf))
            .forEach(d => {
                console.log(JSON.stringify(d, null, 2))
            })
    }
    else {
        console.log(res)
        call.end()
    }
})

call.write({host: 'localhost', list_services: ''})

実行結果は以下の通りです。

実行結果
> node load_symbols.js

{
  "name": "grpc_reflection_v1alpha/reflection.proto",
  "package": "grpc.reflection.v1alpha",
  ・・・
  "service": [
    {
      "name": "ServerReflection",
      "method": [
        {
          "name": "ServerReflectionInfo",
          "inputType": ".grpc.reflection.v1alpha.ServerReflectionRequest",
          "outputType": ".grpc.reflection.v1alpha.ServerReflectionResponse",
          "clientStreaming": true,
          "serverStreaming": true
        }
      ]
    }
  ],
  "syntax": "proto3"
}
{
  "name": "item/item.proto",
  "package": "item",
  "dependency": [
    "google/protobuf/empty.proto"
  ],
  ・・・
  "service": [
    {
      "name": "ItemService",
      "method": [
        {
          "name": "GetItem",
          "inputType": ".item.ItemRequest",
          "outputType": ".item.Item"
        },
        {
          "name": "AddItem",
          "inputType": ".item.AddItemRequest",
          "outputType": ".google.protobuf.Empty"
        }
      ]
    }
  ],
  "options": {
    "goPackage": "sample/proto/item"
  },
  "syntax": "proto3"
}

備考

別の実装例として、処理毎に個別のストリーミングで処理するようにして Promise 化してみました。

load_symbols2.js
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')
const descriptor = require('protobufjs/ext/descriptor')

・・・

const merge = a => b => Object.fromEntries([a, b].map(Object.entries).flat())

const serverReflectionInfo = (f, g) => new Promise((resolve, revoke) => {
    const call = client.ServerReflectionInfo()

    call.on('error', err => {
        call.end()
        revoke(err)
    })

    call.on('data', res => {
        call.end()
        resolve( g(res) )
    })

    call.write( f({host: 'localhost'}) )
})
// サービスのリスト取得
const listServices = () => serverReflectionInfo(
    merge({list_services: ''}),
    res => res.list_services_response.service.map(s => s.name)
)
// サービスのインターフェース定義取得
const loadSymbol = name => serverReflectionInfo(
    merge({file_containing_symbol: name}),
    res => res.file_descriptor_response.file_descriptor_proto
                .map(buf => descriptor.FileDescriptorProto.decode(buf))
)

listServices()
    .then(names => 
        Promise.all(names.map(loadSymbol))
    )
    .then(ds => ds.flat())
    .then(ds => ds.forEach(d => console.log(JSON.stringify(d, null, 2))))
    .catch(err => console.error(err))
実行結果
> node load_symbols2.js

{
  "name": "grpc_reflection_v1alpha/reflection.proto",
  "package": "grpc.reflection.v1alpha",
  ・・・
  "service": [
    {
      "name": "ServerReflection",
      "method": [
        {
          "name": "ServerReflectionInfo",
          "inputType": ".grpc.reflection.v1alpha.ServerReflectionRequest",
          "outputType": ".grpc.reflection.v1alpha.ServerReflectionResponse",
          "clientStreaming": true,
          "serverStreaming": true
        }
      ]
    }
  ],
  "syntax": "proto3"
}
{
  "name": "item/item.proto",
  "package": "item",
  "dependency": [
    "google/protobuf/empty.proto"
  ],
  ・・・
  "service": [
    {
      "name": "ItemService",
      "method": [
        {
          "name": "GetItem",
          "inputType": ".item.ItemRequest",
          "outputType": ".item.Item"
        },
        {
          "name": "AddItem",
          "inputType": ".item.AddItemRequest",
          "outputType": ".google.protobuf.Empty"
        }
      ]
    }
  ],
  "options": {
    "goPackage": "sample/proto/item"
  },
  "syntax": "proto3"
}