Akka Streams で MQTT Broker へ接続

ローカルで実行した MQTT Broker(前回 参照)に対して Akka Streams の JavaAPI を使って Groovy で接続してみます。

Akka Streams 用の様々なコネクタを備えた Alpakka に MQTT Broker 用の Source や Sink が用意されているので、今回はこちらを使います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170925/

Publish 処理

まずは publish 処理です。

MqttConnectionSettings から MqttSink を作成し、MqttMessage を渡せば MQTT Broker へメッセージを送信できます。

MqttConnectionSettings は MQTT Broker の接続先と clientId、そして永続化の方法を指定して作成します。

clientId はクライアント毎に一意な値を指定します。 (clientId を null にすると IllegalArgumentException: Null clientId となりました)

MqttQoS ではメッセージ到達の QoS を指定します。 MqttQoS.atLeastOnce() は少なくとも 1回(重複の可能性あり)の到達可能性を指定する事になります。

@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4')
@Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11')
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Source
import akka.stream.alpakka.mqtt.MqttQoS
import akka.stream.alpakka.mqtt.MqttConnectionSettings
import akka.stream.alpakka.mqtt.MqttMessage
import akka.stream.alpakka.mqtt.javadsl.MqttSink
import akka.util.ByteString

def topic = args[0]
def clientId = args[1]
def message = args[2]

def system = ActorSystem.create()
def mat = ActorMaterializer.create(system)

def settings = MqttConnectionSettings.create(
    'tcp://localhost:1883',
    clientId,
    new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
)

def msg = MqttMessage.create(topic, ByteString.fromString(message))

Source.single(msg)
    .runWith(MqttSink.create(settings, MqttQoS.atLeastOnce()), mat)

// メッセージ送信前に terminate しないようスリープで調整
sleep 1000

system.terminate()

Subscribe 処理

次に subscribe 処理です。

MqttConnectionSettings から MqttSourceSettings を作り、MqttSource を作成します。

MqttSourceSettings の withSubscriptions で subscribe するトピック名と QoS を指定します。

mqtt_subscribe.groovy
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4')
@Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11')
import akka.actor.ActorSystem
import akka.japi.Pair
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Sink
import akka.stream.alpakka.mqtt.MqttQoS
import akka.stream.alpakka.mqtt.MqttSourceSettings
import akka.stream.alpakka.mqtt.MqttConnectionSettings
import akka.stream.alpakka.mqtt.javadsl.MqttSource

def topic = args[0]
def clientId = args[1]

def system = ActorSystem.create()
def mat = ActorMaterializer.create(system)

def settings = MqttSourceSettings.create(
    MqttConnectionSettings.create(
        'tcp://localhost:1883',
        clientId,
        new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
    )
).withSubscriptions(
    // トピックと QoS の指定
    Pair.create(topic, MqttQoS.atLeastOnce())
)

MqttSource.create(settings, 10).runWith(Sink.foreach { println it }, mat)

println "subscribe : ${topic}"

System.in.read()

system.terminate()

上記の処理では、再接続の際にセッションがクリアされてしまうので、通信が切断している間のメッセージを後で受け取るような事はできません。

再接続の際にセッションが復元される Persistent Session を適用するには、以下のように withCleanSession(false) とする必要があります。

なお、Persistent Session を適用するには同じ clientId を使って再接続する必要があります。

mqtt_subscribe2.groovy (Persistent Session 版)
・・・

def settings = MqttSourceSettings.create(
    MqttConnectionSettings.create(
        'tcp://localhost:1883',
        clientId,
        new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
    ).withCleanSession(false) // Persistent Session
).withSubscriptions(
    Pair.create(topic, MqttQoS.atLeastOnce())
)

・・・

動作確認

前回 の Moquette 組み込み実行スクリプトで MQTT Broker を起動しておきます。

MQTT Broker 実行
> groovy moquette_run.groovy

・・・
[main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883
・・・
Server started, version 0.10

Subscribe 処理を実行します。

動作確認のため Persistent Session 版も実行していますが、(Publish 処理も含め) clientId へ異なる値をそれぞれ指定します。(ここでは subscribe1・2 と publish1 としています)

また、トピック名は sample とします。

(a) Subscribe1 実行
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
(b) Subscribe2(Persistent Session 版)実行
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample

この状態で “a” “ab” “abc” という 3つのメッセージを publish してみます。

Publish 実行
> groovy mqtt_publish.groovy sample publish1 a
> groovy mqtt_publish.groovy sample publish1 ab
> groovy mqtt_publish.groovy sample publish1 abc

Subscribe の結果は以下のようになりました。

(a) Subscribe1 状況
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
MqttMessage(sample,ByteString(97))
MqttMessage(sample,ByteString(97, 98))
MqttMessage(sample,ByteString(97, 98, 99))
(b) Subscribe2(Persistent Session 版)状況
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample
MqttMessage(sample,ByteString(97))
MqttMessage(sample,ByteString(97, 98))
MqttMessage(sample,ByteString(97, 98, 99))

ここで (a) と (b) の処理を一度終了しておき、その状態で publish してみます。

Publish 実行
> groovy mqtt_publish.groovy sample publish1 abcd
> groovy mqtt_publish.groovy sample publish1 abcde

(a) と (b) を再実行すると、以下のように Persistent Session 版の方は停止中に publish されたメッセージを取得できました。

(a) Subscribe1 再実行
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
(b) Subscribe2(Persistent Session 版)再実行
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample
MqttMessage(sample,ByteString(97, 98, 99, 100))
MqttMessage(sample,ByteString(97, 98, 99, 100, 101))