Node.js のマイクロサービスフレームワーク Moleculer

Moleculer という名のなかなか期待できそうな Node.js 用マイクロサービスフレームワークを見つけたのでご紹介します。

ロードバランサー、サーキットブレーカー、メトリクス収集、トレーシング、キャッシュ機能等、多彩な機能を備えていますが。

個人的に、イベント駆動アーキテクチャを採用し Kafka や NATS 等のメッセージブローカーと容易に連携できる事から、コレオグラフィ型のマイクロサービスを手軽に作れそうな点に注目しています。

なんとなく Akka と似ている気もしますが、Akka を使ったマイクロサービスフレームワークLagom などと比べても、簡単かつ手軽に使えるのが利点かと思います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20200224/

(1) ローカルアプリケーション

とりあえず簡単なローカルアプリケーションを作成してみます。

以下のモジュールを使うのでインストールしておきます。

  • moleculer 0.14.2
moleculer インストール例
> npm install --save moleculer

Moleculer では、メソッド・アクション・イベントハンドリング等を定義した ServiceServiceBroker へ登録する事で処理を組み立てます。

ServiceBroker はノード毎にインスタンスを作る事になり、アクションの呼び出しやイベントの通知、別ノードとの連携等を担います。

アクションは call('<サービス名>.<アクション名>', <パラメータ>) で呼び出す事ができ、アクションで return した値が call の戻り値となります。

イベントの発火(通知)は emit('<イベント名>', <イベント内容>)broadcast('<イベント名>', <イベント内容>) ※ で行います。

 ※ emit と broadcast の違いは、
    broadcast は全ノードの全サービスへ通知されますが、
    emit は同じサービスを複数のノードで実行していると
    その中の 1つのノードへ通知されます
order1.js
const { ServiceBroker } = require('moleculer')

const broker = new ServiceBroker()

broker.createService({
    name: 'order-service',
    actions: {
        order(ctx) {
            console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`)
            // order.ordered イベントの発火
            ctx.emit('order.ordered', ctx.params)

            return ctx.params.id
        }
    }
})

broker.createService({
    name: 'order-check-service',
    events: {
        // order.ordered イベントのハンドリング
        'order.ordered'(ctx) {
            console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`)
        }
    }
})

const run = async () => {
    await broker.start()

    const order1 = {id: 'order1', item: 'item1', qty: 2}
    // order アクションの呼び出し
    const res1 = await broker.call('order-service.order', order1)

    console.log(`order result: ${res1}`)
}

run().catch(err => console.error(err))
実行結果
> node order1.js

[2020-02-18T14:08:35.336Z] INFO  ・・・: Moleculer v0.14.2 is starting...
・・・
# order-service.order: {"id":"order1","item":"item1","qty":2}
## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2}
order result: order1
・・・
[2020-02-18T14:08:35.440Z] INFO  ・・・: ServiceBroker is stopped. Good bye.

上記の処理へサービスを 1つ追加してみます。

イベントのハンドリングではワイルドカード * も使えるようになっていますので使ってみます。

order2.js
const { ServiceBroker } = require('moleculer')

const broker = new ServiceBroker()

broker.createService({
    name: 'order-service',
    actions: {
        order(ctx) {
            console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`)
            ctx.emit('order.ordered', ctx.params)
            return ctx.params.id
        }
    },
    events: {
        // order.validated と order.invalidated イベントをハンドリング
        'order.*validated'(ctx) {
            console.log(`# order-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`)
        }
    }
})

broker.createService({
    name: 'order-check-service',
    events: {
        async 'order.ordered'(ctx) {
            console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`)

            // item-service の check アクション呼び出し
            const isValid = await ctx.call('item-service.check', ctx.params)

            if (isValid) {
                ctx.emit('order.validated', ctx.params)
            }
            else {
                ctx.emit('order.invalidated', ctx.params)
            }
        }
    }
})

broker.createService({
    name: 'item-service',
    actions: {
        check(ctx) {
            console.log(`### item-service.check: ${JSON.stringify(ctx.params)}`)
            return ['item1', 'item2'].includes(ctx.params.item)
        }
    }
})

