Node.js のマイクロサービスフレームワーク Moleculer

Moleculer という名のなかなか期待できそうな Node.js 用マイクロサービスフレームワークを見つけたのでご紹介します。

ロードバランサー、サーキットブレーカー、メトリクス収集、トレーシング、キャッシュ機能等、多彩な機能を備えていますが。

個人的に、イベント駆動アーキテクチャを採用し Kafka や NATS 等のメッセージブローカーと容易に連携できる事から、コレオグラフィ型のマイクロサービスを手軽に作れそうな点に注目しています。

なんとなく Akka と似ている気もしますが、Akka を使ったマイクロサービスフレームワークLagom などと比べても、簡単かつ手軽に使えるのが利点かと思います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20200224/

(1) ローカルアプリケーション

とりあえず簡単なローカルアプリケーションを作成してみます。

以下のモジュールを使うのでインストールしておきます。

  • moleculer 0.14.2
moleculer インストール例
> npm install --save moleculer

Moleculer では、メソッド・アクション・イベントハンドリング等を定義した ServiceServiceBroker へ登録する事で処理を組み立てます。

ServiceBroker はノード毎にインスタンスを作る事になり、アクションの呼び出しやイベントの通知、別ノードとの連携等を担います。

アクションは call('<サービス名>.<アクション名>', <パラメータ>) で呼び出す事ができ、アクションで return した値が call の戻り値となります。

イベントの発火(通知)は emit('<イベント名>', <イベント内容>)broadcast('<イベント名>', <イベント内容>) ※ で行います。

 ※ emit と broadcast の違いは、
    broadcast は全ノードの全サービスへ通知されますが、
    emit は同じサービスを複数のノードで実行していると
    その中の 1つのノードへ通知されます
order1.js
const { ServiceBroker } = require('moleculer')

const broker = new ServiceBroker()

broker.createService({
    name: 'order-service',
    actions: {
        order(ctx) {
            console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`)
            // order.ordered イベントの発火
            ctx.emit('order.ordered', ctx.params)

            return ctx.params.id
        }
    }
})

broker.createService({
    name: 'order-check-service',
    events: {
        // order.ordered イベントのハンドリング
        'order.ordered'(ctx) {
            console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`)
        }
    }
})

const run = async () => {
    await broker.start()

    const order1 = {id: 'order1', item: 'item1', qty: 2}
    // order アクションの呼び出し
    const res1 = await broker.call('order-service.order', order1)

    console.log(`order result: ${res1}`)
}

run().catch(err => console.error(err))
実行結果
> node order1.js

[2020-02-18T14:08:35.336Z] INFO  ・・・: Moleculer v0.14.2 is starting...
・・・
# order-service.order: {"id":"order1","item":"item1","qty":2}
## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2}
order result: order1
・・・
[2020-02-18T14:08:35.440Z] INFO  ・・・: ServiceBroker is stopped. Good bye.

上記の処理へサービスを 1つ追加してみます。

イベントのハンドリングではワイルドカード * も使えるようになっていますので使ってみます。

order2.js
const { ServiceBroker } = require('moleculer')

const broker = new ServiceBroker()

broker.createService({
    name: 'order-service',
    actions: {
        order(ctx) {
            console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`)
            ctx.emit('order.ordered', ctx.params)
            return ctx.params.id
        }
    },
    events: {
        // order.validated と order.invalidated イベントをハンドリング
        'order.*validated'(ctx) {
            console.log(`# order-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`)
        }
    }
})

broker.createService({
    name: 'order-check-service',
    events: {
        async 'order.ordered'(ctx) {
            console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`)

            // item-service の check アクション呼び出し
            const isValid = await ctx.call('item-service.check', ctx.params)

            if (isValid) {
                ctx.emit('order.validated', ctx.params)
            }
            else {
                ctx.emit('order.invalidated', ctx.params)
            }
        }
    }
})

broker.createService({
    name: 'item-service',
    actions: {
        check(ctx) {
            console.log(`### item-service.check: ${JSON.stringify(ctx.params)}`)
            return ['item1', 'item2'].includes(ctx.params.item)
        }
    }
})

const run = async () => {
    await broker.start()

    const order1 = {id: 'order1', item: 'item1', qty: 2}
    const order2 = {id: 'order2', item: 'item3', qty: 1}

    const res1 = await broker.call('order-service.order', order1)

    console.log(`order result: ${res1}`)

    const res2 = await broker.call('order-service.order', order2)

    console.log(`order result: ${res2}`)
}

run().catch(err => console.error(err))
実行結果
> node order2.js

・・・
# order-service.order: {"id":"order1","item":"item1","qty":2}
## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2}
### item-service.check: {"id":"order1","item":"item1","qty":2}
order result: order1
# order-service.order: {"id":"order2","item":"item3","qty":1}
## order-check-service order.ordered: {"id":"order2","item":"item3","qty":1}
### item-service.check: {"id":"order2","item":"item3","qty":1}
# order-service order.validated: {"id":"order1","item":"item1","qty":2}
order result: order2
# order-service order.invalidated: {"id":"order2","item":"item3","qty":1}
・・・

(2) メッセージブローカー(NATS Streaming)利用

次に、別ノードのサービスとメッセージブローカー経由で連携してみます。

処理内容としては Web API を通して受け取ったメッセージを別ノードのサービスに call・emit・broadcast します。

ここでは、メッセージブローカーとして Go言語で実装されており手軽に実行できる NATS Streaming を使用します。

まずは、以下のモジュールを使うのでインストールしておきます。

  • moleculer 0.14.2
  • moleculer-web 0.9.0
  • node-nats-streaming 0.2.6
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming

Web API としてリクエストを受け付ける処理を実装します。

moleculer-web を require した結果を mixins して routes で HTTP リクエストとそれに応じて実行するアクションをマッピングします。

NATS Streaming を使用するため transporterSTAN を設定します。

nodeID はデフォルトでホスト名とプロセスIDから生成されるようになっていますが、他のノードとの接続に使うので、ここでは明示的に設定しています。

pub.js
const { ServiceBroker } = require('moleculer')

const HTTPServer = require('moleculer-web')

const broker = new ServiceBroker({
    nodeID: 'pub',
    transporter: 'STAN'
})

broker.createService({
    name: 'web',
    mixins: [HTTPServer],
    settings: {
        routes: [
            { aliases: {
                'PUT /pub': 'pub.publish'
            }}
        ]
    }
})

broker.createService({
    name: 'pub',
    actions: {
        async publish(ctx) {
            const params = ctx.params

            // アクション呼び出し
            const res = await ctx.call('sub.direct', params)

            // イベントの emit
            ctx.emit('event.emit', params)
            // イベントの broadcast
            ctx.broadcast('event.broadcast', params)

            return res
        }
    }
})

broker.start().catch(err => console.error(err))

次に、NATS Streaming 経由でアクションの呼び出しやイベント通知を受け取る処理を実装します。

こちらは複数プロセスで実行できるように nodeID をコマンド引数として渡すようにしています。

sub.js
const { ServiceBroker } = require('moleculer')

const id = process.argv[2]

const broker = new ServiceBroker({
    nodeID: id,
    transporter: 'STAN'
})

