Akka Streams で MQTT Broker へ接続
ローカルで実行した MQTT Broker(前回 参照)に対して Akka Streams の Java 用 API を使って Groovy で接続してみます。
Akka Streams 用の様々なコネクタを備えた Alpakka に MQTT Broker 用の Source や Sink が用意されているので、今回はこちらを使います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170925/
Publish 処理
まずは publish 処理です。
MqttConnectionSettings
から MqttSink
を作成し、MqttMessage
を渡せば MQTT Broker へメッセージを送信できます。
MqttConnectionSettings は MQTT Broker の接続先と clientId
、そして永続化の方法を指定して作成します。
clientId はクライアント毎に一意な値を指定します。 (clientId を null
にすると IllegalArgumentException: Null clientId
となりました)
MqttQoS ではメッセージ到達の QoS を指定します。
MqttQoS.atLeastOnce()
は少なくとも 1回(重複の可能性あり)の到達可能性を指定する事になります。
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4') @Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11') import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.javadsl.Source import akka.stream.alpakka.mqtt.MqttQoS import akka.stream.alpakka.mqtt.MqttConnectionSettings import akka.stream.alpakka.mqtt.MqttMessage import akka.stream.alpakka.mqtt.javadsl.MqttSink import akka.util.ByteString def topic = args[0] def clientId = args[1] def message = args[2] def system = ActorSystem.create() def mat = ActorMaterializer.create(system) def settings = MqttConnectionSettings.create( 'tcp://localhost:1883', clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence() ) def msg = MqttMessage.create(topic, ByteString.fromString(message)) Source.single(msg) .runWith(MqttSink.create(settings, MqttQoS.atLeastOnce()), mat) // メッセージ送信前に terminate しないようスリープで調整 sleep 1000 system.terminate()
Subscribe 処理
次に subscribe 処理です。
MqttConnectionSettings から MqttSourceSettings
を作り、MqttSource
を作成します。
MqttSourceSettings の withSubscriptions
で subscribe するトピック名と QoS を指定します。
mqtt_subscribe.groovy
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4') @Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11') import akka.actor.ActorSystem import akka.japi.Pair import akka.stream.ActorMaterializer import akka.stream.javadsl.Sink import akka.stream.alpakka.mqtt.MqttQoS import akka.stream.alpakka.mqtt.MqttSourceSettings import akka.stream.alpakka.mqtt.MqttConnectionSettings import akka.stream.alpakka.mqtt.javadsl.MqttSource def topic = args[0] def clientId = args[1] def system = ActorSystem.create() def mat = ActorMaterializer.create(system) def settings = MqttSourceSettings.create( MqttConnectionSettings.create( 'tcp://localhost:1883', clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence() ) ).withSubscriptions( // トピックと QoS の指定 Pair.create(topic, MqttQoS.atLeastOnce()) ) MqttSource.create(settings, 10).runWith(Sink.foreach { println it }, mat) println "subscribe : ${topic}" System.in.read() system.terminate()
上記の処理では、再接続の際にセッションがクリアされてしまうので、通信が切断している間のメッセージを後で受け取るような事はできません。
再接続の際にセッションが復元される Persistent Session を適用するには、以下のように withCleanSession(false)
とする必要があります。
なお、Persistent Session を適用するには同じ clientId を使って再接続する必要があります。
mqtt_subscribe2.groovy (Persistent Session 版)
・・・ def settings = MqttSourceSettings.create( MqttConnectionSettings.create( 'tcp://localhost:1883', clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence() ).withCleanSession(false) // Persistent Session ).withSubscriptions( Pair.create(topic, MqttQoS.atLeastOnce()) ) ・・・
動作確認
前回 の Moquette 組み込み実行スクリプトで MQTT Broker を起動しておきます。
MQTT Broker 実行
> groovy moquette_run.groovy ・・・ [main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883 ・・・ Server started, version 0.10
Subscribe 処理を実行します。
動作確認のため Persistent Session 版も実行していますが、(Publish 処理も含め) clientId へ異なる値をそれぞれ指定します。(ここでは subscribe1・2 と publish1 としています)
また、トピック名は sample とします。
(a) Subscribe1 実行
> groovy mqtt_subscribe.groovy sample subscribe1 subscribe: sample
(b) Subscribe2(Persistent Session 版)実行
> groovy mqtt_subscribe2.groovy sample subscribe2 subscribe: sample
この状態で “a” “ab” “abc” という 3つのメッセージを publish してみます。
Publish 実行
> groovy mqtt_publish.groovy sample publish1 a > groovy mqtt_publish.groovy sample publish1 ab > groovy mqtt_publish.groovy sample publish1 abc
Subscribe の結果は以下のようになりました。
(a) Subscribe1 状況
> groovy mqtt_subscribe.groovy sample subscribe1 subscribe: sample MqttMessage(sample,ByteString(97)) MqttMessage(sample,ByteString(97, 98)) MqttMessage(sample,ByteString(97, 98, 99))
(b) Subscribe2(Persistent Session 版)状況
> groovy mqtt_subscribe2.groovy sample subscribe2 subscribe: sample MqttMessage(sample,ByteString(97)) MqttMessage(sample,ByteString(97, 98)) MqttMessage(sample,ByteString(97, 98, 99))
ここで (a) と (b) の処理を一度終了しておき、その状態で publish してみます。
Publish 実行
> groovy mqtt_publish.groovy sample publish1 abcd > groovy mqtt_publish.groovy sample publish1 abcde
(a) と (b) を再実行すると、以下のように Persistent Session 版の方は停止中に publish されたメッセージを取得できました。
(a) Subscribe1 再実行
> groovy mqtt_subscribe.groovy sample subscribe1 subscribe: sample
(b) Subscribe2(Persistent Session 版)再実行
> groovy mqtt_subscribe2.groovy sample subscribe2 subscribe: sample MqttMessage(sample,ByteString(97, 98, 99, 100)) MqttMessage(sample,ByteString(97, 98, 99, 100, 101))