Node.js のマイクロサービスフレームワーク Moleculer
Moleculer という名のなかなか期待できそうな Node.js 用マイクロサービスフレームワークを見つけたのでご紹介します。
ロードバランサー、サーキットブレーカー、メトリクス収集、トレーシング、キャッシュ機能等、多彩な機能を備えていますが。
個人的に、イベント駆動アーキテクチャを採用し Kafka や NATS 等のメッセージブローカーと容易に連携できる事から、コレオグラフィ型のマイクロサービスを手軽に作れそうな点に注目しています。
なんとなく Akka と似ている気もしますが、Akka を使ったマイクロサービスフレームワークの Lagom などと比べても、簡単かつ手軽に使えるのが利点かと思います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20200224/
(1) ローカルアプリケーション
とりあえず簡単なローカルアプリケーションを作成してみます。
以下のモジュールを使うのでインストールしておきます。
- moleculer 0.14.2
moleculer インストール例
> npm install --save moleculer
Moleculer では、メソッド・アクション・イベントハンドリング等を定義した Service
を ServiceBroker
へ登録する事で処理を組み立てます。
ServiceBroker はノード毎にインスタンスを作る事になり、アクションの呼び出しやイベントの通知、別ノードとの連携等を担います。
アクションは call('<サービス名>.<アクション名>', <パラメータ>)
で呼び出す事ができ、アクションで return した値が call の戻り値となります。
イベントの発火(通知)は emit('<イベント名>', <イベント内容>)
や broadcast('<イベント名>', <イベント内容>)
※ で行います。
※ emit と broadcast の違いは、 broadcast は全ノードの全サービスへ通知されますが、 emit は同じサービスを複数のノードで実行していると その中の 1つのノードへ通知されます
order1.js
const { ServiceBroker } = require('moleculer') const broker = new ServiceBroker() broker.createService({ name: 'order-service', actions: { order(ctx) { console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`) // order.ordered イベントの発火 ctx.emit('order.ordered', ctx.params) return ctx.params.id } } }) broker.createService({ name: 'order-check-service', events: { // order.ordered イベントのハンドリング 'order.ordered'(ctx) { console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`) } } }) const run = async () => { await broker.start() const order1 = {id: 'order1', item: 'item1', qty: 2} // order アクションの呼び出し const res1 = await broker.call('order-service.order', order1) console.log(`order result: ${res1}`) } run().catch(err => console.error(err))
実行結果
> node order1.js [2020-02-18T14:08:35.336Z] INFO ・・・: Moleculer v0.14.2 is starting... ・・・ # order-service.order: {"id":"order1","item":"item1","qty":2} ## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2} order result: order1 ・・・ [2020-02-18T14:08:35.440Z] INFO ・・・: ServiceBroker is stopped. Good bye.
上記の処理へサービスを 1つ追加してみます。
イベントのハンドリングではワイルドカード *
も使えるようになっていますので使ってみます。
order2.js
const { ServiceBroker } = require('moleculer') const broker = new ServiceBroker() broker.createService({ name: 'order-service', actions: { order(ctx) { console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`) ctx.emit('order.ordered', ctx.params) return ctx.params.id } }, events: { // order.validated と order.invalidated イベントをハンドリング 'order.*validated'(ctx) { console.log(`# order-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`) } } }) broker.createService({ name: 'order-check-service', events: { async 'order.ordered'(ctx) { console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`) // item-service の check アクション呼び出し const isValid = await ctx.call('item-service.check', ctx.params) if (isValid) { ctx.emit('order.validated', ctx.params) } else { ctx.emit('order.invalidated', ctx.params) } } } }) broker.createService({ name: 'item-service', actions: { check(ctx) { console.log(`### item-service.check: ${JSON.stringify(ctx.params)}`) return ['item1', 'item2'].includes(ctx.params.item) } } }) const run = async () => { await broker.start() const order1 = {id: 'order1', item: 'item1', qty: 2} const order2 = {id: 'order2', item: 'item3', qty: 1} const res1 = await broker.call('order-service.order', order1) console.log(`order result: ${res1}`) const res2 = await broker.call('order-service.order', order2) console.log(`order result: ${res2}`) } run().catch(err => console.error(err))
実行結果
> node order2.js ・・・ # order-service.order: {"id":"order1","item":"item1","qty":2} ## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2} ### item-service.check: {"id":"order1","item":"item1","qty":2} order result: order1 # order-service.order: {"id":"order2","item":"item3","qty":1} ## order-check-service order.ordered: {"id":"order2","item":"item3","qty":1} ### item-service.check: {"id":"order2","item":"item3","qty":1} # order-service order.validated: {"id":"order1","item":"item1","qty":2} order result: order2 # order-service order.invalidated: {"id":"order2","item":"item3","qty":1} ・・・
(2) メッセージブローカー(NATS Streaming)利用
次に、別ノードのサービスとメッセージブローカー経由で連携してみます。
処理内容としては Web API を通して受け取ったメッセージを別ノードのサービスに call・emit・broadcast します。
ここでは、メッセージブローカーとして Go言語で実装されており手軽に実行できる NATS Streaming を使用します。
まずは、以下のモジュールを使うのでインストールしておきます。
- moleculer 0.14.2
- moleculer-web 0.9.0
- node-nats-streaming 0.2.6
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming
Web API としてリクエストを受け付ける処理を実装します。
moleculer-web
を require した結果を mixins
して routes
で HTTP リクエストとそれに応じて実行するアクションをマッピングします。
NATS Streaming を使用するため transporter
に STAN
を設定します。
nodeID
はデフォルトでホスト名とプロセスIDから生成されるようになっていますが、他のノードとの接続に使うので、ここでは明示的に設定しています。
pub.js
const { ServiceBroker } = require('moleculer') const HTTPServer = require('moleculer-web') const broker = new ServiceBroker({ nodeID: 'pub', transporter: 'STAN' }) broker.createService({ name: 'web', mixins: [HTTPServer], settings: { routes: [ { aliases: { 'PUT /pub': 'pub.publish' }} ] } }) broker.createService({ name: 'pub', actions: { async publish(ctx) { const params = ctx.params // アクション呼び出し const res = await ctx.call('sub.direct', params) // イベントの emit ctx.emit('event.emit', params) // イベントの broadcast ctx.broadcast('event.broadcast', params) return res } } }) broker.start().catch(err => console.error(err))
次に、NATS Streaming 経由でアクションの呼び出しやイベント通知を受け取る処理を実装します。
こちらは複数プロセスで実行できるように nodeID をコマンド引数として渡すようにしています。
sub.js
const { ServiceBroker } = require('moleculer') const id = process.argv[2] const broker = new ServiceBroker({ nodeID: id, transporter: 'STAN' }) broker.createService({ name: 'sub', actions: { direct(ctx) { console.log(`* sub.direct: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`) return broker.nodeID } }, events: { 'event.*'(ctx) { console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`) } } }) broker.start().catch(err => console.error(err))
動作確認
まずは NATS Streaming を実行しておきます。
NATS Streaming 実行
> nats-streaming-server ・・・ [2668] 2020/02/23 19:27:33.566759 [[32mINF[0m] Server is ready [2668] 2020/02/23 19:27:33.591774 [[32mINF[0m] STREAM: Recovering the state... [2668] 2020/02/23 19:27:33.592779 [[32mINF[0m] STREAM: No recovered state [2668] 2020/02/23 19:27:33.843799 [[32mINF[0m] STREAM: Message store is MEMORY [2668] 2020/02/23 19:27:33.844764 [[32mINF[0m] STREAM: ---------- Store Limits ---------- [2668] 2020/02/23 19:27:33.846740 [[32mINF[0m] STREAM: Channels: 100 * [2668] 2020/02/23 19:27:33.848741 [[32mINF[0m] STREAM: --------- Channels Limits -------- [2668] 2020/02/23 19:27:33.849804 [[32mINF[0m] STREAM: Subscriptions: 1000 * [2668] 2020/02/23 19:27:33.850745 [[32mINF[0m] STREAM: Messages : 1000000 * [2668] 2020/02/23 19:27:33.852756 [[32mINF[0m] STREAM: Bytes : 976.56 MB * [2668] 2020/02/23 19:27:33.853740 [[32mINF[0m] STREAM: Age : unlimited * [2668] 2020/02/23 19:27:33.854745 [[32mINF[0m] STREAM: Inactivity : unlimited * [2668] 2020/02/23 19:27:33.857744 [[32mINF[0m] STREAM: ---------------------------------- [2668] 2020/02/23 19:27:33.858745 [[32mINF[0m] STREAM: Streaming Server is ready
pub.js を起動し、2つ sub.js を実行しておきます。
これによって、NATS Streaming へ MOL.DISCOVER.<nodeID>
、MOL.EVENT.<nodeID>
、MOL.REQ.<nodeID>
等のような様々なチャネルが登録され、ノード間の接続が確立します。
pub.js 実行
> node pub.js ・・・ [2020-02-23T10:29:58.028Z] INFO pub/REGISTRY: Strategy: RoundRobinStrategy ・・・ [2020-02-23T10:29:58.102Z] INFO pub/WEB: Register route to '/' [2020-02-23T10:29:58.158Z] INFO pub/WEB: PUT /pub => pub.publish ・・・ [2020-02-23T10:30:29.936Z] INFO pub/REGISTRY: Node 'sub1' connected. [2020-02-23T10:30:36.918Z] INFO pub/REGISTRY: Node 'sub2' connected.
sub.js 実行1(sub1)
> node sub.js sub1 ・・・ [2020-02-23T10:30:34.026Z] INFO sub1/REGISTRY: Node 'pub' connected. [2020-02-23T10:30:36.919Z] INFO sub1/REGISTRY: Node 'sub2' connected.
sub.js 実行2(sub2)
> node sub.js sub2 ・・・ [2020-02-23T10:30:39.027Z] INFO sub2/REGISTRY: Node 'pub' connected. [2020-02-23T10:30:40.111Z] INFO sub2/REGISTRY: Node 'sub1' connected.
この状態で 2回 HTTP リクエストを実施してみます。
HTTP リクエスト 1回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 1}" "sub1"
HTTP リクエスト 2回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 2}" "sub2"
1回目の call と emit は sub1 側で、2回目は sub2 側で実行しておりロードバランスされています(デフォルトではラウンドロビンとなる)
broadcast は sub1 と sub2 のどちらにも通知されています。
sub.js sub1 の結果
・・・ * sub.direct: nodeID=sub1, params={"no":1} * event.emit: nodeID=sub1, params={"no":1} * event.broadcast: nodeID=sub1, params={"no":1} * event.broadcast: nodeID=sub1, params={"no":2}
sub.js sub2 の結果
・・・ * event.broadcast: nodeID=sub2, params={"no":1} * sub.direct: nodeID=sub2, params={"no":2} * event.emit: nodeID=sub2, params={"no":2} * event.broadcast: nodeID=sub2, params={"no":2}
(3) MongoDB への CRUD を REST API 化
最後に、MongoDB へデータを CRUD する処理を REST API 化してみます。
以下のモジュールを使うのでインストールしておきます。
- moleculer 0.14.2
- moleculer-web 0.9.0
- node-nats-streaming 0.2.6
- moleculer-db 0.8.5
- moleculer-db-adapter-mongo 0.4.7
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming moleculer-db moleculer-db-adapter-mongo
これまでは createService
でサービスを定義していましたが、ここでは別ファイルでサービスを定義する方法を試してみます。
ServiceBroker の loadServices
を使うと、デフォルトで services/xxx.service.js
(xxx は任意) ファイルをロードして登録してくれます。
server.js
const { ServiceBroker } = require('moleculer') const id = process.argv[2] const broker = new ServiceBroker({ nodeID: id, transporter: 'STAN' }) broker.loadServices() broker.start().catch(err => console.error(err))
REST API のサービスを実装します。
aliases で REST
とするとアクション側で "POST /" のような HTTP メソッドとパスを(rest
項目で)設定する事になります。
services/api.service.js
const HTTPServer = require('moleculer-web') module.exports = { name: 'api', mixins: [HTTPServer], settings: { routes: [ { aliases: { 'REST items': 'item' } } ] } }
moleculer-db
がデフォルトで REST API に対応した CRUD 処理をアクションとして定義してくれているので、今回はこれをそのまま使う事にします。
ここでは entityCreated
等を使って更新時のイベント通知のみを実装しています。
なお、MongoDB の接続文字列やコレクション名は環境変数で設定するようにしています。
services/item.service.js
const DbService = require('moleculer-db') const MongoDBAdapter = require('moleculer-db-adapter-mongo') const mongoUri = process.env['MONGO_URI'] const colName = process.env['MONGO_COLLECTION_NAME'] module.exports = { name: 'item', mixins: [DbService], adapter: new MongoDBAdapter(mongoUri, { useUnifiedTopology: true }), collection: colName, entityCreated(entity, ctx) { console.log(`entityCreated: ${JSON.stringify(entity)}`) ctx.emit('item.created', entity) }, entityUpdated(entity, ctx) { console.log(`entityUpdated: ${JSON.stringify(entity)}`) ctx.emit('item.updated', entity) }, entityRemoved(entity, ctx) { console.log(`entityRemoved: ${JSON.stringify(entity)}`) ctx.emit('item.removed', entity) } }
動作確認用にイベントをハンドリングする処理も用意しました。
handler.js
const { ServiceBroker } = require('moleculer') const id = process.argv[2] const broker = new ServiceBroker({ nodeID: id, transporter: 'STAN' }) broker.createService({ name: 'handler', events: { 'item.*'(ctx) { console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`) } } }) broker.start().catch(err => console.error(err))
動作確認
MongoDB と NATS Streaming を起動しておきます。
MongoDB 実行
> mongod --dbpath itemdb
NATS Streaming 実行
> nats-streaming-server
接続先の MongoDB やコレクション名を環境変数で設定してから、server.js を実行します。
server.js 実行
> set MONGO_URI=mongodb://localhost/sample > set MONGO_COLLECTION_NAME=items > node server.js server1 ・・・ [2020-02-23T11:41:06.978Z] INFO server1/API: Register route to '/' [2020-02-23T11:41:07.031Z] INFO server1/API: GET /items => item.list [2020-02-23T11:41:07.034Z] INFO server1/API: GET /items/:id => item.get [2020-02-23T11:41:07.035Z] INFO server1/API: POST /items => item.create [2020-02-23T11:41:07.036Z] INFO server1/API: PUT /items/:id => item.update [2020-02-23T11:41:07.037Z] INFO server1/API: PATCH /items/:id => item.patch [2020-02-23T11:41:07.038Z] INFO server1/API: DELETE /items/:id => item.remove ・・・ [2020-02-23T11:41:47.389Z] INFO server1/REGISTRY: Node 'handler1' connected.
イベントハンドラも別途実行しておきます。
handler.js 実行
> node handler.js handler1 ・・・ [2020-02-23T11:41:48.322Z] INFO handler1/REGISTRY: Node 'server1' connected.
準備が整ったので REST API を呼び出して CRUD 処理を順番に呼び出してみます。
create 実施
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:3000/items -d "{\"name\": \"item1\", \"qty\": 1}" {"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af {"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":1}
update 実施
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/items/5e5266a735ebff2c9083a0af -d "{\"qty\": 2}" {"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
delete 実施
$ curl -s -XDELETE http://localhost:3000/items/5e5266a735ebff2c9083a0af {"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af { "name": "EntityNotFoundError", "message": "Entity not found", "code": 404, "type": null, "data": { "id": "5e5266a735ebff2c9083a0af" } }
問題なく動作しているようです。 handler.js の出力結果も以下のようになりました。
handler.js の出力結果
・・・ * item.created: nodeID=handler1, params={"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"} * item.updated: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2} * item.removed: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}