broker.createService({
    name: 'sub',
    actions: {
        direct(ctx) {
            console.log(`* sub.direct: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`)
            return broker.nodeID
        }
    },
    events: {
        'event.*'(ctx) {
            console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`)
        }
    }
})

broker.start().catch(err => console.error(err))

動作確認

まずは NATS Streaming を実行しておきます。

NATS Streaming 実行
> nats-streaming-server

・・・
[2668] 2020/02/23 19:27:33.566759 [[32mINF[0m] Server is ready
[2668] 2020/02/23 19:27:33.591774 [[32mINF[0m] STREAM: Recovering the state...
[2668] 2020/02/23 19:27:33.592779 [[32mINF[0m] STREAM: No recovered state
[2668] 2020/02/23 19:27:33.843799 [[32mINF[0m] STREAM: Message store is MEMORY
[2668] 2020/02/23 19:27:33.844764 [[32mINF[0m] STREAM: ---------- Store Limits ----------
[2668] 2020/02/23 19:27:33.846740 [[32mINF[0m] STREAM: Channels:                  100 *
[2668] 2020/02/23 19:27:33.848741 [[32mINF[0m] STREAM: --------- Channels Limits --------
[2668] 2020/02/23 19:27:33.849804 [[32mINF[0m] STREAM:   Subscriptions:          1000 *
[2668] 2020/02/23 19:27:33.850745 [[32mINF[0m] STREAM:   Messages     :       1000000 *
[2668] 2020/02/23 19:27:33.852756 [[32mINF[0m] STREAM:   Bytes        :     976.56 MB *
[2668] 2020/02/23 19:27:33.853740 [[32mINF[0m] STREAM:   Age          :     unlimited *
[2668] 2020/02/23 19:27:33.854745 [[32mINF[0m] STREAM:   Inactivity   :     unlimited *
[2668] 2020/02/23 19:27:33.857744 [[32mINF[0m] STREAM: ----------------------------------
[2668] 2020/02/23 19:27:33.858745 [[32mINF[0m] STREAM: Streaming Server is ready

pub.js を起動し、2つ sub.js を実行しておきます。

これによって、NATS Streaming へ MOL.DISCOVER.<nodeID>MOL.EVENT.<nodeID>MOL.REQ.<nodeID> 等のような様々なチャネルが登録され、ノード間の接続が確立します。

pub.js 実行
> node pub.js

・・・
[2020-02-23T10:29:58.028Z] INFO  pub/REGISTRY: Strategy: RoundRobinStrategy
・・・
[2020-02-23T10:29:58.102Z] INFO  pub/WEB: Register route to '/'
[2020-02-23T10:29:58.158Z] INFO  pub/WEB:      PUT /pub => pub.publish
・・・
[2020-02-23T10:30:29.936Z] INFO  pub/REGISTRY: Node 'sub1' connected.
[2020-02-23T10:30:36.918Z] INFO  pub/REGISTRY: Node 'sub2' connected.
sub.js 実行1(sub1)
> node sub.js sub1

・・・
[2020-02-23T10:30:34.026Z] INFO  sub1/REGISTRY: Node 'pub' connected.
[2020-02-23T10:30:36.919Z] INFO  sub1/REGISTRY: Node 'sub2' connected.
sub.js 実行2(sub2)
> node sub.js sub2

・・・
[2020-02-23T10:30:39.027Z] INFO  sub2/REGISTRY: Node 'pub' connected.
[2020-02-23T10:30:40.111Z] INFO  sub2/REGISTRY: Node 'sub1' connected.

この状態で 2回 HTTP リクエストを実施してみます。

HTTP リクエスト 1回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 1}"
"sub1"
HTTP リクエスト 2回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 2}"
"sub2"

1回目の call と emit は sub1 側で、2回目は sub2 側で実行しておりロードバランスされています(デフォルトではラウンドロビンとなる)

broadcast は sub1 と sub2 のどちらにも通知されています。

sub.js sub1 の結果
・・・
* sub.direct: nodeID=sub1, params={"no":1}
* event.emit: nodeID=sub1, params={"no":1}
* event.broadcast: nodeID=sub1, params={"no":1}
* event.broadcast: nodeID=sub1, params={"no":2}
sub.js sub2 の結果
・・・
* event.broadcast: nodeID=sub2, params={"no":1}
* sub.direct: nodeID=sub2, params={"no":2}
* event.emit: nodeID=sub2, params={"no":2}
* event.broadcast: nodeID=sub2, params={"no":2}

(3) MongoDB への CRUDREST API

最後に、MongoDB へデータを CRUD する処理を REST API 化してみます。

以下のモジュールを使うのでインストールしておきます。

  • moleculer 0.14.2
  • moleculer-web 0.9.0
  • node-nats-streaming 0.2.6
  • moleculer-db 0.8.5
  • moleculer-db-adapter-mongo 0.4.7
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming moleculer-db moleculer-db-adapter-mongo

これまでは createService でサービスを定義していましたが、ここでは別ファイルでサービスを定義する方法を試してみます。

ServiceBroker の loadServices を使うと、デフォルトで services/xxx.service.js (xxx は任意) ファイルをロードして登録してくれます。

server.js
const { ServiceBroker } = require('moleculer')

const id = process.argv[2]

const broker = new ServiceBroker({
    nodeID: id,
    transporter: 'STAN'
})

broker.loadServices()

broker.start().catch(err => console.error(err))

REST API のサービスを実装します。

aliases で REST とするとアクション側で "POST /" のような HTTP メソッドとパスを(rest 項目で)設定する事になります。

services/api.service.js
const HTTPServer = require('moleculer-web')

module.exports = {
    name: 'api',
    mixins: [HTTPServer],
    settings: {
        routes: [
            { aliases: { 'REST items': 'item' } }
        ]
    }
}

moleculer-db がデフォルトで REST API に対応した CRUD 処理をアクションとして定義してくれているので、今回はこれをそのまま使う事にします。

ここでは entityCreated 等を使って更新時のイベント通知のみを実装しています。

なお、MongoDB の接続文字列やコレクション名は環境変数で設定するようにしています。

services/item.service.js
const DbService = require('moleculer-db')
const MongoDBAdapter = require('moleculer-db-adapter-mongo')

const mongoUri = process.env['MONGO_URI']
const colName = process.env['MONGO_COLLECTION_NAME']

module.exports = {
    name: 'item',
    mixins: [DbService],
    adapter: new MongoDBAdapter(mongoUri, {
         useUnifiedTopology: true
    }),
    collection: colName,
    entityCreated(entity, ctx) {
        console.log(`entityCreated: ${JSON.stringify(entity)}`)
        ctx.emit('item.created', entity)
    },
    entityUpdated(entity, ctx) {
        console.log(`entityUpdated: ${JSON.stringify(entity)}`)
        ctx.emit('item.updated', entity)
    },
    entityRemoved(entity, ctx) {
        console.log(`entityRemoved: ${JSON.stringify(entity)}`)
        ctx.emit('item.removed', entity)
    }
}

動作確認用にイベントをハンドリングする処理も用意しました。

handler.js
const { ServiceBroker } = require('moleculer')

const id = process.argv[2]

const broker = new ServiceBroker({
    nodeID: id,
    transporter: 'STAN'
})

broker.createService({
    name: 'handler',
    events: {
        'item.*'(ctx) {
            console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`)
        }
    }
})

broker.start().catch(err => console.error(err))

動作確認

MongoDB と NATS Streaming を起動しておきます。

MongoDB 実行
> mongod --dbpath itemdb
NATS Streaming 実行
> nats-streaming-server

接続先の MongoDB やコレクション名を環境変数で設定してから、server.js を実行します。

server.js 実行
> set MONGO_URI=mongodb://localhost/sample
> set MONGO_COLLECTION_NAME=items

> node server.js server1

・・・
[2020-02-23T11:41:06.978Z] INFO  server1/API: Register route to '/'
[2020-02-23T11:41:07.031Z] INFO  server1/API:      GET /items => item.list
[2020-02-23T11:41:07.034Z] INFO  server1/API:      GET /items/:id => item.get
[2020-02-23T11:41:07.035Z] INFO  server1/API:     POST /items => item.create
[2020-02-23T11:41:07.036Z] INFO  server1/API:      PUT /items/:id => item.update
[2020-02-23T11:41:07.037Z] INFO  server1/API:    PATCH /items/:id => item.patch
[2020-02-23T11:41:07.038Z] INFO  server1/API:   DELETE /items/:id => item.remove
・・・
[2020-02-23T11:41:47.389Z] INFO  server1/REGISTRY: Node 'handler1' connected.

イベントハンドラも別途実行しておきます。

handler.js 実行
> node handler.js handler1