const run = async () => {
    await broker.start()

    const order1 = {id: 'order1', item: 'item1', qty: 2}
    const order2 = {id: 'order2', item: 'item3', qty: 1}

    const res1 = await broker.call('order-service.order', order1)

    console.log(`order result: ${res1}`)

    const res2 = await broker.call('order-service.order', order2)

    console.log(`order result: ${res2}`)
}

run().catch(err => console.error(err))
実行結果
> node order2.js

・・・
# order-service.order: {"id":"order1","item":"item1","qty":2}
## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2}
### item-service.check: {"id":"order1","item":"item1","qty":2}
order result: order1
# order-service.order: {"id":"order2","item":"item3","qty":1}
## order-check-service order.ordered: {"id":"order2","item":"item3","qty":1}
### item-service.check: {"id":"order2","item":"item3","qty":1}
# order-service order.validated: {"id":"order1","item":"item1","qty":2}
order result: order2
# order-service order.invalidated: {"id":"order2","item":"item3","qty":1}
・・・

(2) メッセージブローカー(NATS Streaming)利用

次に、別ノードのサービスとメッセージブローカー経由で連携してみます。

処理内容としては Web API を通して受け取ったメッセージを別ノードのサービスに call・emit・broadcast します。

ここでは、メッセージブローカーとして Go言語で実装されており手軽に実行できる NATS Streaming を使用します。

まずは、以下のモジュールを使うのでインストールしておきます。

  • moleculer 0.14.2
  • moleculer-web 0.9.0
  • node-nats-streaming 0.2.6
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming

Web API としてリクエストを受け付ける処理を実装します。

moleculer-web を require した結果を mixins して routes で HTTP リクエストとそれに応じて実行するアクションをマッピングします。

NATS Streaming を使用するため transporterSTAN を設定します。

nodeID はデフォルトでホスト名とプロセスIDから生成されるようになっていますが、他のノードとの接続に使うので、ここでは明示的に設定しています。

pub.js
const { ServiceBroker } = require('moleculer')

const HTTPServer = require('moleculer-web')

const broker = new ServiceBroker({
    nodeID: 'pub',
    transporter: 'STAN'
})

broker.createService({
    name: 'web',
    mixins: [HTTPServer],
    settings: {
        routes: [
            { aliases: {
                'PUT /pub': 'pub.publish'
            }}
        ]
    }
})

broker.createService({
    name: 'pub',
    actions: {
        async publish(ctx) {
            const params = ctx.params

            // アクション呼び出し
            const res = await ctx.call('sub.direct', params)

            // イベントの emit
            ctx.emit('event.emit', params)
            // イベントの broadcast
            ctx.broadcast('event.broadcast', params)

            return res
        }
    }
})

broker.start().catch(err => console.error(err))

次に、NATS Streaming 経由でアクションの呼び出しやイベント通知を受け取る処理を実装します。

こちらは複数プロセスで実行できるように nodeID をコマンド引数として渡すようにしています。

sub.js
const { ServiceBroker } = require('moleculer')

const id = process.argv[2]

const broker = new ServiceBroker({
    nodeID: id,
    transporter: 'STAN'
})

