Node.js で GraphQL over gRPC 的な事をやってみる

gRPC 上で GraphQL を扱う GraphQL over gRPC 的な処理を Node.js で試しに実装してみました。

今回のコードは http://github.com/fits/try_samples/tree/master/blog/20201124/

はじめに

GraphQL はクエリ言語なので基本的に通信プロトコルには依存していません。

Web フロントエンドの用途では Apollo GraphQL が公開している GraphQL over WebSocket Protocol が有力そうですが、マイクロサービス等の用途で GraphQL を利用する事を考えると WebSocket よりも gRPC の方が適しているように思います。

  • GraphQL の Query や Mutation は gRPC の Unary RPC で実現可能
  • GraphQL の Subscription は gRPC の Server streaming RPC で実現可能 ※

そこで、とりあえず実装し確認してみたというのが本件の趣旨となっています。

 ※ GraphQL の Subscription を使わずに
    gRPC の streaming RPC で代用する事も考えられる

なお、GraphQL の処理に関しては「Deno で GraphQL」の内容をベースに、gRPC は「Node.js で gRPC を試す」の静的コード生成を用いて実装しています。

Query と Mutation - sample1

まずは Subscription を除いた Query と Mutation について実装してみます。

gRPC サービス定義

gRPC のサービス定義を下記のように定義してみました。

GraphQL のクエリは文字列で扱うので型は string、結果は実質的に JSON となるので型は google.protobuf.Struct としました。

ついでに、クエリの変数も渡せるようにして型は google.protobuf.Value としています。