・・・
[2020-02-23T11:41:48.322Z] INFO  handler1/REGISTRY: Node 'server1' connected.

準備が整ったので REST API を呼び出して CRUD 処理を順番に呼び出してみます。

create 実施
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:3000/items -d "{\"name\": \"item1\", \"qty\": 1}"

{"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af

{"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":1}
update 実施
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/items/5e5266a735ebff2c9083a0af -d "{\"qty\": 2}"

{"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
delete 実施
$ curl -s -XDELETE http://localhost:3000/items/5e5266a735ebff2c9083a0af

{"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af

{
  "name": "EntityNotFoundError",
  "message": "Entity not found",
  "code": 404,
  "type": null,
  "data": {
    "id": "5e5266a735ebff2c9083a0af"
  }
}

問題なく動作しているようです。 handler.js の出力結果も以下のようになりました。

handler.js の出力結果
・・・
* item.created: nodeID=handler1, params={"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"}
* item.updated: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
* item.removed: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}

Ramda で入れ子のオブジェクトをフラットにする

Ramda を使って入れ子になったオブジェクトをフラットにする処理を考えてみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20200216/

サンプル1

まずは、以下のように入れ子になったオブジェクトを使って

{
    item: {
        name: 'item-1',
        details: {
            color: 'white',
            size: 'L'
        }
    },
    num: 5
}

下記のように変換してみます。

{ name: 'item-1', color: 'white', size: 'L', num: 5 }

方法はいろいろありそうですが、ここではオブジェクトの値部分を単一要素のオブジェクト {<項目名>: <値>} へ変換してから R.mergeAll でマージするようにしてみました。

sample1.js
const R = require('ramda')

const data = {
    item: {
        name: 'item-1',
        details: {
            color: 'white',
            size: 'L'
        }
    },
    num: 5
}

// Object の判定
const isObject = R.pipe(R.type, R.equals('Object'))

// 値部分を単一要素のオブジェクトへ変換
const valuesToObjOf = 
    R.pipe(
        R.mapObjIndexed(
            R.ifElse(
                isObject,
                v => valuesToObjOf(v), 
                R.flip(R.objOf)
            )
        ),
        R.values,
        R.flatten
    )

const flatObj = 
    R.pipe(
        valuesToObjOf,
        R.mergeAll
    )

console.log( valuesToObjOf(data) )

console.log('------')

console.log( flatObj(data) )

値部分がオブジェクトの場合は再帰的に処理するようにしていますが、その判定に R.is(Object) を使うと Array 等も true になってしまい都合が悪いので、R.type の結果が Object か否かで判定するようにしています。

実行結果
> node sample1.js

[ { name: 'item-1' }, { color: 'white' }, { size: 'L' }, { num: 5 } ]
------
{ name: 'item-1', color: 'white', size: 'L', num: 5 }

サンプル2

上記の処理だと項目名が重複した際に不都合が生じるため、項目名を連結するように valuesToObjOf を改良してみました。

sample2.js
・・・

const valuesToObjOf = (obj, prefix = '') =>
    R.pipe(
        R.mapObjIndexed(
            R.ifElse(
                isObject,
                (v, k) => valuesToObjOf(v, `${prefix}${k}_`), 
                (v, k) => R.objOf(`${prefix}${k}`, v)
            )
        ),
        R.values,
        R.flatten
    )(obj)

const flatObj = 
    R.pipe(
        valuesToObjOf,
        R.mergeAll
    )

console.log( flatObj(data) )
実行結果
> node sample2.js

{
  item_name: 'item-1',
  item_details_color: 'white',
  item_details_size: 'L',
  num: 5
}

Odoo の在庫モジュールを JSON-RPC で操作2

前回 の続きで、今回は Odoo の在庫モジュールを JSON-RPC で操作し在庫移動(顧客への出荷)を処理してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20200112/

在庫管理

Odoo の画面では在庫移動(顧客への出荷等)を以下のように処理しているようでした。(ステータスは ドラフト -> 待機中 -> 準備完了 -> 完了 のように遷移)

  • (1) stock.picking を create - 作成
  • (2) action_confirm の実行 - 処理準備
  • (3) action_assign の実行 - 利用可能準備(引当)
  • (4) button_validate の実行
  • (5) stock.immediate.transfer の process を実施

ただ、画面処理を前提としたような (4) 以降の処理を API として呼び出すには違和感があります。

そこで、button_validate の処理等を参照して別の方法で処理できないか調べてみたところ、以下のようにしても実現できそうでした。

  • (4') (1) で作られた stock.move.line の qty_done を更新
  • (5') action_done の実行

よって、今回はこの両方を試してみます。

(c) 在庫移動1 - button_validate 実行

まずは、button_validate を呼び出す方を試してみます。 (認証などの処理は前回と同じです)

sample3.js
const axios = require('axios')

・・・

const pickingTypeId = parseInt(process.argv[2])
const locationId = parseInt(process.argv[3])
const locationDestId = parseInt(process.argv[4])
const productId = parseInt(process.argv[5])
const qty = parseInt(process.argv[6])

const main = async () => {
    const auth = ・・・

    const uid = auth.data.result

    // (1) 作成
    const sp1 = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'c2',
        params: {
            service: 'object',
            method: 'execute',
            args: [db, uid, password, 'stock.picking', 'create', {
                picking_type_id: pickingTypeId, 
                location_id: locationId, 
                location_dest_id: locationDestId, 
                move_lines: [
                    [0, 0, {
                        name: 'move',
                        product_id: productId,
                        product_uom: 1, 
                        product_uom_qty: qty
                    }]
                ]
            }]
        }
    })

    console.log(sp1.data)

    if (sp1.data.error) {
        return
    }

    const pid = sp1.data.result

    console.log(`id: ${pid}`)

    // (2) 処理準備
    const sp2 = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'c3',
        params: {
            service: 'object',
            method: 'execute',
            args: [db, uid, password, 'stock.picking', 'action_confirm', pid]
        }
    })

    console.log(sp2.data)

    if (sp2.data.error) {
        return
    }

    // (3) 利用可能準備(引当)
    const sp3 = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'c4',
        params: {
            service: 'object',
            method: 'execute',
            args: [db, uid, password, 'stock.picking', 'action_assign', pid]
        }
    })

    console.log(sp3.data)

    if (sp3.data.error) {
        return
    }

    // (4) 検証(button_validate の実行)
    const sp4 = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'c5',
        params: {
            service: 'object',
            method: 'execute',
            args: [db, uid, password, 'stock.picking', 'button_validate', pid]
        }
    })

    console.log(sp4.data)

    if (sp4.data.error) {
        return
    }

    const vld = sp4.data.result

    // (5) stock.immediate.transfer の process を実行
    const res = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'c7',
        params: {
            service: 'object',
            method: 'execute',
            args: [db, uid, password, vld.res_model, 'process', vld.res_id]
        }
    })

    console.log(res.data)
}

main().catch(err => console.error(err))

上記を使って、ピッキングタイプ 2(Delivery Orders) で在庫ロケーション 8(WH/Stock) から顧客出荷用のロケーション 5(Partner Locations/Customers) へ商品 19(Large Desk) を 2個移動してみます。

実行
> node sample3.js 2 8 5 19 2

{ jsonrpc: '2.0', id: 'c2', result: 70 }
id: 70
{ jsonrpc: '2.0', id: 'c3', result: true }
{ jsonrpc: '2.0', id: 'c4', result: true }
{
  jsonrpc: '2.0',
  id: 'c5',
  result: {
    name: 'Immediate Transfer?',
    type: 'ir.actions.act_window',
    view_mode: 'form',
    res_model: 'stock.immediate.transfer',
    views: [ [Array] ],
    view_id: 477,
    target: 'new',
    res_id: 8,
    context: {}
  }
}
{ jsonrpc: '2.0', id: 'c7', result: false }

在庫数を確認してみると 100個あった在庫が 98個となっているため、とりあえずは成功しているようです。

