MQTT Broker をローカル実行

以下の MQTT Broker をそれぞれローカルで実行してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20170910/

Mosca

Mosca は Node.js 用の MQTT Broker です。

npm でインストールして mosca コマンドで実行できます。

インストール例
> npm install mosca
実行例
> mosca -v

       +++.+++:   ,+++    +++;   '+++    +++.
      ++.+++.++   ++.++  ++,'+  `+',++  ++,++
      +`  +,  +: .+  .+  +;  +; '+  '+  +`  +`
      +`  +.  +: ,+  `+  ++  +; '+  ;+  +   +.
      +`  +.  +: ,+  `+   +'    '+      +   +.
      +`  +.  +: ,+  `+   :+.   '+      +++++.
      +`  +.  +: ,+  `+    ++   '+      +++++.
      +`  +.  +: ,+  `+     ++  '+      +   +.
      +`  +.  +: ,+  `+  +:  +: '+  ;+  +   +.
      +`  +.  +: .+  .+  +;  +; '+  '+  +   +.
      +`  +.  +:  ++;++  ++'++   ++'+'  +   +.
      +`  +.  +:   +++    +++.   ,++'   +   +.
{"pid":11260,"hostname":"host1","name":"mosca","level":30,"time":1504448625943,"msg":"server started","mqtt":1883,"v":1}

ログが JSON で出力されていますが、pino ※ を使えば以下のようにログを整形して出力してくれます。

 ※ mosca 2.5.2 を npm install すると pino もインストールされました
実行例 - pino 利用
> mosca -v | pino

       +++.+++:   ,+++    +++;   '+++    +++.
      ++.+++.++   ++.++  ++,'+  `+',++  ++,++
      +`  +,  +: .+  .+  +;  +; '+  '+  +`  +`
      +`  +.  +: ,+  `+  ++  +; '+  ;+  +   +.
      +`  +.  +: ,+  `+   +'    '+      +   +.
      +`  +.  +: ,+  `+   :+.   '+      +++++.
      +`  +.  +: ,+  `+    ++   '+      +++++.
      +`  +.  +: ,+  `+     ++  '+      +   +.
      +`  +.  +: ,+  `+  +:  +: '+  ;+  +   +.
      +`  +.  +: .+  .+  +;  +; '+  '+  +   +.
      +`  +.  +:  ++;++  ++'++   ++'+'  +   +.
      +`  +.  +:   +++    +++.   ,++'   +   +.
[2017-09-03T14:24:23.929Z] INFO (mosca/3124 on host1): server started
    mqtt: 1883

サーバー組み込み実行

Mosca を組み込み実行するコードは以下の通りです。

実際は new mosca.Server() だけでサーバーが起動するのですが、そのままだとクライアントからの接続状況が分かり難いのでログ出力しています。

mosca_run.js
const mosca = require('mosca')

const server = new mosca.Server()

server.on('ready', () => console.log('server started'))

server.on('clientConnected', client => 
    console.log(`client connected: ${client.id}`))

server.on('published', (packet) => 
    console.log(`published: ${JSON.stringify(packet)}`))

クライアント処理

MQTT.js をインストールして MQTT のクライアント処理を実装してみます。

MQTT.js インストール例
> npm install mqtt

まずは、指定のトピックへメッセージを publish する処理です。

publish_sample.js
const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://localhost')

const topic = process.argv[2]
const msg = process.argv[3]

client.on('connect', () => {
    client.publish(topic, msg)

    client.end()
})

次に、指定のトピックを subscribe する処理です。

subscribe_sample.js
const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://localhost')

const topic = process.argv[2]

client.on('connect', () => {
    client.subscribe(topic)
})

client.on('message', (topic, msg) => {
    console.log(`topic: ${topic}, msg: ${msg}`)
})

動作確認

まずは Mosca サーバーを実行しておきます。(mosca コマンドで実行しても可)

サーバー組み込み実行
> node mosca_run.js

server started

次に data トピックを subscribe してみます。

subscribe 実行
> node subscribe_sample.js data

data トピックへ sample1 ~ 3 という文字列を publish してみます。

publish 実行
> node publish_sample.js data sample1
> node publish_sample.js data sample2
> node publish_sample.js data sample3

subscribe 側にメッセージが出力されました。

subscribe の結果
topic: data, msg: sample1
topic: data, msg: sample2
topic: data, msg: sample3

Moquette

Moquette は Java 用の MQTT Broker です。

distribution-0.10-bundle-tar.tar.gz をダウンロード・解凍した後、bin/moquette.bat や bin/moquette.sh で実行できるようですが、moquette.bat の内容に問題があって、そのままでは Java 8 で実行できませんでした。(## の行を削除するか rem を付けて、JAVA_OPTS の設定を削る等が必要でした)

サーバー組み込み実行

組み込み実行は io.moquette.server.Servermain メソッドを呼び出すだけです。

ただし、このままでは IllegalArgumentException: Can't locate file "null" となってしまうので config/moquette.conf ファイルを作成しておきます。(デフォルト設定を使うのなら中身は空でよい)

moquette_run.groovy
@GrabResolver(name = 'bintray', root = 'https://jcenter.bintray.com')
@Grab('io.moquette:moquette-broker:0.10')
@Grab('org.slf4j:slf4j-simple:1.7.25')
import io.moquette.server.Server

Server.main(args)

動作確認

Mosca と同様に動作確認を行ってみます。

サーバー組み込み実行
> groovy moquette_run.groovy

・・・
[main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883
[main] INFO io.moquette.server.netty.NettyAcceptor - Configuring Websocket MQTT transport
[main] INFO io.moquette.server.netty.NettyAcceptor - Property websocket_port has been setted to disabled. Websocket MQTT will be disabled
[main] INFO io.moquette.server.Server - Moquette server has been initialized successfully
Server started, version 0.10

data トピックを subscribe してみます。

subscribe 実行
> node subscribe_sample.js data

data トピックへ sample1 ~ 3 という文字列を publish してみます。

publish 実行
> node publish_sample.js data sample1
> node publish_sample.js data sample2
> node publish_sample.js data sample3

subscribe 側にメッセージが出力されました。

subscribe の結果
topic: data, msg: sample1
topic: data, msg: sample2
topic: data, msg: sample3