ここで、google.protobuf.StructJSON Object を Protocol Buffers で表現するための型として定義されたもので、google.protobuf.ValueJSON Value(null、文字列、数値、配列、JSON Object 等)を表現するための型です。(参照 google/protobuf/struct.proto

proto/graphql.proto
syntax = "proto3";

import "google/protobuf/struct.proto";

package gql;

message QueryRequest {
    string query = 1;
    google.protobuf.Value variables = 2;
}

service GraphQL {
    rpc Query(QueryRequest) returns (google.protobuf.Struct);
}

この proto/graphql.proto から gRPC のコードを生成しておきます。

静的コード生成
> mkdir generated

> npm run gen-grpc
・・・
grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto

package.json の内容は以下の通りです。

package.json
{
  "name": "sample1",
  "version": "1.0.0",
  "description": "",
  "scripts": {
    "gen-grpc": "grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto"
  },
  "dependencies": {
    "@grpc/grpc-js": "^1.2.1",
    "google-protobuf": "^3.14.0",
    "graphql": "^15.4.0",
    "uuid": "^8.3.1"
  },
  "devDependencies": {
    "grpc-tools": "^1.10.0"
  }
}

サーバー実装

GraphQL を扱う gRPC サーバーを実装します。

gRPC のリクエストから GraphQL のクエリやその変数の内容を取得し、graphql 関数で処理した結果をレスポンスとして返すような処理となります。

google.protobuf.Struct や Value に該当する型は google-protobuf/google/protobuf/struct_pb.js にて StructValue として定義されており、プレーンな JavaScript オブジェクトと相互変換するための fromJavaScripttoJavaScript メソッドが用意されています。

下記コードでは QueryRequest の variables の内容を JavaScript オブジェクトとして取得するために toJavaScript を、graphql の処理結果を Struct で返すために fromJavaScript をそれぞれ使用しています。

server.js
const grpc = require('@grpc/grpc-js')
const { GraphQLService } = require('./generated/proto/graphql_grpc_pb')
const { Struct } = require('google-protobuf/google/protobuf/struct_pb')

const { graphql, buildSchema } = require('graphql')

const { v4: uuidv4 } = require('uuid')

// GraphQL スキーマ定義
const schema = buildSchema(`
    enum Category {
        Standard
        Extra
    }

    input CreateItem {
        category: Category!
        value: Int!
    }

    type Item {
        id: ID!
        category: Category!
        value: Int!
    }

    type Mutation {
        create(input: CreateItem!): Item
    }

    type Query {
        find(id: ID!): Item
    }
`)

const store = {}
// スキーマ定義に応じた GraphQL 処理の実装
const root = {
    create: ({ input: { category, value } }) => {
        console.log(`*** call create: category = ${category}, value = ${value}`)

        const id = `item-${uuidv4()}`
        const item = { id, category, value }

        store[id] = item

        return item
    },
    find: ({ id }) => {
        console.log(`*** call find: ${id}`)
        return store[id]
    }
}

const server = new grpc.Server()

server.addService(GraphQLService, {
    async query(call, callback) {
        try {
            const query = call.request.getQuery()
            const variables = call.request.getVariables().toJavaScript()
            // GraphQL の処理
            const r = await graphql(schema, query, root, {}, variables)

            callback(null, Struct.fromJavaScript(r))

        } catch(e) {
            console.error(e)
            callback(e)
        }
    }
})

server.bindAsync(
    '127.0.0.1:50051',
    grpc.ServerCredentials.createInsecure(),
    (err, port) => {
        if (err) {
            console.error(err)
            return
        }

        console.log(`start server: ${port}`)

        server.start()
    }
)

クライアント実装

クライアント側は以下のようになります。

client.js
const grpc = require('@grpc/grpc-js')
const { QueryRequest } = require('./generated/proto/graphql_pb')
const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb')
const { Value } = require('google-protobuf/google/protobuf/struct_pb')

const client = new GraphQLClient(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const promisify = (obj, methodName) => args => 
    new Promise((resolve, reject) => {
        obj[methodName](args, (err, res) => {
            if (err) {
                reject(err)
            }
            else {
                resolve(res)
            }
        })
    })

const query = promisify(client, 'query')

const createRequest = (q, v = null) => {
    const req = new QueryRequest()

    req.setQuery(q)
    req.setVariables(Value.fromJavaScript(v))

    return req
}

const run = async () => {
    // Item の作成
    const r1 = await query(createRequest(`
        mutation {
            create(input: { category: Extra, value: 123 }) {
                id
            }
        }
    `))

    console.log(r1.toJavaScript())
    // 存在しない Item の find
    const r2 = await query(createRequest(`
        {
            find(id: "a1") {
                id
                value
            }
        }
    `))

    console.log(r2.toJavaScript())

    const id = r1.toJavaScript().data.create.id
    // 作成した Item の find (クエリ変数の使用)
    const r3 = await query(createRequest(
        `
            query findItem($id: ID!) {
                find(id: $id) {
                    id
                    category
                    value
                }
            }
        `,
        { id }
    ))

    console.log(r3.toJavaScript())
}

run().catch(err => console.error(err))

動作確認

server.js を実行しておきます。

server.js 実行
> node server.js
start server: 50051

client.js を実行した結果は以下の通りで、特に問題無く動作しているようです。

client.js 実行
> node client.js
{
  data: { create: { id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d' } }
}
{ data: { find: null } }
{
  data: {
    find: {
      category: 'Extra',
      id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d',
      value: 123
    }
  }
}

Subscription - sample2

次は Subscription の機能を追加します。

gRPC サービス定義

gRPC のサービス定義に Subscription 用のメソッドを追加し、sample1 と同様にコードを生成しておきます。

proto/graphql.proto
syntax = "proto3";

import "google/protobuf/struct.proto";

package gql;

message QueryRequest {
    string query = 1;
    google.protobuf.Value variables = 2;
}

service GraphQL {
    rpc Query(QueryRequest) returns (google.protobuf.Struct);
    rpc Subscription(QueryRequest) returns (stream google.protobuf.Struct);
}

サーバー実装

Deno で GraphQL」では単一の Subscription を処理するだけの実装だったので、複数クライアントからの Subscription を処理するために PubSub というクラスを追加し、subscription の呼び出し毎に MessageBox を作成、(クライアントが接続中の)有効な全ての MessageBox へメッセージを配信するようにしています。

server.js
const grpc = require('@grpc/grpc-js')
const { GraphQLService } = require('./generated/proto/graphql_grpc_pb')
const { Struct } = require('google-protobuf/google/protobuf/struct_pb')

const { graphql, buildSchema, subscribe, parse } = require('graphql')

const { v4: uuidv4 } = require('uuid')
// GraphQL スキーマ定義
const schema = buildSchema(`
    enum Category {
        Standard
        Extra
    }

    input CreateItem {
        category: Category!
        value: Int!
    }

    type Item {
        id: ID!
        category: Category!
        value: Int!
    }

    type Mutation {
        create(input: CreateItem!): Item
    }

    type Query {
        find(id: ID!): Item
    }

    type Subscription {
        created: Item
    }
`)

class MessageBox {
    #promises = []
    #resolves = []

    #appendPromise = () => this.#promises.push(
        new Promise(res => this.#resolves.push(res))
    )

    publish(msg) {
        if (this.#resolves.length == 0) {
            this.#appendPromise()
        }

        this.#resolves.shift()(msg)
    }

    [Symbol.asyncIterator]() {
        return {
            next: async () => {
                console.log('*** asyncIterator next')

                if (this.#promises.length == 0) {
                    this.#appendPromise()
                }

                const value = await this.#promises.shift()
                return { value, done: false }
            }
        }
    }
}
// クライアント毎の MessageBox を管理
class PubSub {
    #subscribes = []

    publish(msg) {
        this.#subscribes.forEach(s => s.publish(msg))
    }

    subscribe() {
        const sub = new MessageBox()
        this.#subscribes.push(sub)

        return sub
    }

    unsubscribe(sub) {
        this.#subscribes = this.#subscribes.filter(s => s != sub)
    }
}

const store = {}
const pubsub = new PubSub()

const root = {
    create: ({ input: { category, value } }) => {
        ・・・
    },
    find: ({ id }) => {
        ・・・
    }
}

const server = new grpc.Server()

server.addService(GraphQLService, {
   async query(call, callback) {
       ・・・
    },
    async subscription(call) {
        console.log('*** subscribed')

        try {
            const query = call.request.getQuery()
            const variables = call.request.getVariables().toJavaScript()

            const sub = pubsub.subscribe()

            call.on('cancelled', () => {
                console.log('*** unsubscribed')
                pubsub.unsubscribe(sub)
            })

            const subRoot = {
                created: () => sub
            }
            // GraphQL の Subscription 処理
            const aiter = await subscribe(schema, parse(query), subRoot, {}, variables)

            for await (const r of aiter) {
                // メッセージの配信
                call.write(Struct.fromJavaScript(r))
            }
        } catch(e) {
            console.error(e)
            call.destroy(e)
        }
    }
})

server.bindAsync(
    ・・・
)

Subscription 用クライアント実装

Subscription を呼び出すクライアントは以下のようになります。

client_subscribe.js
const grpc = require('@grpc/grpc-js')
const { QueryRequest } = require('./generated/proto/graphql_pb')
const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb')
const { Value } = require('google-protobuf/google/protobuf/struct_pb')

const client = new GraphQLClient(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const req = new QueryRequest()

req.setQuery(`
    subscription {
        created {
            id
            category
        }
    }
`)

req.setVariables(Value.fromJavaScript(null))

const stream = client.subscription(req)

stream.on('data', msg => {
    const event = msg.toJavaScript()
    console.log(`*** received event = ${JSON.stringify(event)}`)
})

stream.on('end', () => console.log('*** stream end'))
stream.on('error', err => console.log(`*** stream error: ${err}`))

動作確認

server.js を実行した後、client_subscribe.js を 2つ実行して sample1 で作成した client.js を実行すると以下のようになりました。

server.js の出力結果
> node server.js
start server: 50051
*** subscribed
*** asyncIterator next
*** subscribed
*** asyncIterator next
*** call create: category = Extra, value = 123
*** asyncIterator next
*** asyncIterator next
*** call find: a1
*** call find: item-5e3f81ed-774a-4f7f-afc5-000a2db34859
client_subscribe.js の出力結果
> node client_subscribe.js
*** received event = {"data":{"created":{"category":"Extra","id":"item-5e3f81ed-774a-4f7f-afc5-000a2db34859"}}}