在庫数の確認
> node sample1.js 19

・・・
[
  {
    id: 19,
    name: 'Large Desk',
    product_tmpl_id: [ 13, '[E-COM09] Large Desk' ],
    qty_available: 98,
    virtual_available: 98
  }
]

(d) 在庫移動2 - qty_done の更新

次に qty_done を更新する方法も試してみます。

sample4.js
const axios = require('axios')

・・・

const pickingTypeId = parseInt(process.argv[2])
const locationId = parseInt(process.argv[3])
const locationDestId = parseInt(process.argv[4])
const productId = parseInt(process.argv[5])
const qty = parseInt(process.argv[6])

const main = async () => {
    ・・・

    // (3) 利用可能準備(引当)
    const sp3 = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'd4',
        params: {
            service: 'object',
            method: 'execute',
            args: [db, uid, password, 'stock.picking', 'action_assign', pid]
        }
    })

    console.log(sp3.data)

    if (sp3.data.error) {
        return
    }

    // move_line_ids の取得
    const pick = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'd5',
        params: {
            service: 'object',
            method: 'execute_kw',
            args: [db, uid, password, 'stock.picking', 'read', [pid], {
                fields: ['move_line_ids']
            }]
        }
    })

    console.log(pick.data)

    const lineIds = pick.data.result[0].move_line_ids

    console.log(`move line ids: ${lineIds}`)

    // (4') stock.move.line の qty_done(処理済み在庫数)を更新
    const sml = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'd6',
        params: {
            service: 'object',
            method: 'execute_kw',
            args: [
                db, 
                uid, 
                password, 
                'stock.move.line', 
                'write', 
                [ lineIds, { qty_done: qty }],
                {}
            ]
        }
    })

    console.log(sml.data)

    if (sml.data.error) {
        return
    }

    // (5') action_done の実行
    const sp4 = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'd7',
        params: {
            service: 'object',
            method: 'execute',
            args: [db, uid, password, 'stock.picking', 'action_done', pid]
        }
    })

    console.log(sp4.data)
}

main().catch(err => console.error(err))

button_validate の時と同じようにピッキングタイプ 2(Delivery Orders) で在庫ロケーション 8(WH/Stock) から顧客出荷用のロケーション 5(Partner Locations/Customers) へ商品 19(Large Desk) を今度は 3個移動してみます。

実行
> node sample4.js 2 8 5 19 3

{ jsonrpc: '2.0', id: 'd2', result: 71 }
id: 71
{ jsonrpc: '2.0', id: 'd3', result: true }
{ jsonrpc: '2.0', id: 'd4', result: true }
{
  jsonrpc: '2.0',
  id: 'd5',
  result: [ { id: 71, move_line_ids: [Array] } ]
}
move line ids: 73
{ jsonrpc: '2.0', id: 'd6', result: true }
{ jsonrpc: '2.0', id: 'd7', result: true }

98個あった在庫が 95個となっているので、こちらも一応は成功しているようです。

在庫確認
> node sample1.js 19

・・・
[
  {
    id: 19,
    name: 'Large Desk',
    product_tmpl_id: [ 13, '[E-COM09] Large Desk' ],
    qty_available: 95,
    virtual_available: 95
  }
]

Odoo の在庫モジュールを JSON-RPC で操作1

Python で実装されたオープンソース ERP である Odoo の在庫モジュールを JSON-RPC を使って操作してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20200112/

はじめに

Docker で PostgreSQL と Odoo を実行しておきます。

PostgreSQL 実行
$ docker run -d -e POSTGRES_USER=odoo -e POSTGRES_PASSWORD=odoo -e POSTGRES_DB=postgres --name db postgres
Odoo 実行
$ docker run -d -p 8069:8069 --name odoo --link db:db odoo

http://localhost:8069/ へアクセスすると DB の初期構築画面(以下)が表示されるので設定します。

f:id:fits:20200112193002p:plain

「Demo data」 にチェックを付けておくと、デモ用のデータが登録されるので、今回はこのデータを使います。

デフォルトの状態では在庫管理モジュールが導入されていないため、「アプリ」画面で「在庫」モジュールをインストールしておきます。

f:id:fits:20200112193044p:plain

Odoo には JSON-RPC と XML-RPC が用意されており、これらを利用することで永続モデルクラス(models ディレクトリに配置)等のメソッドを外部から呼び出せるようになっています。

今回は JSON-RPC の方を使いますが、参考までに XML-RPC の処理を Python で実装してみると以下のようになりました。

なお、qty_available は在庫モジュールをインストールする事によって Product クラス(product.product)へ追加される項目です。

sample.py (XML-RPC で商品情報を取得するサンプル)
import xmlrpc.client

url = 'http://localhost:8069'

db = 'odoo1'
user = 'admin@example.com'
password = 'pass'

common = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/common')

# 認証
uid = common.authenticate(db, user, password, {})

print(f'uid: {uid}')

models = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/object')

obj = 'product.product'

# 商品の件数取得
count = models.execute(db, uid, password, obj, 'search_count', [])

print(f'count: {count}')

# 取得する項目
kw = {'fields': ['name', 'lst_price', 'product_tmpl_id', 'qty_available']}

# 商品情報取得
products = models.execute_kw(db, uid, password, obj, 'search_read', [], kw)

for p in products:
    print(p)
実行結果
> python sample.py

uid: 2
count: 32
{'id': 15, 'name': 'Customizable Desk (CONFIG)', 'lst_price': 800.4, 'product_tmpl_id': [9, 'Customizable Desk (CONFIG)'], 'qty_available': 60.0}
{'id': 16, 'name': 'Corner Desk Right Sit', 'lst_price': 147.0, 'product_tmpl_id': [10, '[E-COM06] Corner Desk Right Sit'], 'qty_available': 45.0}
{'id': 17, 'name': 'Large Cabinet', 'lst_price': 320.0, 'product_tmpl_id': [11, '[E-COM07] Large Cabinet'], 'qty_available': 250.0}
・・・

在庫管理

それでは、JSON-RPC で在庫の操作を行ってみます。

JSON-RPC や XML-RPC で任意の処理を呼び出すためには、認証を行って uid を取得します。

JSON-RPC における認証は、下記のような JSON/jsonrpc へ POST する事で実施します。

args には DB の初期構築時に設定した Database NameEmailPassword の値を順に指定した後、最後の引数を {}null 等にしておきます。

id はリクエストとレスポンスを識別するためのもので、任意の文字列や数値を指定する事ができるようです。

認証用リクエスJSON
{
  "jsonrpc": "2.0",
  "method": "call",
  "id": "a1",
  "params": {
    "service": "common",
    "method": "authenticate",
    "args": ["odoo1", "admin@example.com", "pass", {}]
  }
}

認証に成功すると以下のようなレスポンスが返り、result 項目の値が uid となっています。

id はリクエストで指定したものと同じ値になります。

認証用レスポンス JSON
{"jsonrpc": "2.0", "id": "a1", "result": 2}

認証で取得した uid を使ってモデルクラスのメソッドを呼び出すには、以下のような JSON を使います。(execute_kw の場合は args 内の要素が 1つ増える)

任意のメソッド呼び出し用リクエスJSON(execute の場合)
{
    "jsonrpc": "2.0",
    "method": "call",
    "id": <任意の値>,
    "params": {
        "service": "object",
        "method": "execute",
        "args": [<Database Name>, <uid>, <Password>, <モデル名>, <メソッド名>, <メソッドの引数>]
    }
}

(a) 在庫数の取得

まずは、指定商品の在庫数を取得してみます。

商品の在庫数は、在庫モジュールによって Product クラスへ追加された qty_availablevirtual_available で取得できるようです。