broker.createService({
    name: 'sub',
    actions: {
        direct(ctx) {
            console.log(`* sub.direct: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`)
            return broker.nodeID
        }
    },
    events: {
        'event.*'(ctx) {
            console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`)
        }
    }
})

broker.start().catch(err => console.error(err))

動作確認

まずは NATS Streaming を実行しておきます。

NATS Streaming 実行
> nats-streaming-server

・・・
[2668] 2020/02/23 19:27:33.566759 [[32mINF[0m] Server is ready
[2668] 2020/02/23 19:27:33.591774 [[32mINF[0m] STREAM: Recovering the state...
[2668] 2020/02/23 19:27:33.592779 [[32mINF[0m] STREAM: No recovered state
[2668] 2020/02/23 19:27:33.843799 [[32mINF[0m] STREAM: Message store is MEMORY
[2668] 2020/02/23 19:27:33.844764 [[32mINF[0m] STREAM: ---------- Store Limits ----------
[2668] 2020/02/23 19:27:33.846740 [[32mINF[0m] STREAM: Channels:                  100 *
[2668] 2020/02/23 19:27:33.848741 [[32mINF[0m] STREAM: --------- Channels Limits --------
[2668] 2020/02/23 19:27:33.849804 [[32mINF[0m] STREAM:   Subscriptions:          1000 *
[2668] 2020/02/23 19:27:33.850745 [[32mINF[0m] STREAM:   Messages     :       1000000 *
[2668] 2020/02/23 19:27:33.852756 [[32mINF[0m] STREAM:   Bytes        :     976.56 MB *
[2668] 2020/02/23 19:27:33.853740 [[32mINF[0m] STREAM:   Age          :     unlimited *
[2668] 2020/02/23 19:27:33.854745 [[32mINF[0m] STREAM:   Inactivity   :     unlimited *
[2668] 2020/02/23 19:27:33.857744 [[32mINF[0m] STREAM: ----------------------------------
[2668] 2020/02/23 19:27:33.858745 [[32mINF[0m] STREAM: Streaming Server is ready

pub.js を起動し、2つ sub.js を実行しておきます。

これによって、NATS Streaming へ MOL.DISCOVER.<nodeID>MOL.EVENT.<nodeID>MOL.REQ.<nodeID> 等のような様々なチャネルが登録され、ノード間の接続が確立します。

pub.js 実行
> node pub.js

・・・
[2020-02-23T10:29:58.028Z] INFO  pub/REGISTRY: Strategy: RoundRobinStrategy
・・・
[2020-02-23T10:29:58.102Z] INFO  pub/WEB: Register route to '/'
[2020-02-23T10:29:58.158Z] INFO  pub/WEB:      PUT /pub => pub.publish
・・・
[2020-02-23T10:30:29.936Z] INFO  pub/REGISTRY: Node 'sub1' connected.
[2020-02-23T10:30:36.918Z] INFO  pub/REGISTRY: Node 'sub2' connected.
sub.js 実行1(sub1)
> node sub.js sub1

・・・
[2020-02-23T10:30:34.026Z] INFO  sub1/REGISTRY: Node 'pub' connected.
[2020-02-23T10:30:36.919Z] INFO  sub1/REGISTRY: Node 'sub2' connected.
sub.js 実行2(sub2)
> node sub.js sub2

・・・
[2020-02-23T10:30:39.027Z] INFO  sub2/REGISTRY: Node 'pub' connected.
[2020-02-23T10:30:40.111Z] INFO  sub2/REGISTRY: Node 'sub1' connected.

この状態で 2回 HTTP リクエストを実施してみます。

HTTP リクエスト 1回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 1}"
"sub1"
HTTP リクエスト 2回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 2}"
"sub2"

1回目の call と emit は sub1 側で、2回目は sub2 側で実行しておりロードバランスされています(デフォルトではラウンドロビンとなる)

broadcast は sub1 と sub2 のどちらにも通知されています。

sub.js sub1 の結果
・・・
* sub.direct: nodeID=sub1, params={"no":1}
* event.emit: nodeID=sub1, params={"no":1}
* event.broadcast: nodeID=sub1, params={"no":1}
* event.broadcast: nodeID=sub1, params={"no":2}
sub.js sub2 の結果
・・・
* event.broadcast: nodeID=sub2, params={"no":1}
* sub.direct: nodeID=sub2, params={"no":2}
* event.emit: nodeID=sub2, params={"no":2}
* event.broadcast: nodeID=sub2, params={"no":2}

(3) MongoDB への CRUDREST API

最後に、MongoDB へデータを CRUD する処理を REST API 化してみます。

以下のモジュールを使うのでインストールしておきます。

  • moleculer 0.14.2
  • moleculer-web 0.9.0
  • node-nats-streaming 0.2.6
  • moleculer-db 0.8.5
  • moleculer-db-adapter-mongo 0.4.7
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming moleculer-db moleculer-db-adapter-mongo

これまでは createService でサービスを定義していましたが、ここでは別ファイルでサービスを定義する方法を試してみます。

ServiceBroker の loadServices を使うと、デフォルトで services/xxx.service.js (xxx は任意) ファイルをロードして登録してくれます。

server.js
const { ServiceBroker } = require('moleculer')

const id = process.argv[2]

const broker = new ServiceBroker({
    nodeID: id,
    transporter: 'STAN'
})

broker.loadServices()

broker.start().catch(err => console.error(err))

REST API のサービスを実装します。

aliases で REST とするとアクション側で "POST /" のような HTTP メソッドとパスを(rest 項目で)設定する事になります。

services/api.service.js
const HTTPServer = require('moleculer-web')

module.exports = {
    name: 'api',
    mixins: [HTTPServer],
    settings: {
        routes: [
            { aliases: { 'REST items': 'item' } }
        ]
    }
}

moleculer-db がデフォルトで REST API に対応した CRUD 処理をアクションとして定義してくれているので、今回はこれをそのまま使う事にします。

ここでは entityCreated 等を使って更新時のイベント通知のみを実装しています。

なお、MongoDB の接続文字列やコレクション名は環境変数で設定するようにしています。

services/item.service.js
const DbService = require('moleculer-db')
const MongoDBAdapter = require('moleculer-db-adapter-mongo')

const mongoUri = process.env['MONGO_URI']
const colName = process.env['MONGO_COLLECTION_NAME']

module.exports = {
    name: 'item',
    mixins: [DbService],
    adapter: new MongoDBAdapter(mongoUri, {
         useUnifiedTopology: true
    }),
    collection: colName,
    entityCreated(entity, ctx) {
        console.log(`entityCreated: ${JSON.stringify(entity)}`)
        ctx.emit('item.created', entity)
    },
    entityUpdated(entity, ctx) {
        console.log(`entityUpdated: ${JSON.stringify(entity)}`)
        ctx.emit('item.updated', entity)
    },
    entityRemoved(entity, ctx) {
        console.log(`entityRemoved: ${JSON.stringify(entity)}`)
        ctx.emit('item.removed', entity)
    }
}

動作確認用にイベントをハンドリングする処理も用意しました。

handler.js
const { ServiceBroker } = require('moleculer')

const id = process.argv[2]

const broker = new ServiceBroker({
    nodeID: id,
    transporter: 'STAN'
})

broker.createService({
    name: 'handler',
    events: {
        'item.*'(ctx) {
            console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`)
        }
    }
})

