Node.js のマイクロサービスフレームワーク Moleculer
Moleculer という名のなかなか期待できそうな Node.js 用マイクロサービスフレームワークを見つけたのでご紹介します。
ロードバランサー、サーキットブレーカー、メトリクス収集、トレーシング、キャッシュ機能等、多彩な機能を備えていますが。
個人的に、イベント駆動アーキテクチャを採用し Kafka や NATS 等のメッセージブローカーと容易に連携できる事から、コレオグラフィ型のマイクロサービスを手軽に作れそうな点に注目しています。
なんとなく Akka と似ている気もしますが、Akka を使ったマイクロサービスフレームワークの Lagom などと比べても、簡単かつ手軽に使えるのが利点かと思います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20200224/
(1) ローカルアプリケーション
とりあえず簡単なローカルアプリケーションを作成してみます。
以下のモジュールを使うのでインストールしておきます。
- moleculer 0.14.2
moleculer インストール例
> npm install --save moleculer
Moleculer では、メソッド・アクション・イベントハンドリング等を定義した Service
を ServiceBroker
へ登録する事で処理を組み立てます。
ServiceBroker はノード毎にインスタンスを作る事になり、アクションの呼び出しやイベントの通知、別ノードとの連携等を担います。
アクションは call('<サービス名>.<アクション名>', <パラメータ>)
で呼び出す事ができ、アクションで return した値が call の戻り値となります。
イベントの発火(通知)は emit('<イベント名>', <イベント内容>)
や broadcast('<イベント名>', <イベント内容>)
※ で行います。
※ emit と broadcast の違いは、 broadcast は全ノードの全サービスへ通知されますが、 emit は同じサービスを複数のノードで実行していると その中の 1つのノードへ通知されます
order1.js
const { ServiceBroker } = require('moleculer') const broker = new ServiceBroker() broker.createService({ name: 'order-service', actions: { order(ctx) { console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`) // order.ordered イベントの発火 ctx.emit('order.ordered', ctx.params) return ctx.params.id } } }) broker.createService({ name: 'order-check-service', events: { // order.ordered イベントのハンドリング 'order.ordered'(ctx) { console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`) } } }) const run = async () => { await broker.start() const order1 = {id: 'order1', item: 'item1', qty: 2} // order アクションの呼び出し const res1 = await broker.call('order-service.order', order1) console.log(`order result: ${res1}`) } run().catch(err => console.error(err))
実行結果
> node order1.js [2020-02-18T14:08:35.336Z] INFO ・・・: Moleculer v0.14.2 is starting... ・・・ # order-service.order: {"id":"order1","item":"item1","qty":2} ## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2} order result: order1 ・・・ [2020-02-18T14:08:35.440Z] INFO ・・・: ServiceBroker is stopped. Good bye.
上記の処理へサービスを 1つ追加してみます。
イベントのハンドリングではワイルドカード *
も使えるようになっていますので使ってみます。
order2.js
const { ServiceBroker } = require('moleculer') const broker = new ServiceBroker() broker.createService({ name: 'order-service', actions: { order(ctx) { console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`) ctx.emit('order.ordered', ctx.params) return ctx.params.id } }, events: { // order.validated と order.invalidated イベントをハンドリング 'order.*validated'(ctx) { console.log(`# order-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`) } } }) broker.createService({ name: 'order-check-service', events: { async 'order.ordered'(ctx) { console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`) // item-service の check アクション呼び出し const isValid = await ctx.call('item-service.check', ctx.params) if (isValid) { ctx.emit('order.validated', ctx.params) } else { ctx.emit('order.invalidated', ctx.params) } } } }) broker.createService({ name: 'item-service', actions: { check(ctx) { console.log(`### item-service.check: ${JSON.stringify(ctx.params)}`) return ['item1', 'item2'].includes(ctx.params.item) } } }) const run = async () => { await broker.start() const order1 = {id: 'order1', item: 'item1', qty: 2} const order2 = {id: 'order2', item: 'item3', qty: 1} const res1 = await broker.call('order-service.order', order1) console.log(`order result: ${res1}`) const res2 = await broker.call('order-service.order', order2) console.log(`order result: ${res2}`) } run().catch(err => console.error(err))
実行結果
> node order2.js ・・・ # order-service.order: {"id":"order1","item":"item1","qty":2} ## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2} ### item-service.check: {"id":"order1","item":"item1","qty":2} order result: order1 # order-service.order: {"id":"order2","item":"item3","qty":1} ## order-check-service order.ordered: {"id":"order2","item":"item3","qty":1} ### item-service.check: {"id":"order2","item":"item3","qty":1} # order-service order.validated: {"id":"order1","item":"item1","qty":2} order result: order2 # order-service order.invalidated: {"id":"order2","item":"item3","qty":1} ・・・
(2) メッセージブローカー(NATS Streaming)利用
次に、別ノードのサービスとメッセージブローカー経由で連携してみます。
処理内容としては Web API を通して受け取ったメッセージを別ノードのサービスに call・emit・broadcast します。
ここでは、メッセージブローカーとして Go言語で実装されており手軽に実行できる NATS Streaming を使用します。
まずは、以下のモジュールを使うのでインストールしておきます。
- moleculer 0.14.2
- moleculer-web 0.9.0
- node-nats-streaming 0.2.6
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming
Web API としてリクエストを受け付ける処理を実装します。
moleculer-web
を require した結果を mixins
して routes
で HTTP リクエストとそれに応じて実行するアクションをマッピングします。
NATS Streaming を使用するため transporter
に STAN
を設定します。
nodeID
はデフォルトでホスト名とプロセスIDから生成されるようになっていますが、他のノードとの接続に使うので、ここでは明示的に設定しています。
pub.js
const { ServiceBroker } = require('moleculer') const HTTPServer = require('moleculer-web') const broker = new ServiceBroker({ nodeID: 'pub', transporter: 'STAN' }) broker.createService({ name: 'web', mixins: [HTTPServer], settings: { routes: [ { aliases: { 'PUT /pub': 'pub.publish' }} ] } }) broker.createService({ name: 'pub', actions: { async publish(ctx) { const params = ctx.params // アクション呼び出し const res = await ctx.call('sub.direct', params) // イベントの emit ctx.emit('event.emit', params) // イベントの broadcast ctx.broadcast('event.broadcast', params) return res } } }) broker.start().catch(err => console.error(err))
次に、NATS Streaming 経由でアクションの呼び出しやイベント通知を受け取る処理を実装します。
こちらは複数プロセスで実行できるように nodeID をコマンド引数として渡すようにしています。
sub.js
const { ServiceBroker } = require('moleculer') const id = process.argv[2] const broker = new ServiceBroker({ nodeID: id, transporter: 'STAN' }) broker.createService({ name: 'sub', actions: { direct(ctx) { console.log(`* sub.direct: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`) return broker.nodeID } }, events: { 'event.*'(ctx) { console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`) } } }) broker.start().catch(err => console.error(err))
動作確認
まずは NATS Streaming を実行しておきます。
NATS Streaming 実行
> nats-streaming-server ・・・ [2668] 2020/02/23 19:27:33.566759 [[32mINF[0m] Server is ready [2668] 2020/02/23 19:27:33.591774 [[32mINF[0m] STREAM: Recovering the state... [2668] 2020/02/23 19:27:33.592779 [[32mINF[0m] STREAM: No recovered state [2668] 2020/02/23 19:27:33.843799 [[32mINF[0m] STREAM: Message store is MEMORY [2668] 2020/02/23 19:27:33.844764 [[32mINF[0m] STREAM: ---------- Store Limits ---------- [2668] 2020/02/23 19:27:33.846740 [[32mINF[0m] STREAM: Channels: 100 * [2668] 2020/02/23 19:27:33.848741 [[32mINF[0m] STREAM: --------- Channels Limits -------- [2668] 2020/02/23 19:27:33.849804 [[32mINF[0m] STREAM: Subscriptions: 1000 * [2668] 2020/02/23 19:27:33.850745 [[32mINF[0m] STREAM: Messages : 1000000 * [2668] 2020/02/23 19:27:33.852756 [[32mINF[0m] STREAM: Bytes : 976.56 MB * [2668] 2020/02/23 19:27:33.853740 [[32mINF[0m] STREAM: Age : unlimited * [2668] 2020/02/23 19:27:33.854745 [[32mINF[0m] STREAM: Inactivity : unlimited * [2668] 2020/02/23 19:27:33.857744 [[32mINF[0m] STREAM: ---------------------------------- [2668] 2020/02/23 19:27:33.858745 [[32mINF[0m] STREAM: Streaming Server is ready
pub.js を起動し、2つ sub.js を実行しておきます。
これによって、NATS Streaming へ MOL.DISCOVER.<nodeID>
、MOL.EVENT.<nodeID>
、MOL.REQ.<nodeID>
等のような様々なチャネルが登録され、ノード間の接続が確立します。
pub.js 実行
> node pub.js ・・・ [2020-02-23T10:29:58.028Z] INFO pub/REGISTRY: Strategy: RoundRobinStrategy ・・・ [2020-02-23T10:29:58.102Z] INFO pub/WEB: Register route to '/' [2020-02-23T10:29:58.158Z] INFO pub/WEB: PUT /pub => pub.publish ・・・ [2020-02-23T10:30:29.936Z] INFO pub/REGISTRY: Node 'sub1' connected. [2020-02-23T10:30:36.918Z] INFO pub/REGISTRY: Node 'sub2' connected.
sub.js 実行1(sub1)
> node sub.js sub1 ・・・ [2020-02-23T10:30:34.026Z] INFO sub1/REGISTRY: Node 'pub' connected. [2020-02-23T10:30:36.919Z] INFO sub1/REGISTRY: Node 'sub2' connected.
sub.js 実行2(sub2)
> node sub.js sub2 ・・・ [2020-02-23T10:30:39.027Z] INFO sub2/REGISTRY: Node 'pub' connected. [2020-02-23T10:30:40.111Z] INFO sub2/REGISTRY: Node 'sub1' connected.
この状態で 2回 HTTP リクエストを実施してみます。
HTTP リクエスト 1回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 1}" "sub1"
HTTP リクエスト 2回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 2}" "sub2"
1回目の call と emit は sub1 側で、2回目は sub2 側で実行しておりロードバランスされています(デフォルトではラウンドロビンとなる)
broadcast は sub1 と sub2 のどちらにも通知されています。
sub.js sub1 の結果
・・・ * sub.direct: nodeID=sub1, params={"no":1} * event.emit: nodeID=sub1, params={"no":1} * event.broadcast: nodeID=sub1, params={"no":1} * event.broadcast: nodeID=sub1, params={"no":2}
sub.js sub2 の結果
・・・ * event.broadcast: nodeID=sub2, params={"no":1} * sub.direct: nodeID=sub2, params={"no":2} * event.emit: nodeID=sub2, params={"no":2} * event.broadcast: nodeID=sub2, params={"no":2}
(3) MongoDB への CRUD を REST API 化
最後に、MongoDB へデータを CRUD する処理を REST API 化してみます。
以下のモジュールを使うのでインストールしておきます。
- moleculer 0.14.2
- moleculer-web 0.9.0
- node-nats-streaming 0.2.6
- moleculer-db 0.8.5
- moleculer-db-adapter-mongo 0.4.7
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming moleculer-db moleculer-db-adapter-mongo
これまでは createService
でサービスを定義していましたが、ここでは別ファイルでサービスを定義する方法を試してみます。
ServiceBroker の loadServices
を使うと、デフォルトで services/xxx.service.js
(xxx は任意) ファイルをロードして登録してくれます。
server.js
const { ServiceBroker } = require('moleculer') const id = process.argv[2] const broker = new ServiceBroker({ nodeID: id, transporter: 'STAN' }) broker.loadServices() broker.start().catch(err => console.error(err))
REST API のサービスを実装します。
aliases で REST
とするとアクション側で "POST /" のような HTTP メソッドとパスを(rest
項目で)設定する事になります。
services/api.service.js
const HTTPServer = require('moleculer-web') module.exports = { name: 'api', mixins: [HTTPServer], settings: { routes: [ { aliases: { 'REST items': 'item' } } ] } }
moleculer-db
がデフォルトで REST API に対応した CRUD 処理をアクションとして定義してくれているので、今回はこれをそのまま使う事にします。
ここでは entityCreated
等を使って更新時のイベント通知のみを実装しています。
なお、MongoDB の接続文字列やコレクション名は環境変数で設定するようにしています。
services/item.service.js
const DbService = require('moleculer-db') const MongoDBAdapter = require('moleculer-db-adapter-mongo') const mongoUri = process.env['MONGO_URI'] const colName = process.env['MONGO_COLLECTION_NAME'] module.exports = { name: 'item', mixins: [DbService], adapter: new MongoDBAdapter(mongoUri, { useUnifiedTopology: true }), collection: colName, entityCreated(entity, ctx) { console.log(`entityCreated: ${JSON.stringify(entity)}`) ctx.emit('item.created', entity) }, entityUpdated(entity, ctx) { console.log(`entityUpdated: ${JSON.stringify(entity)}`) ctx.emit('item.updated', entity) }, entityRemoved(entity, ctx) { console.log(`entityRemoved: ${JSON.stringify(entity)}`) ctx.emit('item.removed', entity) } }
動作確認用にイベントをハンドリングする処理も用意しました。
handler.js
const { ServiceBroker } = require('moleculer') const id = process.argv[2] const broker = new ServiceBroker({ nodeID: id, transporter: 'STAN' }) broker.createService({ name: 'handler', events: { 'item.*'(ctx) { console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`) } } }) broker.start().catch(err => console.error(err))
動作確認
MongoDB と NATS Streaming を起動しておきます。
MongoDB 実行
> mongod --dbpath itemdb
NATS Streaming 実行
> nats-streaming-server
接続先の MongoDB やコレクション名を環境変数で設定してから、server.js を実行します。
server.js 実行
> set MONGO_URI=mongodb://localhost/sample > set MONGO_COLLECTION_NAME=items > node server.js server1 ・・・ [2020-02-23T11:41:06.978Z] INFO server1/API: Register route to '/' [2020-02-23T11:41:07.031Z] INFO server1/API: GET /items => item.list [2020-02-23T11:41:07.034Z] INFO server1/API: GET /items/:id => item.get [2020-02-23T11:41:07.035Z] INFO server1/API: POST /items => item.create [2020-02-23T11:41:07.036Z] INFO server1/API: PUT /items/:id => item.update [2020-02-23T11:41:07.037Z] INFO server1/API: PATCH /items/:id => item.patch [2020-02-23T11:41:07.038Z] INFO server1/API: DELETE /items/:id => item.remove ・・・ [2020-02-23T11:41:47.389Z] INFO server1/REGISTRY: Node 'handler1' connected.
イベントハンドラも別途実行しておきます。
handler.js 実行
> node handler.js handler1 ・・・ [2020-02-23T11:41:48.322Z] INFO handler1/REGISTRY: Node 'server1' connected.
準備が整ったので REST API を呼び出して CRUD 処理を順番に呼び出してみます。
create 実施
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:3000/items -d "{\"name\": \"item1\", \"qty\": 1}" {"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af {"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":1}
update 実施
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/items/5e5266a735ebff2c9083a0af -d "{\"qty\": 2}" {"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
delete 実施
$ curl -s -XDELETE http://localhost:3000/items/5e5266a735ebff2c9083a0af {"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af { "name": "EntityNotFoundError", "message": "Entity not found", "code": 404, "type": null, "data": { "id": "5e5266a735ebff2c9083a0af" } }
問題なく動作しているようです。 handler.js の出力結果も以下のようになりました。
handler.js の出力結果
・・・ * item.created: nodeID=handler1, params={"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"} * item.updated: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2} * item.removed: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
Ramda で入れ子のオブジェクトをフラットにする
Ramda を使って入れ子になったオブジェクトをフラットにする処理を考えてみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20200216/
サンプル1
まずは、以下のように入れ子になったオブジェクトを使って
{ item: { name: 'item-1', details: { color: 'white', size: 'L' } }, num: 5 }
下記のように変換してみます。
{ name: 'item-1', color: 'white', size: 'L', num: 5 }
方法はいろいろありそうですが、ここではオブジェクトの値部分を単一要素のオブジェクト {<項目名>: <値>}
へ変換してから R.mergeAll
でマージするようにしてみました。
sample1.js
const R = require('ramda') const data = { item: { name: 'item-1', details: { color: 'white', size: 'L' } }, num: 5 } // Object の判定 const isObject = R.pipe(R.type, R.equals('Object')) // 値部分を単一要素のオブジェクトへ変換 const valuesToObjOf = R.pipe( R.mapObjIndexed( R.ifElse( isObject, v => valuesToObjOf(v), R.flip(R.objOf) ) ), R.values, R.flatten ) const flatObj = R.pipe( valuesToObjOf, R.mergeAll ) console.log( valuesToObjOf(data) ) console.log('------') console.log( flatObj(data) )
値部分がオブジェクトの場合は再帰的に処理するようにしていますが、その判定に R.is(Object)
を使うと Array
等も true
になってしまい都合が悪いので、R.type
の結果が Object
か否かで判定するようにしています。
実行結果
> node sample1.js [ { name: 'item-1' }, { color: 'white' }, { size: 'L' }, { num: 5 } ] ------ { name: 'item-1', color: 'white', size: 'L', num: 5 }
サンプル2
上記の処理だと項目名が重複した際に不都合が生じるため、項目名を連結するように valuesToObjOf
を改良してみました。
sample2.js
・・・ const valuesToObjOf = (obj, prefix = '') => R.pipe( R.mapObjIndexed( R.ifElse( isObject, (v, k) => valuesToObjOf(v, `${prefix}${k}_`), (v, k) => R.objOf(`${prefix}${k}`, v) ) ), R.values, R.flatten )(obj) const flatObj = R.pipe( valuesToObjOf, R.mergeAll ) console.log( flatObj(data) )
実行結果
> node sample2.js { item_name: 'item-1', item_details_color: 'white', item_details_size: 'L', num: 5 }
Odoo の在庫モジュールを JSON-RPC で操作2
前回 の続きで、今回は Odoo の在庫モジュールを JSON-RPC で操作し在庫移動(顧客への出荷)を処理してみます。
ソースは http://github.com/fits/try_samples/tree/master/blog/20200112/
在庫管理
Odoo の画面では在庫移動(顧客への出荷等)を以下のように処理しているようでした。(ステータスは ドラフト -> 待機中 -> 準備完了 -> 完了
のように遷移)
- (1) stock.picking を create - 作成
- (2) action_confirm の実行 - 処理準備
- (3) action_assign の実行 - 利用可能準備(引当)
- (4) button_validate の実行
- (5) stock.immediate.transfer の process を実施
ただ、画面処理を前提としたような (4) 以降の処理を API として呼び出すには違和感があります。
そこで、button_validate
の処理等を参照して別の方法で処理できないか調べてみたところ、以下のようにしても実現できそうでした。
- (4') (1) で作られた stock.move.line の qty_done を更新
- (5') action_done の実行
よって、今回はこの両方を試してみます。
(c) 在庫移動1 - button_validate 実行
まずは、button_validate
を呼び出す方を試してみます。
(認証などの処理は前回と同じです)
sample3.js
const axios = require('axios') ・・・ const pickingTypeId = parseInt(process.argv[2]) const locationId = parseInt(process.argv[3]) const locationDestId = parseInt(process.argv[4]) const productId = parseInt(process.argv[5]) const qty = parseInt(process.argv[6]) const main = async () => { const auth = ・・・ const uid = auth.data.result // (1) 作成 const sp1 = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'c2', params: { service: 'object', method: 'execute', args: [db, uid, password, 'stock.picking', 'create', { picking_type_id: pickingTypeId, location_id: locationId, location_dest_id: locationDestId, move_lines: [ [0, 0, { name: 'move', product_id: productId, product_uom: 1, product_uom_qty: qty }] ] }] } }) console.log(sp1.data) if (sp1.data.error) { return } const pid = sp1.data.result console.log(`id: ${pid}`) // (2) 処理準備 const sp2 = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'c3', params: { service: 'object', method: 'execute', args: [db, uid, password, 'stock.picking', 'action_confirm', pid] } }) console.log(sp2.data) if (sp2.data.error) { return } // (3) 利用可能準備(引当) const sp3 = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'c4', params: { service: 'object', method: 'execute', args: [db, uid, password, 'stock.picking', 'action_assign', pid] } }) console.log(sp3.data) if (sp3.data.error) { return } // (4) 検証(button_validate の実行) const sp4 = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'c5', params: { service: 'object', method: 'execute', args: [db, uid, password, 'stock.picking', 'button_validate', pid] } }) console.log(sp4.data) if (sp4.data.error) { return } const vld = sp4.data.result // (5) stock.immediate.transfer の process を実行 const res = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'c7', params: { service: 'object', method: 'execute', args: [db, uid, password, vld.res_model, 'process', vld.res_id] } }) console.log(res.data) } main().catch(err => console.error(err))
上記を使って、ピッキングタイプ 2(Delivery Orders)
で在庫ロケーション 8(WH/Stock)
から顧客出荷用のロケーション 5(Partner Locations/Customers)
へ商品 19(Large Desk)
を 2個移動してみます。
実行
> node sample3.js 2 8 5 19 2 { jsonrpc: '2.0', id: 'c2', result: 70 } id: 70 { jsonrpc: '2.0', id: 'c3', result: true } { jsonrpc: '2.0', id: 'c4', result: true } { jsonrpc: '2.0', id: 'c5', result: { name: 'Immediate Transfer?', type: 'ir.actions.act_window', view_mode: 'form', res_model: 'stock.immediate.transfer', views: [ [Array] ], view_id: 477, target: 'new', res_id: 8, context: {} } } { jsonrpc: '2.0', id: 'c7', result: false }
在庫数を確認してみると 100個あった在庫が 98個となっているため、とりあえずは成功しているようです。
在庫数の確認
> node sample1.js 19 ・・・ [ { id: 19, name: 'Large Desk', product_tmpl_id: [ 13, '[E-COM09] Large Desk' ], qty_available: 98, virtual_available: 98 } ]
(d) 在庫移動2 - qty_done の更新
次に qty_done
を更新する方法も試してみます。
sample4.js
const axios = require('axios') ・・・ const pickingTypeId = parseInt(process.argv[2]) const locationId = parseInt(process.argv[3]) const locationDestId = parseInt(process.argv[4]) const productId = parseInt(process.argv[5]) const qty = parseInt(process.argv[6]) const main = async () => { ・・・ // (3) 利用可能準備(引当) const sp3 = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'd4', params: { service: 'object', method: 'execute', args: [db, uid, password, 'stock.picking', 'action_assign', pid] } }) console.log(sp3.data) if (sp3.data.error) { return } // move_line_ids の取得 const pick = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'd5', params: { service: 'object', method: 'execute_kw', args: [db, uid, password, 'stock.picking', 'read', [pid], { fields: ['move_line_ids'] }] } }) console.log(pick.data) const lineIds = pick.data.result[0].move_line_ids console.log(`move line ids: ${lineIds}`) // (4') stock.move.line の qty_done(処理済み在庫数)を更新 const sml = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'd6', params: { service: 'object', method: 'execute_kw', args: [ db, uid, password, 'stock.move.line', 'write', [ lineIds, { qty_done: qty }], {} ] } }) console.log(sml.data) if (sml.data.error) { return } // (5') action_done の実行 const sp4 = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'd7', params: { service: 'object', method: 'execute', args: [db, uid, password, 'stock.picking', 'action_done', pid] } }) console.log(sp4.data) } main().catch(err => console.error(err))
button_validate の時と同じようにピッキングタイプ 2(Delivery Orders)
で在庫ロケーション 8(WH/Stock)
から顧客出荷用のロケーション 5(Partner Locations/Customers)
へ商品 19(Large Desk)
を今度は 3個移動してみます。
実行
> node sample4.js 2 8 5 19 3 { jsonrpc: '2.0', id: 'd2', result: 71 } id: 71 { jsonrpc: '2.0', id: 'd3', result: true } { jsonrpc: '2.0', id: 'd4', result: true } { jsonrpc: '2.0', id: 'd5', result: [ { id: 71, move_line_ids: [Array] } ] } move line ids: 73 { jsonrpc: '2.0', id: 'd6', result: true } { jsonrpc: '2.0', id: 'd7', result: true }
98個あった在庫が 95個となっているので、こちらも一応は成功しているようです。
在庫確認
> node sample1.js 19 ・・・ [ { id: 19, name: 'Large Desk', product_tmpl_id: [ 13, '[E-COM09] Large Desk' ], qty_available: 95, virtual_available: 95 } ]
Odoo の在庫モジュールを JSON-RPC で操作1
Python で実装されたオープンソース ERP である Odoo の在庫モジュールを JSON-RPC を使って操作してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20200112/
はじめに
Docker で PostgreSQL と Odoo を実行しておきます。
PostgreSQL 実行
$ docker run -d -e POSTGRES_USER=odoo -e POSTGRES_PASSWORD=odoo -e POSTGRES_DB=postgres --name db postgres
Odoo 実行
$ docker run -d -p 8069:8069 --name odoo --link db:db odoo
http://localhost:8069/
へアクセスすると DB の初期構築画面(以下)が表示されるので設定します。
「Demo data」 にチェックを付けておくと、デモ用のデータが登録されるので、今回はこのデータを使います。
デフォルトの状態では在庫管理モジュールが導入されていないため、「アプリ」画面で「在庫」モジュールをインストールしておきます。
Odoo には JSON-RPC と XML-RPC が用意されており、これらを利用することで永続モデルクラス(models ディレクトリに配置)等のメソッドを外部から呼び出せるようになっています。
今回は JSON-RPC の方を使いますが、参考までに XML-RPC の処理を Python で実装してみると以下のようになりました。
なお、qty_available
は在庫モジュールをインストールする事によって Product クラス(product.product)へ追加される項目です。
sample.py (XML-RPC で商品情報を取得するサンプル)
import xmlrpc.client url = 'http://localhost:8069' db = 'odoo1' user = 'admin@example.com' password = 'pass' common = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/common') # 認証 uid = common.authenticate(db, user, password, {}) print(f'uid: {uid}') models = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/object') obj = 'product.product' # 商品の件数取得 count = models.execute(db, uid, password, obj, 'search_count', []) print(f'count: {count}') # 取得する項目 kw = {'fields': ['name', 'lst_price', 'product_tmpl_id', 'qty_available']} # 商品情報取得 products = models.execute_kw(db, uid, password, obj, 'search_read', [], kw) for p in products: print(p)
実行結果
> python sample.py uid: 2 count: 32 {'id': 15, 'name': 'Customizable Desk (CONFIG)', 'lst_price': 800.4, 'product_tmpl_id': [9, 'Customizable Desk (CONFIG)'], 'qty_available': 60.0} {'id': 16, 'name': 'Corner Desk Right Sit', 'lst_price': 147.0, 'product_tmpl_id': [10, '[E-COM06] Corner Desk Right Sit'], 'qty_available': 45.0} {'id': 17, 'name': 'Large Cabinet', 'lst_price': 320.0, 'product_tmpl_id': [11, '[E-COM07] Large Cabinet'], 'qty_available': 250.0} ・・・
在庫管理
それでは、JSON-RPC で在庫の操作を行ってみます。
JSON-RPC や XML-RPC で任意の処理を呼び出すためには、認証を行って uid
を取得します。
JSON-RPC における認証は、下記のような JSON を /jsonrpc
へ POST する事で実施します。
args
には DB の初期構築時に設定した Database Name
、Email
、Password
の値を順に指定した後、最後の引数を {}
や null
等にしておきます。
id
はリクエストとレスポンスを識別するためのもので、任意の文字列や数値を指定する事ができるようです。
認証用リクエスト JSON 例
{ "jsonrpc": "2.0", "method": "call", "id": "a1", "params": { "service": "common", "method": "authenticate", "args": ["odoo1", "admin@example.com", "pass", {}] } }
認証に成功すると以下のようなレスポンスが返り、result
項目の値が uid となっています。
id
はリクエストで指定したものと同じ値になります。
認証用レスポンス JSON 例
{"jsonrpc": "2.0", "id": "a1", "result": 2}
認証で取得した uid を使ってモデルクラスのメソッドを呼び出すには、以下のような JSON を使います。(execute_kw の場合は args 内の要素が 1つ増える)
任意のメソッド呼び出し用リクエスト JSON(execute の場合)
{ "jsonrpc": "2.0", "method": "call", "id": <任意の値>, "params": { "service": "object", "method": "execute", "args": [<Database Name>, <uid>, <Password>, <モデル名>, <メソッド名>, <メソッドの引数>] } }
(a) 在庫数の取得
まずは、指定商品の在庫数を取得してみます。
商品の在庫数は、在庫モジュールによって Product クラスへ追加された qty_available
や virtual_available
で取得できるようです。
qty_available
が商品の在庫画面上で 手持在庫
として表示される実際の在庫数、virtual_available
が画面上の 見通し
として表示される数値で実際の在庫数に入出荷の予定を加味したものです。( virtual_available = qty_available + incoming_qty - outgoing_qty
)
sample1.js
const axios = require('axios') const url = 'http://localhost:8069/jsonrpc' const db = 'odoo1' const user = 'admin@example.com' const password = 'pass' const productId = parseInt(process.argv[2]) const main = async () => { // 認証 const auth = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'a1', params: { service: 'common', method: 'authenticate', args: [db, user, password, {}] } }) const uid = auth.data.result console.log(`uid: ${uid}`) // 商品情報の取得 const prd = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'a2', params: { service: 'object', method: 'execute_kw', args: [ db, uid, password, 'product.product', 'read', [ productId ], { fields: ['name', 'product_tmpl_id', 'qty_available', 'virtual_available'] } ] } }) console.log(prd.data.result) } main().catch(err => console.error(err))
実行結果は以下の通りです。
実行結果1
> node sample1.js 19 uid: 2 [ { id: 19, name: 'Large Desk', product_tmpl_id: [ 13, '[E-COM09] Large Desk' ], qty_available: 0, virtual_available: 0 } ]
実行結果2
> node sample1.js 21 ・・・ [ { id: 21, name: 'Cabinet with Doors', product_tmpl_id: [ 15, '[E-COM11] Cabinet with Doors' ], qty_available: 8, virtual_available: 128 } ]
(b) 在庫数の調整
次に、指定した商品の在庫数を更新してみます。
Odoo の在庫モジュールでは、在庫ロケーション(stock.location)間の在庫移動(stock.move)として在庫数の変動を表現しているようです。
調整用途で在庫数を更新する際にも調整用ロケーション Virtual Locations/YourCompany: Inventory adjustment
から在庫ロケーション WH/Stock
等への在庫移動という形で処理する事になります。
ここでは、商品の在庫数を簡単に調整するためのウィザード処理 ※ として用意されている(と思われる) ProductChangeQuantity
(stock.change.product.qty
) を使って在庫数を更新してみます。
※ 在庫モジュールの wizard ディレクトリにソースが配置されている
それには、stock.change.product.qty
を create
として change_product_qty
を呼び出せば良さそうです。
create の際に product_id
と product_tmpl_id
で商品を、new_quantity
で更新後の在庫数を指定すれば、stock.warehouse
の lot_stock_id
から在庫のロケーションを特定して在庫移動が作成されるようです。
sample2.js
const axios = require('axios') ・・・ const productId = parseInt(process.argv[2]) const productTmplId = parseInt(process.argv[3]) const qty = parseInt(process.argv[4]) const main = async () => { const auth = ・・・ const uid = auth.data.result // 作成 const chg = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'b2', params: { service: 'object', method: 'execute', args: [db, uid, password, 'stock.change.product.qty', 'create', { product_id: productId, product_tmpl_id: productTmplId, new_quantity: qty }] } }) const sid = chg.data.result console.log(`create id: ${sid}`) // 在庫数の更新 const res = await axios.post(url, { jsonrpc: '2.0', method: 'call', id: 'b3', params: { service: 'object', method: 'execute', args: [db, uid, password, 'stock.change.product.qty', 'change_product_qty', sid] } }) console.log(res.data.result) } main().catch(err => console.error(err))
上記を実行して、(a) で在庫数が 0 だった Large Desk(product_id = 19, product_tmpl_id = 13)
の在庫数を 100 にしてみます。
実行結果
> node sample2.js 19 13 100 create id: 6 { type: 'ir.actions.act_window_close' }
商品の在庫数を確認してみると 100 に変化しており、処理は成功しているようです。
在庫数の確認
> node sample1.js 19 ・・・ [ { id: 19, name: 'Large Desk', product_tmpl_id: [ 13, '[E-COM09] Large Desk' ], qty_available: 100, virtual_available: 100 } ]
DB の stock_move
テーブルを確認してみると下記のレコードが追加されていました。
stock_move テーブルへ追加されたレコード(を JSON 化したもの)
{ "id" : 44, "name" : "Product Quantity Updated", ・・・ "company_id" : 1, ・・・ "product_id" : 19, "description_picking" : null, "product_qty" : 100.0, "product_uom_qty" : 100.000, "product_uom" : 1, "location_id" : 14, "location_dest_id" : 8, ・・・ "state" : "done", ・・・ "procure_method" : "make_to_stock", ・・・ "create_uid" : 2, ・・・ }
在庫ロケーション 14
(Virtual Locations/YourCompany: Inventory adjustment)から 8
(WH/Stock)への在庫移動として処理されています。
今回はここまで、次回は顧客への出荷を処理してみたいと思います。
KMongo でデータクラスを setOnInsert する
KMongo でデータクラスをそのまま setOnInsert
する方法を考えてみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20191217/
はじめに
MongoDB は {upsert: true}
で update
や findAndModify
する際、指定の条件に合致するドキュメントが存在する場合は何も行わず、存在しない場合に登録するドキュメントを $setOnInsert
で指定できるようになっています。
KMongo にも setOnInsert
メソッドは用意されていますが、現時点では以下のような実装になっており、データクラスをそのままセットできるようになっていません。(項目名と値を指定する事になる)
kmongo-property/src/main/kotlin/org/litote/kmongo/Updates.kt
fun <@OnlyInputTypes T> setOnInsert(property: KProperty<T?>, value: T): Bson = Updates.setOnInsert(property.path(), value)
そのため、データクラスをそのまま setOnInsert
するには何らかの工夫が必要だと思われます。
(a) リフレクション利用
下記のデータクラスを使って、targetId と revision が同じドキュメントは複数登録しないような処理を実装してみます。
データクラス例
typealias EventDate = OffsetDateTime data class Event(val targetId: String, val revision: Long, val date: EventDate = EventDate.now())
とりあえず、データクラスの項目を 1つずつ指定して combine
する方法が考えられます。
コード例1
・・・ val col = client.getDatabase(dbName).getCollection<Event>() val eventData = Event(・・・) col.findOneAndUpdate( and( Event::targetId eq eventData.targetId, Event::revision eq eventData.revision ), combine( setOnInsert(Event::targetId, eventData.targetId), setOnInsert(Event::revision, eventData.revision), setOnInsert(Event::date, eventData.date) ), findOneAndUpdateUpsert() )
リフレクションを使うと以下のようになります。
コード例2
・・・ val col = client.getDatabase(dbName).getCollection<Event>() val eventData = Event(・・・) col.findOneAndUpdate( and( Event::targetId eq eventData.targetId, Event::revision eq eventData.revision ), combine( Event::class.memberProperties.map { setOnInsert(it, it.get(eventData)) } ), findOneAndUpdateUpsert() )
サンプルアプリケーション(sample1)
下記サンプルアプリケーションで動作確認してみます。
なお、findOneAndUpdate
の第三引数を findOneAndUpdateUpsert()
もしくは findOneAndUpdateUpsert().returnDocument(ReturnDocument.BEFORE)
とすると処理前のドキュメントの内容が返り ※、findOneAndUpdateUpsert().returnDocument(ReturnDocument.AFTER)
とすると処理後の内容が返ってきます。
※ 該当ドキュメントが存在しない場合(つまり、新規登録した場合)は null が返ってきます
sample1/src/main/kotlin/App.kt
import com.mongodb.ConnectionString import com.mongodb.client.model.ReturnDocument import org.litote.kmongo.* import java.time.OffsetDateTime import kotlin.reflect.full.memberProperties typealias EventDate = OffsetDateTime data class Event(val targetId: String, val revision: Long, val date: EventDate = EventDate.now()) fun main() { val conStr = "mongodb://localhost" val dbName = "sample" KMongo.createClient(ConnectionString(conStr)).use { client -> val col = client.getDatabase(dbName).getCollection<Event>() val d1 = Event("a1", 1) val res = col.findOneAndUpdate( and( Event::targetId eq d1.targetId, Event::revision eq d1.revision ), combine( setOnInsert(Event::targetId, d1.targetId), setOnInsert(Event::revision, d1.revision), setOnInsert(Event::date, d1.date) ), findOneAndUpdateUpsert() ) println(res) val d2 = Event("b1", 1) val res2 = col.findOneAndUpdate( and( Event::targetId eq d2.targetId, Event::revision eq d2.revision ), combine( Event::class.memberProperties.map { setOnInsert(it, it.get(d2)) } ), findOneAndUpdateUpsert().returnDocument(ReturnDocument.AFTER) ) println(res2) } }
動作確認
Gradle でビルド・実行します。
実行(1回目)
> gradle sample1:run ・・・ null Event(targetId=b1, revision=1, date=2019-12-15T13:03:51.586Z) ・・・
MongoDB のドキュメント内容は以下の通りです。
MongoDB データ確認(1回目)
> mongo mongodb://localhost/sample ・・・ > db.event.find() { "_id" : ObjectId("5df62f37a4d2dfcdb84e6f5e"), "revision" : NumberLong(1), "targetId" : "a1", "date" : ISODate("2019-12-15T13:03:50.206Z") } { "_id" : ObjectId("5df62f37a4d2dfcdb84e6f60"), "revision" : NumberLong(1), "targetId" : "b1", "date" : ISODate("2019-12-15T13:03:51.586Z") }
再度実行してみます。
実行(2回目)
> gradle sample1:run ・・・ Event(targetId=a1, revision=1, date=2019-12-15T13:03:50.206Z) Event(targetId=b1, revision=1, date=2019-12-15T13:03:51.586Z) ・・・
既に該当ドキュメントが登録済みのため、MongoDB のドキュメントに変化はありません。
MongoDB データ確認(2回目)
> db.event.find() { "_id" : ObjectId("5df62f37a4d2dfcdb84e6f5e"), "revision" : NumberLong(1), "targetId" : "a1", "date" : ISODate("2019-12-15T13:03:50.206Z") } { "_id" : ObjectId("5df62f37a4d2dfcdb84e6f60"), "revision" : NumberLong(1), "targetId" : "b1", "date" : ISODate("2019-12-15T13:03:51.586Z") }
(b) JSON 利用
(a) の方法には問題があり、下記のようなデータクラスで不都合が生じます。
@JsonTypeInfo
部分が処理されないため、復元先のクラス名を設定する @class
項目が作られず、EventDetail 部分が実装クラス(Created
や Updated
)へ復元できずにエラーとなります。
データクラス例
interface EventDetail data class Created(val value: Int) : EventDetail data class Updated(val oldValue: Int, val newValue: Int) : EventDetail data class Event(val targetId: String, val revision: Long, @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) val detail: EventDetail, val date: EventDate = EventDate.now())
そこで、ここでは .json
でデータクラスの JSON 文字列を取得し、文字列として {$setOnInsert: {"targetId": "・・・", ・・・}}
を組み立て BSON 化する方法を試します。
コード例
・・・ val col = client.getDatabase(dbName).getCollection<Event>() val eventData = Event(・・・) col.findOneAndUpdate( and( Event::targetId eq eventData.targetId, Event::revision eq eventData.revision ), // 文字列として $setOnInsert の内容を作って BSON 化 "{${MongoOperator.setOnInsert}: ${eventData.json}}".bson, findOneAndUpdateUpsert() )
サンプルアプリケーション(sample2)
下記サンプルアプリケーションで動作確認してみます。
sample2/src/main/kotlin/App.kt
import com.fasterxml.jackson.annotation.JsonTypeInfo import com.mongodb.ConnectionString import com.mongodb.client.model.ReturnDocument import org.litote.kmongo.* import java.time.OffsetDateTime typealias EventDate = OffsetDateTime interface EventDetail data class Created(val value: Int) : EventDetail data class Updated(val oldValue: Int, val newValue: Int) : EventDetail data class Event(val targetId: String, val revision: Long, @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) val detail: EventDetail, val date: EventDate = EventDate.now()) fun main() { val conStr = "mongodb://localhost" val dbName = "sample2" KMongo.createClient(ConnectionString(conStr)).use { client -> val col = client.getDatabase(dbName).getCollection<Event>() val d1 = Event("a1", 1, Created(1)) val res1 = col.findOneAndUpdate( and( Event::targetId eq d1.targetId, Event::revision eq d1.revision ), "{${MongoOperator.setOnInsert}: ${d1.json}}".bson, findOneAndUpdateUpsert().returnDocument(ReturnDocument.AFTER) ) println(res1) val d2 = Event("a1", 2, Updated(1, 2)) val res2 = col.findOneAndUpdate( and( Event::targetId eq d2.targetId, Event::revision eq d2.revision ), "{${MongoOperator.setOnInsert}: ${d2.json}}".bson, findOneAndUpdateUpsert().returnDocument(ReturnDocument.AFTER) ) println(res2) } }
動作確認
実行します。
実行(1回目)
> gradle sample2:run ・・・ Event(targetId=a1, revision=1, detail=Created(value=1), date=2019-12-15T16:35:52.351Z) Event(targetId=a1, revision=2, detail=Updated(oldValue=1, newValue=2), date=2019-12-15T16:35:53.265Z) ・・・
MongoDB のドキュメント内容は以下の通り。
@class
項目にクラス名が保存されています。
MongoDB データ確認(1回目)
> mongo mongodb://localhost/sample2 ・・・ > db.event.find() { "_id" : ObjectId("5df660e9a4d2dfcdb84e7206"), "revision" : 1, "targetId" : "a1", "date" : ISODate("2019-12-15T16:35:52.351Z"), "detail" : { "@class" : "Created", "value" : 1 } } { "_id" : ObjectId("5df660e9a4d2dfcdb84e7208"), "revision" : 2, "targetId" : "a1", "date" : ISODate("2019-12-15T16:35:53.265Z"), "detail" : { "@class" : "Updated", "oldValue" : 1, "newValue" : 2 } }
再度実行してみます。
実行(2回目)
> gradle sample2:run ・・・ Event(targetId=a1, revision=1, detail=Created(value=1), date=2019-12-15T16:35:52.351Z) Event(targetId=a1, revision=2, detail=Updated(oldValue=1, newValue=2), date=2019-12-15T16:35:53.265Z) ・・・
ドキュメントが登録済みのため MongoDB の内容に変化はありません。
MongoDB データ確認(2回目)
> db.event.find() { "_id" : ObjectId("5df660e9a4d2dfcdb84e7206"), "revision" : 1, "targetId" : "a1", "date" : ISODate("2019-12-15T16:35:52.351Z"), "detail" : { "@class" : "Created", "value" : 1 } } { "_id" : ObjectId("5df660e9a4d2dfcdb84e7208"), "revision" : 2, "targetId" : "a1", "date" : ISODate("2019-12-15T16:35:53.265Z"), "detail" : { "@class" : "Updated", "oldValue" : 1, "newValue" : 2 } }
備考
今回使用した Gradle 用のビルド定義ファイルは以下の通りです。
build.gradle
buildscript { repositories { jcenter() } dependencies { classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" } } plugins { id 'org.jetbrains.kotlin.jvm' version "$kotlin_version" apply false } allprojects { apply plugin: 'org.jetbrains.kotlin.jvm' apply plugin: 'application' mainClassName = 'AppKt' repositories { mavenLocal() jcenter() } dependencies { implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" implementation "org.litote.kmongo:kmongo:$kmongo_version" } }
gradle.properties
kotlin_version=1.3.61 kmongo_version=3.11.2
settings.gradle
rootProject.name = 'kmongo_setoninsert' include 'sample1', 'sample2'
gRPC Server Reflection のクライアント処理
gRPC Server Reflection を呼び出す処理を Node.js で実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20191008/
事前準備(サーバー実装)
まずは、gRPC Server Reflection を有効にしたサーバー処理を用意します。 Node.js での実装は無理そうだったので(未実装らしい) Go 言語で実装します。
protoc コマンドをインストールした後、Go 言語用の protoc プラグインである protoc-gen-go
をインストールします。
protoc-gen-go インストール
> go get -u github.com/golang/protobuf/protoc-gen-go
google.golang.org/grpc
等のライブラリをビルド時に自動ダウンロードするように、Go のモジュールファイル go.mod
を用意しておきます。
go.mod の作成
> go mod init sample
proto ファイルを作成してインターフェースを定義します。
今回は、go_package
を使って Go 用のパッケージを別途定義してみました。
proto/item/item.proto
syntax = "proto3"; import "google/protobuf/empty.proto"; option go_package = "sample/proto/item"; package item; message ItemRequest { string item_id = 1; } message AddItemRequest { string item_id = 1; uint64 price = 2; } message Item { string item_id = 1; uint64 price = 2; } service ItemService { rpc GetItem(ItemRequest) returns (Item); rpc AddItem(AddItemRequest) returns (google.protobuf.Empty); }
protoc コマンドで Go 用のコードを生成します。
protoc によるコード生成
> protoc -I=proto --go_out=plugins=grpc,paths=source_relative:./proto proto/item/item.proto
サーバー処理を実装します。
Server Reflection を有効化するには google.golang.org/grpc/reflection
を import して reflection.Register
を適用するだけなので簡単です。
server/main.go
package main import ( "context" "fmt" "net" "log" "google.golang.org/grpc" "google.golang.org/grpc/reflection" empty "github.com/golang/protobuf/ptypes/empty" pb "sample/proto/item" ) type Server struct { Items map[string]pb.Item } func (s *Server) GetItem(ctx context.Context, req *pb.ItemRequest) (*pb.Item, error) { log.Println("call GetItem: ", req) item, ok := s.Items[req.GetItemId()] if !ok { return nil, fmt.Errorf("item not found: %s", req.GetItemId()) } return &item, nil } func (s *Server) AddItem(ctx context.Context, req *pb.AddItemRequest) (*empty.Empty, error) { log.Println("call AddItem: ", req) s.Items[req.GetItemId()] = pb.Item{ItemId: req.GetItemId(), Price: req.GetPrice()} return &empty.Empty{}, nil } func main() { address := ":50051" listen, err := net.Listen("tcp", address) if err != nil { log.Fatalf("error: %v", err) } s := grpc.NewServer() pb.RegisterItemServiceServer(s, &Server{Items: make(map[string]pb.Item)}) // gRPC Server Reflection 有効化 reflection.Register(s) log.Println("server start:", address) if err := s.Serve(listen); err != nil { log.Fatalf("failed serve: %v", err) } }
上記を実行しておきます。(go run でビルドも実施されます)
初回実行時は google.golang.org/grpc
等の依存ライブラリが自動的にダウンロードされます。
実行
> go run server/main.go ・・・ 2019/10/06 22:00:00 server start: :50051
サーバー側のファイル構成は以下のようになっています。
ファイル構成
- go.mod
- go.sum
- proto
- item
- item.proto
- item.pb.go
- item
- server
- main.go
gRPC Server Reflection クライアント処理
それでは、本題の gRPC Server Reflection を呼び出す処理を実装していきます。
準備
どのように実装すべきか分からなかったので、とりあえず今回は https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto を使う事にしました。
まずは、この proto ファイルをローカルにダウンロードしておきます。
proto ファイルのダウンロード
> curl -O https://raw.githubusercontent.com/grpc/grpc/master/src/proto/grpc/reflection/v1alpha/reflection.proto
Node.js で gRPC 処理を実装するため grpc
と proto-loader
をインストールしておきます。
grpc と proto-loader のインストール
> npm install grpc @grpc/proto-loader
(a) サービスのリスト取得
はじめに、サービス名をリストアップする処理を実装してみます。
reflection.proto
を見てみると、以下のように ServerReflectionInfo
メソッドの引数である ServerReflectionRequest
の message_request
にどのフィールドを指定するかで取得内容が変わるようになっています。
サービスのリストを取得するには list_services
フィールドを使えば良さそうです。
reflection.proto の該当箇所
・・・ package grpc.reflection.v1alpha; service ServerReflection { rpc ServerReflectionInfo(stream ServerReflectionRequest) returns (stream ServerReflectionResponse); } message ServerReflectionRequest { string host = 1; oneof message_request { string file_by_filename = 3; string file_containing_symbol = 4; ExtensionRequest file_containing_extension = 5; string all_extension_numbers_of_type = 6; string list_services = 7; } } ・・・
ServerReflectionInfo
メソッドは引数と戻り値の両方に stream
が指定されている双方向ストリーミング RPC となっているため、以下のように write
でリクエストメッセージを送信して on('data', ・・・)
でレスポンスメッセージを取得する事になります。
また、end
でストリーミング処理を終了します。
list_services.js
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const pd = protoLoader.loadSync('reflection.proto', { keepCase: true, defaults: true }) const proto = grpc.loadPackageDefinition(pd).grpc.reflection.v1alpha const client = new proto.ServerReflection( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const call = client.ServerReflectionInfo() call.on('error', err => { // ストリーミングの終了 call.end() console.error(err) }) // レスポンスメッセージの受信 call.on('data', res => { // ストリーミングの終了 call.end() // サービス名の出力 res.list_services_response.service.forEach(s => { console.log(s.name) }) }) // リクエストメッセージの送信 call.write({host: 'localhost', list_services: ''})
実行結果は以下の通り、サービス名を出力できました。
実行結果
> node list_services.js grpc.reflection.v1alpha.ServerReflection item.ItemService
(b) サービスのインターフェース定義取得
次に、サービスの内容(インターフェース定義)を取得してみます。
こちらは、リクエストメッセージの file_containing_symbol
フィールドにサービス名を指定する事で取得できます。
ただ、レスポンスメッセージの該当フィールドの内容が reflection.proto のコメントにあるように FileDescriptorProto
をシリアライズした結果 ※ (の配列)となっている点に注意が必要です。
※ bytes 型は Node.js では Buffer として扱われる
reflection.proto の該当箇所
・・・ message FileDescriptorResponse { // Serialized FileDescriptorProto messages. We avoid taking a dependency on // descriptor.proto, which uses proto2 only features, by making them opaque // bytes instead. repeated bytes file_descriptor_proto = 1; } ・・・
FileDescriptorProto へのデシリアライズに関しては試行錯誤しましたが、最も手軽そうな protobufjs/ext/descriptor
の FileDescriptorProto.decode
を使う事にしました。
load_symbol.js
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const descriptor = require('protobufjs/ext/descriptor') const serviceName = process.argv[2] ・・・ const call = client.ServerReflectionInfo() call.on('error', err => { call.end() console.error(err) }) call.on('data', res => { call.end() res.file_descriptor_response.file_descriptor_proto // FileDescriptorProto デシリアライズ .map(buf => descriptor.FileDescriptorProto.decode(buf)) .forEach(d => { // JSON 化して出力 console.log(JSON.stringify(d, null, 2)) }) }) call.write({host: 'localhost', file_containing_symbol: serviceName})
item.ItemService のサービス内容を取得してみた結果です。 go_package の内容も含め、問題なく取得できているようです。
実行結果
> node load_symbol.js item.ItemService { "name": "item/item.proto", "package": "item", "dependency": [ "google/protobuf/empty.proto" ], "messageType": [ { "name": "ItemRequest", "field": [ { "name": "item_id", "number": 1, "label": "LABEL_OPTIONAL", "type": "TYPE_STRING", "jsonName": "itemId" } ] }, { "name": "AddItemRequest", "field": [ { "name": "item_id", "number": 1, "label": "LABEL_OPTIONAL", "type": "TYPE_STRING", "jsonName": "itemId" }, { "name": "price", "number": 2, "label": "LABEL_OPTIONAL", "type": "TYPE_UINT64", "jsonName": "price" } ] }, { "name": "Item", "field": [ { "name": "item_id", "number": 1, "label": "LABEL_OPTIONAL", "type": "TYPE_STRING", "jsonName": "itemId" }, { "name": "price", "number": 2, "label": "LABEL_OPTIONAL", "type": "TYPE_UINT64", "jsonName": "price" } ] } ], "service": [ { "name": "ItemService", "method": [ { "name": "GetItem", "inputType": ".item.ItemRequest", "outputType": ".item.Item" }, { "name": "AddItem", "inputType": ".item.AddItemRequest", "outputType": ".google.protobuf.Empty" } ] } ], "options": { "goPackage": "sample/proto/item" }, "syntax": "proto3" }
(c) 全サービスのインターフェース定義取得
最後に、全サービスのインターフェース定義を取得する処理を実装してみます。
双方向ストリーミング RPC を活用して、サービスのリスト取得とサービス内容の取得を同じストリーミング上で行ってみました。
load_symbols.js
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const descriptor = require('protobufjs/ext/descriptor') ・・・ const call = client.ServerReflectionInfo() call.on('error', err => { call.end() console.error(err) }) let count = 0 call.on('data', res => { // サービスのリスト取得時の処理 if (res.list_services_response) { const names = res.list_services_response.service.map(s => s.name) count = names.length names.forEach(name => call.write({host: 'localhost', file_containing_symbol: name}) ) } // サービスのインターフェース定義取得時の処理 else if (res.file_descriptor_response) { if (--count == 0) { // インターフェース定義を全て取得したら終了 call.end() } res.file_descriptor_response.file_descriptor_proto .map(buf => descriptor.FileDescriptorProto.decode(buf)) .forEach(d => { console.log(JSON.stringify(d, null, 2)) }) } else { console.log(res) call.end() } }) call.write({host: 'localhost', list_services: ''})
実行結果は以下の通りです。
実行結果
> node load_symbols.js { "name": "grpc_reflection_v1alpha/reflection.proto", "package": "grpc.reflection.v1alpha", ・・・ "service": [ { "name": "ServerReflection", "method": [ { "name": "ServerReflectionInfo", "inputType": ".grpc.reflection.v1alpha.ServerReflectionRequest", "outputType": ".grpc.reflection.v1alpha.ServerReflectionResponse", "clientStreaming": true, "serverStreaming": true } ] } ], "syntax": "proto3" } { "name": "item/item.proto", "package": "item", "dependency": [ "google/protobuf/empty.proto" ], ・・・ "service": [ { "name": "ItemService", "method": [ { "name": "GetItem", "inputType": ".item.ItemRequest", "outputType": ".item.Item" }, { "name": "AddItem", "inputType": ".item.AddItemRequest", "outputType": ".google.protobuf.Empty" } ] } ], "options": { "goPackage": "sample/proto/item" }, "syntax": "proto3" }
備考
別の実装例として、処理毎に個別のストリーミングで処理するようにして Promise 化してみました。
load_symbols2.js
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const descriptor = require('protobufjs/ext/descriptor') ・・・ const merge = a => b => Object.fromEntries([a, b].map(Object.entries).flat()) const serverReflectionInfo = (f, g) => new Promise((resolve, revoke) => { const call = client.ServerReflectionInfo() call.on('error', err => { call.end() revoke(err) }) call.on('data', res => { call.end() resolve( g(res) ) }) call.write( f({host: 'localhost'}) ) }) // サービスのリスト取得 const listServices = () => serverReflectionInfo( merge({list_services: ''}), res => res.list_services_response.service.map(s => s.name) ) // サービスのインターフェース定義取得 const loadSymbol = name => serverReflectionInfo( merge({file_containing_symbol: name}), res => res.file_descriptor_response.file_descriptor_proto .map(buf => descriptor.FileDescriptorProto.decode(buf)) ) listServices() .then(names => Promise.all(names.map(loadSymbol)) ) .then(ds => ds.flat()) .then(ds => ds.forEach(d => console.log(JSON.stringify(d, null, 2)))) .catch(err => console.error(err))
実行結果
> node load_symbols2.js { "name": "grpc_reflection_v1alpha/reflection.proto", "package": "grpc.reflection.v1alpha", ・・・ "service": [ { "name": "ServerReflection", "method": [ { "name": "ServerReflectionInfo", "inputType": ".grpc.reflection.v1alpha.ServerReflectionRequest", "outputType": ".grpc.reflection.v1alpha.ServerReflectionResponse", "clientStreaming": true, "serverStreaming": true } ] } ], "syntax": "proto3" } { "name": "item/item.proto", "package": "item", "dependency": [ "google/protobuf/empty.proto" ], ・・・ "service": [ { "name": "ItemService", "method": [ { "name": "GetItem", "inputType": ".item.ItemRequest", "outputType": ".item.Item" }, { "name": "AddItem", "inputType": ".item.AddItemRequest", "outputType": ".google.protobuf.Empty" } ] } ], "options": { "goPackage": "sample/proto/item" }, "syntax": "proto3" }
Pulumi を使って Kubernetes へ CRD を登録
Pulumi は JavaScript・Python・Go のようなプログラミング言語で Infrastructure as Code するためのツールです。
今回は、この Pulumi を使って Kubernetes(k3s を使用)へカスタムリソースを登録してみます。
ソースは http://github.com/fits/try_samples/tree/master/blog/20190825/
はじめに
今回は、k3s(Lightweight Kubernetes) がインストール済みの Ubuntu 環境を使います。※
※ ただし、k3s の前に microk8s をインストールしたりしているので クリーンな環境とは言えないかもしれない (Istio と Knative のために Helm 等をインストールしていたりもする)
Pulumi を以下のようにインストールします。
Pulumi インストール例
$ curl -fsSL https://get.pulumi.com | sh
$HOME/.pulumi
ディレクトリへ各種ファイルが展開され、.bashrc
ファイルへ PATH の設定が追加されました。
なお、このままだと pulumi コマンド実行時に kubeconfig を参照できないようだったので、とりあえず /etc/rancher/k3s/k3s.yaml
を $HOME/.kube/config
ファイルとしてコピーし chown しています。
動作確認
$ pulumi version v0.17.28
Kubernetes へ CRD を登録(Node.js 使用)
準備
プロジェクトを作成する前に、まずは Pulumi でログインを実施しておきます。
今回はローカル環境の Kubernetes を使うので --local
を指定して login を実施しました。
ログイン
$ pulumi login --local
プロジェクト作成
適当なディレクトリを用意し、テンプレートを指定してプロジェクトのひな型を作成します。
今回は Kubernetes を対象とした Node.js 用のプロジェクトを作成するため kubernetes-javascript
を指定しました。
プロジェクト作成
$ mkdir sample $ cd sample $ pulumi new kubernetes-javascript ・・・ project name: (sample) project description: (A minimal Kubernetes JavaScript Pulumi program) Created project 'sample' stack name: (dev) Enter your passphrase to protect config/secrets: Re-enter your passphrase to confirm: Created stack 'dev' Enter your passphrase to unlock config/secrets (set PULUMI_CONFIG_PASSPHRASE to remember): Installing dependencies... ・・・
実装
上記で生成された index.js
ファイルに Kubernetes へ登録する内容を実装していきます。
今回は下記のようなカスタムリソースの登録を実装します。
例. カスタムリソース登録内容
apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: name: items.example.com spec: group: example.com version: v1alpha1 scope: Namespaced names: kind: Item plural: items singular: item preserveUnknownFields: false validation: openAPIV3Schema: type: object properties: spec: type: object properties: value: type: integer note: type: string --- apiVersion: "example.com/v1alpha1" kind: Item metadata: name: item1 spec: value: 100 note: sample item 1 --- apiVersion: "example.com/v1alpha1" kind: Item metadata: name: item2 spec: value: 20 note: sample item 2
上記のカスタムリソースを実装したコードが以下です。
Pulumi でカスタムリソースを定義する場合、@pulumi/kubernetes
の CustomResourceDefinition
と CustomResource
を使えば良さそうです。
YAML と同等の内容を JavaScript の Object で表現して、CustomResourceDefinition 等のコンストラクタの第 2引数として渡すだけです。
index.js (カスタムリソース登録内容の実装)
'use strict' const k8s = require('@pulumi/kubernetes') const capitalize = s => `${s[0].toUpperCase()}${s.slice(1)}` const crdName = 'item' const crdGroup = 'example.com' const crdVersion = 'v1alpha1' const props = { value: 'integer', note: 'string' } const items = [ { name: 'item1', value: 100, note: 'sample item 1' }, { name: 'item2', value: 20, note: 'sample item 2' } ] const crdKind = capitalize(crdName) const crdPlural = `${crdName}s` new k8s.apiextensions.v1beta1.CustomResourceDefinition(crdName, { metadata: { name: `${crdPlural}.${crdGroup}` }, spec: { group: crdGroup, version: crdVersion, scope: 'Namespaced', names: { kind: crdKind, plural: crdPlural, singular: crdName }, preserveUnknownFields: false, validation: { openAPIV3Schema: { type: 'object', properties: { spec: { type: 'object', properties: Object.fromEntries( Object.entries(props).map(([k, v]) => [k, { type: v }] ) ) } } } } } }) items.forEach(it => new k8s.apiextensions.CustomResource(it.name, { apiVersion: `${crdGroup}/${crdVersion}`, kind: crdKind, metadata: { name: it.name }, spec: Object.fromEntries( Object.keys(props).map(k => [k, it[k]]) ) }) )
デプロイ
pulumi up でデプロイします。
デプロイ
$ pulumi up ・・・ Previewing update (dev): Type Name P + pulumi:pulumi:Stack sample-dev c + tq kubernetes:example.com:Item item2 c + tq kubernetes:example.com:Item item1 c + mq kubernetes:apiextensions.k8s.io:CustomResourceDefinition item c Resources: + 4 to create Do you want to perform this update? yes > no details
Do you want to perform this update?
で yes
を選択すると実際にデプロイが実施されます。
Do you want to perform this update? yes Updating (dev): Type Name S + pulumi:pulumi:Stack sample-dev c + tq kubernetes:apiextensions.k8s.io:CustomResourceDefinition item c + tq kubernetes:example.com:Item item1 c + mq kubernetes:example.com:Item item2 c Resources: + 4 created ・・・
ちなみに、details を選ぶと登録内容(YAML)を確認できます。
正常に登録されたか、kubectl コマンドで確認してみます。(k3s の一般的な環境では k3s kubectl とする必要があるかもしれません)
CRD の確認
$ kubectl get crd | grep items items.example.com 2019-08-14T08:57:22Z
カスタムリソースの確認
$ kubectl get item NAME AGE item1 13m item2 13m
カスタムリソース詳細1
$ kubectl describe item item1 Name: item1 Namespace: default Labels: app.kubernetes.io/managed-by=pulumi Annotations: kubectl.kubernetes.io/last-applied-configuration: {"apiVersion":"example.com/v1alpha1","kind":"Item","metadata":{"labels":{"app.kubernetes.io/managed-by":"pulumi"},"name":"item1"},"spec":{... API Version: example.com/v1alpha1 Kind: Item Metadata: Creation Timestamp: 2019-08-14T08:57:22Z Generation: 1 Resource Version: 19763 Self Link: /apis/example.com/v1alpha1/namespaces/default/items/item1 UID: 87503274-be71-11e9-aeea-025c19d6acb9 Spec: Note: sample item 1 Value: 100 Events: <none>
カスタムリソース詳細2
$ kubectl describe item item2 Name: item2 Namespace: default Labels: app.kubernetes.io/managed-by=pulumi Annotations: kubectl.kubernetes.io/last-applied-configuration: {"apiVersion":"example.com/v1alpha1","kind":"Item","metadata":{"labels":{"app.kubernetes.io/managed-by":"pulumi"},"name":"item2"},"spec":{... API Version: example.com/v1alpha1 Kind: Item Metadata: Creation Timestamp: 2019-08-14T08:57:22Z Generation: 1 Resource Version: 19764 Self Link: /apis/example.com/v1alpha1/namespaces/default/items/item2 UID: 875af773-be71-11e9-aeea-025c19d6acb9 Spec: Note: sample item 2 Value: 20 Events: <none>
問題なく登録できているようです。
アンデプロイ
デプロイ内容を削除(アンデプロイ)する場合は destroy を実行します。
アンデプロイ
$ pulumi destroy ・・・ Do you want to perform this destroy? yes Destroying (dev): Type Name S - pulumi:pulumi:Stack sample-dev d - tq kubernetes:apiextensions.k8s.io:CustomResourceDefinition item d - tq kubernetes:example.com:Item item2 d - mq kubernetes:example.com:Item item1 d Resources: - 4 deleted ・・・