qty_available が商品の在庫画面上で 手持在庫 として表示される実際の在庫数、virtual_available が画面上の 見通し として表示される数値で実際の在庫数に入出荷の予定を加味したものです。( virtual_available = qty_available + incoming_qty - outgoing_qty

sample1.js
const axios = require('axios')

const url = 'http://localhost:8069/jsonrpc'

const db = 'odoo1'
const user = 'admin@example.com'
const password = 'pass'

const productId = parseInt(process.argv[2])

const main = async () => {
    // 認証
    const auth = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'a1',
        params: {
            service: 'common',
            method: 'authenticate',
            args: [db, user, password, {}]
        }
    })

    const uid = auth.data.result

    console.log(`uid: ${uid}`)
    // 商品情報の取得
    const prd = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'a2',
        params: {
            service: 'object',
            method: 'execute_kw',
            args: [
                db, 
                uid, 
                password, 
                'product.product', 
                'read', 
                [ productId ],
                { fields: ['name', 'product_tmpl_id', 'qty_available', 'virtual_available'] }
            ]
        }
    })

    console.log(prd.data.result)
}

main().catch(err => console.error(err))

実行結果は以下の通りです。

実行結果1
> node sample1.js 19

uid: 2
[
  {
    id: 19,
    name: 'Large Desk',
    product_tmpl_id: [ 13, '[E-COM09] Large Desk' ],
    qty_available: 0,
    virtual_available: 0
  }
]
実行結果2
> node sample1.js 21

・・・
[
  {
    id: 21,
    name: 'Cabinet with Doors',
    product_tmpl_id: [ 15, '[E-COM11] Cabinet with Doors' ],
    qty_available: 8,
    virtual_available: 128
  }
]

(b) 在庫数の調整

次に、指定した商品の在庫数を更新してみます。

Odoo の在庫モジュールでは、在庫ロケーション(stock.location)間の在庫移動(stock.move)として在庫数の変動を表現しているようです。

調整用途で在庫数を更新する際にも調整用ロケーション Virtual Locations/YourCompany: Inventory adjustment から在庫ロケーション WH/Stock 等への在庫移動という形で処理する事になります。

ここでは、商品の在庫数を簡単に調整するためのウィザード処理 ※ として用意されている(と思われる) ProductChangeQuantitystock.change.product.qty) を使って在庫数を更新してみます。

 ※ 在庫モジュールの wizard ディレクトリにソースが配置されている

それには、stock.change.product.qtycreate として change_product_qty を呼び出せば良さそうです。

create の際に product_idproduct_tmpl_id で商品を、new_quantity で更新後の在庫数を指定すれば、stock.warehouselot_stock_id から在庫のロケーションを特定して在庫移動が作成されるようです。

sample2.js
const axios = require('axios')

・・・

const productId = parseInt(process.argv[2])
const productTmplId = parseInt(process.argv[3])
const qty = parseInt(process.argv[4])

const main = async () => {
    const auth = ・・・

    const uid = auth.data.result

    // 作成
    const chg = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'b2',
        params: {
            service: 'object',
            method: 'execute',
            args: [db, uid, password, 'stock.change.product.qty', 'create', {
                product_id: productId,
                product_tmpl_id: productTmplId,
                new_quantity: qty
            }]
        }
    })

    const sid = chg.data.result

    console.log(`create id: ${sid}`)

    // 在庫数の更新
    const res = await axios.post(url, {
        jsonrpc: '2.0',
        method: 'call',
        id: 'b3',
        params: {
            service: 'object',
            method: 'execute',
            args: [db, uid, password, 'stock.change.product.qty', 'change_product_qty', sid]
        }
    })

    console.log(res.data.result)
}

main().catch(err => console.error(err))

上記を実行して、(a) で在庫数が 0 だった Large Desk(product_id = 19, product_tmpl_id = 13) の在庫数を 100 にしてみます。

実行結果
> node sample2.js 19 13 100

create id: 6
{ type: 'ir.actions.act_window_close' }

商品の在庫数を確認してみると 100 に変化しており、処理は成功しているようです。

在庫数の確認
> node sample1.js 19

・・・
[
  {
    id: 19,
    name: 'Large Desk',
    product_tmpl_id: [ 13, '[E-COM09] Large Desk' ],
    qty_available: 100,
    virtual_available: 100
  }
]

DB の stock_move テーブルを確認してみると下記のレコードが追加されていました。

stock_move テーブルへ追加されたレコード(を JSON 化したもの)
{
    "id" : 44,
    "name" : "Product Quantity Updated",
    ・・・
    "company_id" : 1,
    ・・・
    "product_id" : 19,
    "description_picking" : null,
    "product_qty" : 100.0,
    "product_uom_qty" : 100.000,
    "product_uom" : 1,
    "location_id" : 14,
    "location_dest_id" : 8,
    ・・・
    "state" : "done",
    ・・・
    "procure_method" : "make_to_stock",
    ・・・
    "create_uid" : 2,
    ・・・
}

在庫ロケーション 14(Virtual Locations/YourCompany: Inventory adjustment)から 8(WH/Stock)への在庫移動として処理されています。

今回はここまで、次回は顧客への出荷を処理してみたいと思います。

gRPC Server Reflection のクライアント処理

gRPC Server Reflection を呼び出す処理を Node.js で実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20191008/

事前準備(サーバー実装)

まずは、gRPC Server Reflection を有効にしたサーバー処理を用意します。 Node.js での実装は無理そうだったので(未実装らしい) Go 言語で実装します。

protoc コマンドをインストールした後、Go 言語用の protoc プラグインである protoc-gen-go をインストールします。

protoc-gen-go インストール
> go get -u github.com/golang/protobuf/protoc-gen-go

google.golang.org/grpc 等のライブラリをビルド時に自動ダウンロードするように、Go のモジュールファイル go.mod を用意しておきます。

go.mod の作成
> go mod init sample

proto ファイルを作成してインターフェースを定義します。 今回は、go_package を使って Go 用のパッケージを別途定義してみました。

proto/item/item.proto
syntax = "proto3";

import "google/protobuf/empty.proto";

option go_package = "sample/proto/item";

package item;

message ItemRequest {
    string item_id = 1;
}

message AddItemRequest {
    string item_id = 1;
    uint64 price = 2;
}

message Item {
    string item_id = 1;
    uint64 price = 2;
}

service ItemService {
    rpc GetItem(ItemRequest) returns (Item);
    rpc AddItem(AddItemRequest) returns (google.protobuf.Empty);
}

protoc コマンドで Go 用のコードを生成します。

protoc によるコード生成
> protoc -I=proto --go_out=plugins=grpc,paths=source_relative:./proto proto/item/item.proto

サーバー処理を実装します。

Server Reflection を有効化するには google.golang.org/grpc/reflection を import して reflection.Register を適用するだけなので簡単です。

server/main.go
package main

import (
    "context"
    "fmt"
    "net"
    "log"
    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
    empty "github.com/golang/protobuf/ptypes/empty"
    pb "sample/proto/item"
)

type Server struct {
    Items map[string]pb.Item
}

func (s *Server) GetItem(ctx context.Context, req *pb.ItemRequest) (*pb.Item, error) {
    log.Println("call GetItem: ", req)

    item, ok := s.Items[req.GetItemId()]

    if !ok {
        return nil, fmt.Errorf("item not found: %s", req.GetItemId())
    }

    return &item, nil
}

func (s *Server) AddItem(ctx context.Context, req *pb.AddItemRequest) (*empty.Empty, error) {
    log.Println("call AddItem: ", req)

    s.Items[req.GetItemId()] = pb.Item{ItemId: req.GetItemId(), Price: req.GetPrice()}

    return &empty.Empty{}, nil
}

func main() {
    address := ":50051"

    listen, err := net.Listen("tcp", address)

    if err != nil {
        log.Fatalf("error: %v", err)
    }

    s := grpc.NewServer()

    pb.RegisterItemServiceServer(s, &Server{Items: make(map[string]pb.Item)})
    // gRPC Server Reflection 有効化
    reflection.Register(s)

    log.Println("server start:", address)

    if err := s.Serve(listen); err != nil {
        log.Fatalf("failed serve: %v", err)
    }
}