broker.start().catch(err => console.error(err))

動作確認

MongoDB と NATS Streaming を起動しておきます。

MongoDB 実行
> mongod --dbpath itemdb
NATS Streaming 実行
> nats-streaming-server

接続先の MongoDB やコレクション名を環境変数で設定してから、server.js を実行します。

server.js 実行
> set MONGO_URI=mongodb://localhost/sample
> set MONGO_COLLECTION_NAME=items

> node server.js server1

・・・
[2020-02-23T11:41:06.978Z] INFO  server1/API: Register route to '/'
[2020-02-23T11:41:07.031Z] INFO  server1/API:      GET /items => item.list
[2020-02-23T11:41:07.034Z] INFO  server1/API:      GET /items/:id => item.get
[2020-02-23T11:41:07.035Z] INFO  server1/API:     POST /items => item.create
[2020-02-23T11:41:07.036Z] INFO  server1/API:      PUT /items/:id => item.update
[2020-02-23T11:41:07.037Z] INFO  server1/API:    PATCH /items/:id => item.patch
[2020-02-23T11:41:07.038Z] INFO  server1/API:   DELETE /items/:id => item.remove
・・・
[2020-02-23T11:41:47.389Z] INFO  server1/REGISTRY: Node 'handler1' connected.

イベントハンドラも別途実行しておきます。

handler.js 実行
> node handler.js handler1

・・・
[2020-02-23T11:41:48.322Z] INFO  handler1/REGISTRY: Node 'server1' connected.

準備が整ったので REST API を呼び出して CRUD 処理を順番に呼び出してみます。

create 実施
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:3000/items -d "{\"name\": \"item1\", \"qty\": 1}"

{"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af

{"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":1}
update 実施
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/items/5e5266a735ebff2c9083a0af -d "{\"qty\": 2}"

{"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
delete 実施
$ curl -s -XDELETE http://localhost:3000/items/5e5266a735ebff2c9083a0af

{"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af

{
  "name": "EntityNotFoundError",
  "message": "Entity not found",
  "code": 404,
  "type": null,
  "data": {
    "id": "5e5266a735ebff2c9083a0af"
  }
}

問題なく動作しているようです。 handler.js の出力結果も以下のようになりました。

handler.js の出力結果
・・・
* item.created: nodeID=handler1, params={"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"}
* item.updated: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
* item.removed: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}