上記を実行しておきます。(go run でビルドも実施されます)

初回実行時は google.golang.org/grpc 等の依存ライブラリが自動的にダウンロードされます。

実行
> go run server/main.go

・・・
2019/10/06 22:00:00 server start: :50051

サーバー側のファイル構成は以下のようになっています。

ファイル構成
  • go.mod
  • go.sum
  • proto
    • item
      • item.proto
      • item.pb.go
  • server
    • main.go

gRPC Server Reflection クライアント処理

それでは、本題の gRPC Server Reflection を呼び出す処理を実装していきます。

準備

どのように実装すべきか分からなかったので、とりあえず今回は https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto を使う事にしました。

まずは、この proto ファイルをローカルにダウンロードしておきます。

proto ファイルのダウンロード
> curl -O https://raw.githubusercontent.com/grpc/grpc/master/src/proto/grpc/reflection/v1alpha/reflection.proto

Node.js で gRPC 処理を実装するため grpcproto-loader をインストールしておきます。

grpc と proto-loader のインストール
> npm install grpc @grpc/proto-loader

(a) サービスのリスト取得

はじめに、サービス名をリストアップする処理を実装してみます。

reflection.proto を見てみると、以下のように ServerReflectionInfo メソッドの引数である ServerReflectionRequestmessage_request にどのフィールドを指定するかで取得内容が変わるようになっています。

サービスのリストを取得するには list_services フィールドを使えば良さそうです。

reflection.proto の該当箇所
・・・
package grpc.reflection.v1alpha;

service ServerReflection {
  rpc ServerReflectionInfo(stream ServerReflectionRequest)
      returns (stream ServerReflectionResponse);
}

message ServerReflectionRequest {
  string host = 1;

  oneof message_request {
    string file_by_filename = 3;
    string file_containing_symbol = 4;
    ExtensionRequest file_containing_extension = 5;
    string all_extension_numbers_of_type = 6;
    string list_services = 7;
  }
}
・・・

ServerReflectionInfo メソッドは引数と戻り値の両方に stream が指定されている双方向ストリーミング RPC となっているため、以下のように write でリクエストメッセージを送信して on('data', ・・・) でレスポンスメッセージを取得する事になります。

また、end でストリーミング処理を終了します。

list_services.js
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')

const pd = protoLoader.loadSync('reflection.proto', {
    keepCase: true,
    defaults: true
})

const proto = grpc.loadPackageDefinition(pd).grpc.reflection.v1alpha

const client = new proto.ServerReflection(
    '127.0.0.1:50051', 
    grpc.credentials.createInsecure()
)

const call = client.ServerReflectionInfo()

call.on('error', err => {
    // ストリーミングの終了
    call.end()
    console.error(err)
})
// レスポンスメッセージの受信
call.on('data', res => {
    // ストリーミングの終了
    call.end()
    
    // サービス名の出力
    res.list_services_response.service.forEach(s => {
        console.log(s.name)
    })
})
// リクエストメッセージの送信
call.write({host: 'localhost', list_services: ''})

実行結果は以下の通り、サービス名を出力できました。

実行結果
> node list_services.js

grpc.reflection.v1alpha.ServerReflection
item.ItemService

(b) サービスのインターフェース定義取得

次に、サービスの内容(インターフェース定義)を取得してみます。

こちらは、リクエストメッセージの file_containing_symbol フィールドにサービス名を指定する事で取得できます。

ただ、レスポンスメッセージの該当フィールドの内容が reflection.proto のコメントにあるように FileDescriptorProtoシリアライズした結果 ※ (の配列)となっている点に注意が必要です。

 ※ bytes 型は Node.js では Buffer として扱われる
reflection.proto の該当箇所
・・・
message FileDescriptorResponse {
  // Serialized FileDescriptorProto messages. We avoid taking a dependency on
  // descriptor.proto, which uses proto2 only features, by making them opaque
  // bytes instead.
  repeated bytes file_descriptor_proto = 1;
}
・・・

FileDescriptorProto へのデシリアライズに関しては試行錯誤しましたが、最も手軽そうな protobufjs/ext/descriptorFileDescriptorProto.decode を使う事にしました。

load_symbol.js
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')
const descriptor = require('protobufjs/ext/descriptor')

const serviceName = process.argv[2]

・・・

const call = client.ServerReflectionInfo()

call.on('error', err => {
    call.end()
    console.error(err)
})

call.on('data', res => {
    call.end()
    
    res.file_descriptor_response.file_descriptor_proto
        // FileDescriptorProto デシリアライズ
        .map(buf => descriptor.FileDescriptorProto.decode(buf))
        .forEach(d => {
            // JSON 化して出力
            console.log(JSON.stringify(d, null, 2))
        })
})

call.write({host: 'localhost', file_containing_symbol: serviceName})

item.ItemService のサービス内容を取得してみた結果です。 go_package の内容も含め、問題なく取得できているようです。

実行結果
> node load_symbol.js item.ItemService

{
  "name": "item/item.proto",
  "package": "item",
  "dependency": [
    "google/protobuf/empty.proto"
  ],
  "messageType": [
    {
      "name": "ItemRequest",
      "field": [
        {
          "name": "item_id",
          "number": 1,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_STRING",
          "jsonName": "itemId"
        }
      ]
    },
    {
      "name": "AddItemRequest",
      "field": [
        {
          "name": "item_id",
          "number": 1,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_STRING",
          "jsonName": "itemId"
        },
        {
          "name": "price",
          "number": 2,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_UINT64",
          "jsonName": "price"
        }
      ]
    },
    {
      "name": "Item",
      "field": [
        {
          "name": "item_id",
          "number": 1,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_STRING",
          "jsonName": "itemId"
        },
        {
          "name": "price",
          "number": 2,
          "label": "LABEL_OPTIONAL",
          "type": "TYPE_UINT64",
          "jsonName": "price"
        }
      ]
    }
  ],
  "service": [
    {
      "name": "ItemService",
      "method": [
        {
          "name": "GetItem",
          "inputType": ".item.ItemRequest",
          "outputType": ".item.Item"
        },
        {
          "name": "AddItem",
          "inputType": ".item.AddItemRequest",
          "outputType": ".google.protobuf.Empty"
        }
      ]
    }
  ],
  "options": {
    "goPackage": "sample/proto/item"
  },
  "syntax": "proto3"
}

(c) 全サービスのインターフェース定義取得

最後に、全サービスのインターフェース定義を取得する処理を実装してみます。

双方向ストリーミング RPC を活用して、サービスのリスト取得とサービス内容の取得を同じストリーミング上で行ってみました。

load_symbols.js
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')
const descriptor = require('protobufjs/ext/descriptor')

・・・

const call = client.ServerReflectionInfo()

call.on('error', err => {
    call.end()
    console.error(err)
})

let count = 0

call.on('data', res => {
    // サービスのリスト取得時の処理
    if (res.list_services_response) {
        const names = res.list_services_response.service.map(s => s.name)

        count = names.length

        names.forEach(name => 
            call.write({host: 'localhost', file_containing_symbol: name})
        )
    }
    // サービスのインターフェース定義取得時の処理
    else if (res.file_descriptor_response) {
        if (--count == 0) {
            // インターフェース定義を全て取得したら終了
            call.end()
        }

        res.file_descriptor_response.file_descriptor_proto
            .map(buf => descriptor.FileDescriptorProto.decode(buf))
            .forEach(d => {
                console.log(JSON.stringify(d, null, 2))
            })
    }
    else {
        console.log(res)
        call.end()
    }
})

call.write({host: 'localhost', list_services: ''})

実行結果は以下の通りです。

実行結果
> node load_symbols.js

{
  "name": "grpc_reflection_v1alpha/reflection.proto",
  "package": "grpc.reflection.v1alpha",
  ・・・
  "service": [
    {
      "name": "ServerReflection",
      "method": [
        {
          "name": "ServerReflectionInfo",
          "inputType": ".grpc.reflection.v1alpha.ServerReflectionRequest",
          "outputType": ".grpc.reflection.v1alpha.ServerReflectionResponse",
          "clientStreaming": true,
          "serverStreaming": true
        }
      ]
    }
  ],
  "syntax": "proto3"
}
{
  "name": "item/item.proto",
  "package": "item",
  "dependency": [
    "google/protobuf/empty.proto"
  ],
  ・・・
  "service": [
    {
      "name": "ItemService",
      "method": [
        {
          "name": "GetItem",
          "inputType": ".item.ItemRequest",
          "outputType": ".item.Item"
        },
        {
          "name": "AddItem",
          "inputType": ".item.AddItemRequest",
          "outputType": ".google.protobuf.Empty"
        }
      ]
    }
  ],
  "options": {
    "goPackage": "sample/proto/item"
  },
  "syntax": "proto3"
}

備考

別の実装例として、処理毎に個別のストリーミングで処理するようにして Promise 化してみました。

load_symbols2.js
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')
const descriptor = require('protobufjs/ext/descriptor')

・・・

const merge = a => b => Object.fromEntries([a, b].map(Object.entries).flat())

const serverReflectionInfo = (f, g) => new Promise((resolve, revoke) => {
    const call = client.ServerReflectionInfo()

    call.on('error', err => {
        call.end()
        revoke(err)
    })

    call.on('data', res => {
        call.end()
        resolve( g(res) )
    })

    call.write( f({host: 'localhost'}) )
})
// サービスのリスト取得
const listServices = () => serverReflectionInfo(
    merge({list_services: ''}),
    res => res.list_services_response.service.map(s => s.name)
)
// サービスのインターフェース定義取得
const loadSymbol = name => serverReflectionInfo(
    merge({file_containing_symbol: name}),
    res => res.file_descriptor_response.file_descriptor_proto
                .map(buf => descriptor.FileDescriptorProto.decode(buf))
)

listServices()
    .then(names => 
        Promise.all(names.map(loadSymbol))
    )
    .then(ds => ds.flat())
    .then(ds => ds.forEach(d => console.log(JSON.stringify(d, null, 2))))
    .catch(err => console.error(err))
実行結果
> node load_symbols2.js

{
  "name": "grpc_reflection_v1alpha/reflection.proto",
  "package": "grpc.reflection.v1alpha",
  ・・・
  "service": [
    {
      "name": "ServerReflection",
      "method": [
        {
          "name": "ServerReflectionInfo",
          "inputType": ".grpc.reflection.v1alpha.ServerReflectionRequest",
          "outputType": ".grpc.reflection.v1alpha.ServerReflectionResponse",
          "clientStreaming": true,
          "serverStreaming": true
        }
      ]
    }
  ],
  "syntax": "proto3"
}
{
  "name": "item/item.proto",
  "package": "item",
  "dependency": [
    "google/protobuf/empty.proto"
  ],
  ・・・
  "service": [
    {
      "name": "ItemService",
      "method": [
        {
          "name": "GetItem",
          "inputType": ".item.ItemRequest",
          "outputType": ".item.Item"
        },
        {
          "name": "AddItem",
          "inputType": ".item.AddItemRequest",
          "outputType": ".google.protobuf.Empty"
        }
      ]
    }
  ],
  "options": {
    "goPackage": "sample/proto/item"
  },
  "syntax": "proto3"
}

Pulumi を使って Kubernetes へ CRD を登録

PulumiJavaScriptPython・Go のようなプログラミング言語で Infrastructure as Code するためのツールです。

今回は、この Pulumi を使って Kubernetes(k3s を使用)へカスタムリソースを登録してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20190825/

はじめに

今回は、k3s(Lightweight Kubernetes) がインストール済みの Ubuntu 環境を使います。※

 ※ ただし、k3s の前に microk8s をインストールしたりしているので
    クリーンな環境とは言えないかもしれない
    (Istio と Knative のために Helm 等をインストールしていたりもする)

Pulumi を以下のようにインストールします。

Pulumi インストール例
$ curl -fsSL https://get.pulumi.com | sh

$HOME/.pulumi ディレクトリへ各種ファイルが展開され、.bashrc ファイルへ PATH の設定が追加されました。

なお、このままだと pulumi コマンド実行時に kubeconfig を参照できないようだったので、とりあえず /etc/rancher/k3s/k3s.yaml$HOME/.kube/config ファイルとしてコピーし chown しています。

動作確認
$ pulumi version

v0.17.28

Kubernetes へ CRD を登録(Node.js 使用)

準備

プロジェクトを作成する前に、まずは Pulumi でログインを実施しておきます。

今回はローカル環境の Kubernetes を使うので --local を指定して login を実施しました。

ログイン
$ pulumi login --local

プロジェクト作成

適当なディレクトリを用意し、テンプレートを指定してプロジェクトのひな型を作成します。

今回は Kubernetes を対象とした Node.js 用のプロジェクトを作成するため kubernetes-javascript を指定しました。

プロジェクト作成
$ mkdir sample
$ cd sample
$ pulumi new kubernetes-javascript

・・・
project name: (sample)
project description: (A minimal Kubernetes JavaScript Pulumi program)
Created project 'sample'

stack name: (dev)
Enter your passphrase to protect config/secrets:
Re-enter your passphrase to confirm:
Created stack 'dev'

Enter your passphrase to unlock config/secrets
    (set PULUMI_CONFIG_PASSPHRASE to remember):
Installing dependencies...
・・・

実装

上記で生成された index.js ファイルに Kubernetes へ登録する内容を実装していきます。

今回は下記のようなカスタムリソースの登録を実装します。

例. カスタムリソース登録内容
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: items.example.com
spec:
  group: example.com
  version: v1alpha1
  scope: Namespaced
  names:
    kind: Item
    plural: items
    singular: item
  preserveUnknownFields: false
  validation:
    openAPIV3Schema:
      type: object
      properties:
        spec:
          type: object
          properties:
            value:
              type: integer
            note:
              type: string

---

apiVersion: "example.com/v1alpha1"
kind: Item
metadata:
  name: item1
spec:
  value: 100
  note: sample item 1

---

apiVersion: "example.com/v1alpha1"
kind: Item
metadata:
  name: item2
spec:
  value: 20
  note: sample item 2

上記のカスタムリソースを実装したコードが以下です。

Pulumi でカスタムリソースを定義する場合、@pulumi/kubernetesCustomResourceDefinitionCustomResource を使えば良さそうです。

YAML と同等の内容を JavaScript の Object で表現して、CustomResourceDefinition 等のコンストラクタの第 2引数として渡すだけです。

index.js (カスタムリソース登録内容の実装)
'use strict'
const k8s = require('@pulumi/kubernetes')

const capitalize = s => `${s[0].toUpperCase()}${s.slice(1)}`

const crdName = 'item'
const crdGroup = 'example.com'
const crdVersion = 'v1alpha1'

const props = {
    value: 'integer',
    note: 'string'
}

const items = [
    { name: 'item1', value: 100, note: 'sample item 1' },
    { name: 'item2', value:  20, note: 'sample item 2' }
]

const crdKind = capitalize(crdName)
const crdPlural = `${crdName}s`

new k8s.apiextensions.v1beta1.CustomResourceDefinition(crdName, {
    metadata: { name: `${crdPlural}.${crdGroup}` },
    spec: {
        group: crdGroup,
        version: crdVersion,
        scope: 'Namespaced',
        names: {
            kind: crdKind,
            plural: crdPlural,
            singular: crdName
        },
        preserveUnknownFields: false,
        validation: {
            openAPIV3Schema: {
                type: 'object',
                properties: {
                    spec: {
                        type: 'object',
                        properties: Object.fromEntries(
                            Object.entries(props).map(([k, v]) => 
                                [k, { type: v }]
                            )
                        )
                    }
                }
            }
        }
    }
})

items.forEach(it => 
    new k8s.apiextensions.CustomResource(it.name, {
        apiVersion: `${crdGroup}/${crdVersion}`,
        kind: crdKind,
        metadata: {
            name: it.name
        },
        spec: Object.fromEntries(
            Object.keys(props).map(k => [k, it[k]])
        )
    })
)

デプロイ

pulumi up でデプロイします。

デプロイ
$ pulumi up

・・・
Previewing update (dev):

     Type                                                         Name        P
 +   pulumi:pulumi:Stack                                          sample-dev  c
 +   tq kubernetes:example.com:Item                               item2       c
 +   tq kubernetes:example.com:Item                               item1       c
 +   mq kubernetes:apiextensions.k8s.io:CustomResourceDefinition  item        c

Resources:
    + 4 to create

Do you want to perform this update?
  yes
> no
  details

Do you want to perform this update?yes を選択すると実際にデプロイが実施されます。

Do you want to perform this update? yes
Updating (dev):

     Type                                                         Name        S
 +   pulumi:pulumi:Stack                                          sample-dev  c
 +   tq kubernetes:apiextensions.k8s.io:CustomResourceDefinition  item        c
 +   tq kubernetes:example.com:Item                               item1       c
 +   mq kubernetes:example.com:Item                               item2       c

Resources:
    + 4 created
・・・

ちなみに、details を選ぶと登録内容(YAML)を確認できます。

正常に登録されたか、kubectl コマンドで確認してみます。(k3s の一般的な環境では k3s kubectl とする必要があるかもしれません)

CRD の確認
$ kubectl get crd | grep items

items.example.com                                    2019-08-14T08:57:22Z
カスタムリソースの確認
$ kubectl get item

NAME    AGE
item1   13m
item2   13m
カスタムリソース詳細1
$ kubectl describe item item1

Name:         item1
Namespace:    default
Labels:       app.kubernetes.io/managed-by=pulumi
Annotations:  kubectl.kubernetes.io/last-applied-configuration:
                {"apiVersion":"example.com/v1alpha1","kind":"Item","metadata":{"labels":{"app.kubernetes.io/managed-by":"pulumi"},"name":"item1"},"spec":{...
API Version:  example.com/v1alpha1
Kind:         Item
Metadata:
  Creation Timestamp:  2019-08-14T08:57:22Z
  Generation:          1
  Resource Version:    19763
  Self Link:           /apis/example.com/v1alpha1/namespaces/default/items/item1
  UID:                 87503274-be71-11e9-aeea-025c19d6acb9
Spec:
  Note:   sample item 1
  Value:  100
Events:   <none>
カスタムリソース詳細2
$ kubectl describe item item2

Name:         item2
Namespace:    default
Labels:       app.kubernetes.io/managed-by=pulumi
Annotations:  kubectl.kubernetes.io/last-applied-configuration:
                {"apiVersion":"example.com/v1alpha1","kind":"Item","metadata":{"labels":{"app.kubernetes.io/managed-by":"pulumi"},"name":"item2"},"spec":{...
API Version:  example.com/v1alpha1
Kind:         Item
Metadata:
  Creation Timestamp:  2019-08-14T08:57:22Z
  Generation:          1
  Resource Version:    19764
  Self Link:           /apis/example.com/v1alpha1/namespaces/default/items/item2
  UID:                 875af773-be71-11e9-aeea-025c19d6acb9
Spec:
  Note:   sample item 2
  Value:  20
Events:   <none>

問題なく登録できているようです。

アンデプロイ

デプロイ内容を削除(アンデプロイ)する場合は destroy を実行します。

アンデプロイ
$ pulumi destroy

・・・
Do you want to perform this destroy? yes
Destroying (dev):

     Type                                                         Name        S
 -   pulumi:pulumi:Stack                                          sample-dev  d
 -   tq kubernetes:apiextensions.k8s.io:CustomResourceDefinition  item        d
 -   tq kubernetes:example.com:Item                               item2       d
 -   mq kubernetes:example.com:Item                               item1       d

Resources:
    - 4 deleted
・・・

Keras.js によるランドマーク検出の Web アプリケーション化2

前回 はランドマーク検出対象の画像サイズを固定(256x256)しましたが、今回は任意の画像サイズに対応できるように改造してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20190506/

可変サイズ対応

ドラッグアンドドロップした画像のサイズに合わせてランドマーク検出を実施するようにしてみます。(ファイル構成などは 前回 と同じ)

ただ、Keras.js を通常とは異なる使い方をするため、何らかの不都合が生じるかもしれませんし、別バージョンでは動作しないかもしれません。

(a) UI 処理(src/app.js)

canvas のサイズを画像サイズに合わせて変更し、ランドマーク検出処理へ画像サイズ(幅、高さ)の情報を渡すようにします。

src/app.js
・・・
const loadImage = url => new Promise(resolve => {
    const img = new Image()

    img.addEventListener('load', () => {
        // canvas のサイズを画像サイズに合わせて変更
        canvas.width = img.width
        canvas.height = img.height

        ctx.clearRect(0, 0, canvas.width, canvas.height)

        ctx.drawImage(img, 0, 0)

        const d = ctx.getImageData(0, 0, canvas.width, canvas.height)
        resolve({width: img.width, height: img.height, data: imgToArray(d)})
    })

    img.src = url
})

・・・

const ready = () => {
    ・・・

    canvas.addEventListener('drop', ev => {
        ev.preventDefault()
        canvas.classList.remove('dragging')

        const file = ev.dataTransfer.files[0]

        if (imageTypes.includes(file.type)) {
            clearLandmarksInfo()

            const reader = new FileReader()

            reader.onload = ev => {
                loadImage(reader.result)
                    .then(d => {
                        detectDialog.showModal()
                        // 画像のサイズ情報を追加
                        worker.postMessage({type: 'predict', input: d.data, width: d.width, height: d.height})
                    })
            }

            reader.readAsDataURL(file)
        }
    }, false)
}

・・・

(b) ランドマーク検出処理(src/worker.js)

通常は(Keras.js の)モデル内でレイヤー毎の入出力の形状が固定化されているので、このままでは任意の画像サイズには対応できません。

そこで、検出処理の度に入出力の形状を強制的にリセットする処理(以下)を加える事で可変サイズに対応します。

  • (1) 入力データの形状(画像サイズ)を変更
  • (2) 各レイヤーの出力形状をクリア
  • (3) inputTensorsMap のリセット
src/worker.js
・・・

onmessage = ev => {
    switch (ev.data.type) {
        ・・・
        case 'predict':
            const inputLayerName = model.inputLayerNames[0]
            const outputLayerName = model.outputLayerNames[0]

            const w = ev.data.width
            const h = ev.data.height

            // (1) 入力データの形状(画像サイズ)を変更
            const inputLayer = model.modelLayersMap.get(inputLayerName)
            inputLayer.shape[0] = h
            inputLayer.shape[1] = w

            // (2) 各レイヤーの出力形状をクリア
            model.modelLayersMap.forEach(n => {
                if (n.outputShape) {
                    n.outputShape = null
                    n.imColsMat = null
                }
            })

            // (3) inputTensorsMap のリセット
            model.resetInputTensors()

            const data = {}
            data[inputLayerName] = ev.data.input

            Promise.resolve(model.predict(data))
                .then(r => {
                    const shape = model.modelLayersMap.get(outputLayerName)
                                                .output.tensor.shape

                    return new KerasJS.Tensor(r[outputLayerName], shape)
                })
                .then(detectLandmarks)
                .then(r => 
                    postMessage({type: ev.data.type, output: r})
                )
                .catch(err => {
                    console.log(err)
                    postMessage({type: ev.data.type, error: err.message})
                })

            break
    }
}

動作確認

(1) 画像サイズ 128x128

f:id:fits:20190506114638p:plain

(2) 画像サイズ 307x307

f:id:fits:20190506114655p:plain

(3) 画像サイズ 100x128

f:id:fits:20190506114713p:plain

(4) 画像サイズ 200x256

f:id:fits:20190506114736p:plain