R の MXNet で iris を分類
「MXNet で iris を分類」 と同様の処理を R言語で実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20171212/
準備
今回は下記サイトの手順に従って MXNet R パッケージの CPU 版を Windows へインストールしました。
MXNet R パッケージの CPU 版を Windows へインストール
cran <- getOption("repos") cran["dmlc"] <- "https://apache-mxnet.s3-accelerate.dualstack.amazonaws.com/R/CRAN/" options(repos = cran) install.packages("mxnet")
インストールした mxnet のバージョンが少し古いようですが(現時点の MXNet 最新バージョンは 1.0)、今回はこれを使います。
バージョン確認
> packageVersion('mxnet') [1] ‘0.10.1’
学習と評価
MXNet には、階層型ニューラルネットワークの学習処理を簡単に実行するための関数 mx.mlp
が用意されているので、今回はこれを使います。
引数 | 備考 |
---|---|
hidden_node | 隠れ層のノード(ニューロン)数(デフォルトは 1) |
out_node | 出力ノード数(今回は分類数) |
num.round | 繰り返し回数(デフォルトは 10) |
array.batch.size | バッチサイズ(デフォルトは 128) |
learning.rate | 学習係数 |
activation | 活性化関数(デフォルトは tanh) |
hidden_node
にベクトル(例. c(6, 4)
)を設定すれば隠れ層が複数階層化されるようです。
iris のデータセットは R に用意されているものを使います。
mx.mlp の入力データには mx.io.DataIter か R の配列 / 行列を使う必要があるようなので(ラベルデータは配列のみ)、data.matrix
で行列化しています。
ラベルデータとする iris の種別 iris$Species
は因子型ですが、mxnet では因子型を扱えないようなので as.numeric
で数値化しています。
ここで as.numeric の結果は 1 ~ 3 の数値になりますが、mxnet で 3種類の分類を行うには 0 ~ 2 でなければならないようなので -1 しています。
一方、predict
の結果を max.col(t(<predictの結果>))
で処理すると 1 ~ 3 の数値になるため、評価用のラベルデータは -1 せずに正解率の算出に使っています。
また、array.layout = 'rowmajor'
は Warning message 抑制のために設定しています。
iris_hnn.R
library(mxnet) train_size = 0.7 n = nrow(iris) # 1 ~ n から無作為に n * train_size 個を抽出 perm = sample(n, size = round(n * train_size)) # 学習用データ train <- iris[perm, ] # 評価用データ test <-iris[-perm, ] # 学習用入力データ train.x <- data.matrix(train[1:4]) # 学習用ラベルデータ(0 ~ 2) train.y <- as.numeric(train$Species) - 1 # 評価用入力データ test.x <- data.matrix(test[1:4]) # 評価用ラベルデータ(1 ~ 3) test.y <- as.numeric(test$Species) mx.set.seed(0) # 学習 model <- mx.mlp(train.x, train.y, hidden_node = 5, out_node = 3, num.round = 100, learning.rate = 0.1, array.batch.size = 10, activation = 'relu', array.layout = 'rowmajor', eval.metric = mx.metric.accuracy) # 評価 pred <- predict(model, test.x, array.layout = 'rowmajor') # 評価用データの分類結果(1 ~ 3) pred.y <- max.col(t(pred)) # 評価データの正解率を算出 acc <- sum(pred.y == test.y) / length(pred.y) print(acc)
実行結果は以下の通り。
実行結果
・・・ > model <- mx.mlp(train.x, train.y, + hidden_node = 5, + out_node = 3, + num.round = 100, + learning.rate = 0.1, + array.batch.size = 10, + activation = 'relu', + array.layout = 'rowmajor', + eval.metric = mx.metric.accuracy) Start training with 1 devices [1] Train-accuracy=0.32 [2] Train-accuracy=0.281818181818182 ・・・ [99] Train-accuracy=0.954545454545455 [100] Train-accuracy=0.954545454545455 ・・・ > print(acc) [1] 0.9555556
備考
predict
の実行結果は以下のような内容となっています。
> pred [,1] [,2] [,3] [,4] [1,] 0.968931615 0.968931615 0.968931615 0.968931615 [2,] 0.029328469 0.029328469 0.029328469 0.029328469 [3,] 0.001739914 0.001739914 0.001739914 0.001739914 [,5] [,6] [,7] [,8] [1,] 0.968931615 0.968931615 0.968931615 0.968931615 [2,] 0.029328469 0.029328469 0.029328469 0.029328469 [3,] 0.001739914 0.001739914 0.001739914 0.001739914 ・・・ [,41] [,42] [,43] [,44] [1,] 1.762393e-08 7.670556e-06 5.799695e-06 9.349569e-12 [2,] 3.053433e-02 2.679898e-01 1.714197e-01 7.250102e-05 [3,] 9.694657e-01 7.320026e-01 8.285745e-01 9.999275e-01 [,45] [1,] 4.420018e-08 [2,] 8.569881e-03 [3,] 9.914301e-01
1 ~ 3 の中で最も数値の高いものが分類結果となりますので、上記を t
で転置して max.col
すると以下のようになります。
> max.col(t(pred)) [1] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2 2 3 2 2 2 2 2 [30] 3 2 2 2 2 3 3 3 3 3 3 3 3 3 3 3
Elixir でステートマシンを処理
「Akka でステートマシンを処理」 と同じ処理を gen_statem
の Elixir 用ラッパー(以下)を使って実装します。
ソースは http://github.com/fits/try_samples/tree/master/blog/20171121/
ステートマシンの実装(sample1)
まず、以下のステートマシンを実装します。
- 初期状態は Idle 状態
- Idle 状態で On イベントが発生すると Active 状態へ遷移
- Active 状態で Off イベントが発生すると Idle 状態へ遷移
現在の状態 | Off | On |
---|---|---|
Idle | Active | |
Active | Idle |
準備
mix でプロジェクトを作成します。
プロジェクト作成
> mix new sample1 ・・・ > cd sample1
mix.exs の deps へ gen_state_machine
を追加します。
今回は escript で実行するので、そのための設定も追加しておきます。
mix.exs
defmodule Sample1.Mixfile do use Mix.Project def project do [ ・・・ deps: deps(), # escript の設定 escript: [ main_module: Sample1 ] ] end ・・・ defp deps do [ # GenStateMachine {:gen_state_machine, "~> 2.0"} ] end end
実装
handle_event(event_type, event_content, state, data)
関数でイベントをハンドリングし、{:next_state, <遷移先の状態>, <新しいデータ>}
を返せば新しい状態へ遷移します。
状態遷移しない場合は {:keep_state, <データ>}
(:keep_state_and_data
でも可)を返します。
ここで、cast
関数(返事を待たない一方通行の呼び出し。Akka の tell と同じ)を使った場合のイベントタイプは :cast
となります。
lib/sample_state_machine.ex
defmodule SampleStateMachine do use GenStateMachine # 初期状態 def init(_args) do {:ok, :idle, 0} end # on イベントの処理(idle から active へ遷移) def handle_event(:cast, :on, :idle, data) do IO.puts "*** :on, idle -> active" {:next_state, :active, data + 1} end # off イベントの処理(active から idle へ遷移) def handle_event(:cast, :off, :active, data) do IO.puts "*** :off, active -> idle" {:next_state, :idle, data} end # 上記以外 def handle_event(event_type, event_content, state, data) do IO.puts "*** Unhandled: type=#{event_type}, content=#{event_content}, state=#{state}, data=#{data}" {:keep_state, data} # 以下でも可 # {:keep_state_and_data, []} end end
このステートマシンを動作確認するための処理を実装します。
escript で実行できるように main
関数内に定義しています。
lib/sample1.ex
defmodule Sample1 do def main(_args) do {:ok, pid} = GenStateMachine.start_link(SampleStateMachine, nil) GenStateMachine.cast(pid, :on) GenStateMachine.cast(pid, :off) GenStateMachine.cast(pid, :off) GenStateMachine.stop(pid) end end
ビルドと実行
deps.get
で gen_state_machine を取得します。
依存パッケージの取得
> mix deps.get Running dependency resolution... Dependency resolution completed: gen_state_machine 2.0.1 * Getting gen_state_machine (Hex package) Checking package (https://repo.hex.pm/tarballs/gen_state_machine-2.0.1.tar) Using locally cached package
escript.build
で escript 実行用にビルドします。
ビルド
> mix escript.build ==> gen_state_machine Compiling 3 files (.ex) Generated gen_state_machine app ==> sample1 Compiling 2 files (.ex) Generated sample1 app Generated escript sample1 with MIX_ENV=dev
escript コマンドで実行します。
実行結果
> escript sample1 *** :on, idle -> active *** :off, active -> idle *** Unhandled: type=cast, content=off, state=idle, data=1
タイムアウト付きステートマシンの実装(sample2)
次に、タイムアウト時の遷移を追加してみます。
現在の状態 | Off | On | Timeout (2秒) |
---|---|---|---|
Idle | Active | ||
Active | Idle | Idle |
実装
{:next_state, <遷移先の状態>, <新しいデータ>, <タイムアウト(ミリ秒)>}
を返すとタイムアウトを設定できます。
イベントタイプ :timeout
でタイムアウトをハンドリングできます。
注意点として、このタイムアウトは状態自体のタイムアウトではなくイベントの受信に対するタイムアウトです。
lib/timeout_state_machine.ex
defmodule TimeoutStateMachine do use GenStateMachine def init(_args) do {:ok, :idle, 0} end def handle_event(:cast, :on, :idle, data) do IO.puts "*** :on, idle -> active" # 2秒タイムアウト {:next_state, :active, data + 1, 2000} end def handle_event(:cast, :off, :active, data) do IO.puts "*** :off, active -> idle" {:next_state, :idle, data} end # タイムアウト時の処理 def handle_event(:timeout, event_content, :active, data) do IO.puts "*** :timeout content=#{event_content}, active -> idle" {:next_state, :idle, data} end def handle_event(event_type, event_content, state, data) do IO.puts "*** Unhandled: type=#{event_type}, content=#{event_content}, state=#{state}, data=#{data}" {:keep_state, data} end end
動作確認の処理を実装します。
lib/sample2.ex
defmodule Sample2 do def main(_args) do {:ok, pid} = GenStateMachine.start_link(TimeoutStateMachine, nil) GenStateMachine.cast(pid, :on) GenStateMachine.cast(pid, :off) GenStateMachine.cast(pid, :off) GenStateMachine.cast(pid, :on) :timer.sleep(2500) GenStateMachine.cast(pid, :on) :timer.sleep(1500) GenStateMachine.cast(pid, :invalid_message) :timer.sleep(1500) GenStateMachine.cast(pid, :invalid_message) :timer.sleep(2500) GenStateMachine.stop(pid) end end
ビルドと実行
依存パッケージの取得とビルド
> mix deps.get ・・・ > mix escript.build ・・・
実行結果は以下の通り、invalid_message のハンドリングでタイムアウトは機能しなくなっています。
実行結果
> escript sample2 *** :on, idle -> active *** :off, active -> idle *** Unhandled: type=cast, content=off, state=idle, data=1 *** :on, idle -> active *** :timeout content=2000, active -> idle *** :on, idle -> active *** Unhandled: type=cast, content=invalid_message, state=active, data=3 *** Unhandled: type=cast, content=invalid_message, state=active, data=3
状態タイムアウト付きステートマシンの実装(sample3)
最後に、状態のタイムアウトを実現します。
実装
:next_state
を返す際に {:state_timeout, <タイムアウト(ミリ秒)>, <イベント>}
を設定したリストを含める事で状態のタイムアウトを実現できます。
状態のタイムアウトはイベントタイプ :state_timeout
でハンドリングします。
lib/state_timeout_state_machine.ex
defmodule StateTimeoutStateMachine do use GenStateMachine def init(_args) do {:ok, :idle, 0} end def handle_event(:cast, :on, :idle, data) do IO.puts "*** :on, idle -> active" # 状態タイムアウトの設定 actions = [{:state_timeout, 2000, :off}] {:next_state, :active, data + 1, actions} end def handle_event(:cast, :off, :active, data) do IO.puts "*** :off, active -> idle" {:next_state, :idle, data} end # 状態タイムアウトの処理 def handle_event(:state_timeout, :off, :active, data) do IO.puts "*** :state_timeout, active -> idle" {:next_state, :idle, data} end def handle_event(event_type, event_content, state, data) do IO.puts "*** Unhandled: type=#{event_type}, content=#{event_content}, state=#{state}, data=#{data}" {:keep_state, data} end end
動作確認の処理を実装します。
lib/sample3.ex
defmodule Sample3 do def main(_args) do {:ok, pid} = GenStateMachine.start_link(StateTimeoutStateMachine, nil) GenStateMachine.cast(pid, :on) GenStateMachine.cast(pid, :off) GenStateMachine.cast(pid, :off) GenStateMachine.cast(pid, :on) :timer.sleep(2500) GenStateMachine.cast(pid, :on) :timer.sleep(1500) GenStateMachine.cast(pid, :invalid_message) :timer.sleep(1500) GenStateMachine.cast(pid, :invalid_message) :timer.sleep(2500) GenStateMachine.stop(pid) end end
ビルドと実行
依存パッケージの取得とビルド
> mix deps.get ・・・ > mix escript.build ・・・
実行結果は以下の通り、invalid_message のハンドリングとは無関係に状態のタイムアウトが機能しています。
実行結果
> escript sample3 *** :on, idle -> active *** :off, active -> idle *** Unhandled: type=cast, content=off, state=idle, data=1 *** :on, idle -> active *** :state_timeout, active -> idle *** :on, idle -> active *** Unhandled: type=cast, content=invalid_message, state=active, data=3 *** :state_timeout, active -> idle *** Unhandled: type=cast, content=invalid_message, state=idle, data=3
MySQL Binary Log connector でバイナリログをイベント処理
MySQL Binary Log connector (mysql-binlog-connector-java) を使うと、Java プログラムで MySQL / MariaDB のバイナリログをイベント処理できます。
そのため、MySQL の CDC(Change Data Capture)として使えるかもしれません。
ソースは http://github.com/fits/try_samples/tree/master/blog/20171030/
Groovy で実装
MySQL へ接続してバイナリログの内容を取得するには BinaryLogClient
を使います。
registerEventListener
メソッドで EventListener
実装オブジェクトを登録しておくと、バイナリログの内容をデシリアライズした Event
オブジェクトを引数として onEvent
メソッドを呼び出してくれます。
BinaryLogClient のソース(listenForEventPackets() メソッドなど)を見てみると、バイナリログを順番にデシリアライズして(特定のイベントタイプをスキップするような処理は無さそう)、登録している EventListener の onEvent を順次呼び出しているだけのようなので、場合によっては処理性能に注意が必要かもしれません。
binlog_sample.groovy
@Grab('com.github.shyiko:mysql-binlog-connector-java:0.13.0') import com.github.shyiko.mysql.binlog.BinaryLogClient def host = args[0] def port = args[1] as int def user = args[2] def pass = args[3] def client = new BinaryLogClient(host, port, user, pass) client.registerEventListener { ev -> // バイナリログの内容を処理 println ev } client.connect()
動作確認
動作確認は Docker で行ってみます。
準備
BinaryLogClient で接続するために MySQL 側でレプリケーション用の設定を行います。
まずは、レプリケーションの設定ファイルを用意します。
/home/vagrant/mysql/conf/repl.cnf (レプリケーションの設定)
[mysqld] log-bin=mysql-bin server-id=1
次に、レプリケーション用の接続ユーザーを追加するための SQL ファイルも用意しておきます。
/home/vagrant/mysql/init/repl-user.sql (レプリケーション用のユーザー)
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO repl@'%' IDENTIFIED BY 'pass';
今回はコンテナ間の接続(DB への接続)に Docker のユーザー定義ネットワークを使います。
そのため、まずはブリッジネットワーク(sample1)を作成しておきます。
Docker ユーザー定義ネットワークの作成
$ docker network create --driver bridge sample1
/home/vagrant/groovy/binlog_sample.groovy ファイルを用意した後、sample1 のネットワークへ参加するように Groovy のコンテナを実行します。
Groovy コンテナ実行
$ docker run --rm -it --net=sample1 -v /home/vagrant/groovy:/work groovy bash ・・・ groovy@・・・:~$ cd /work
a. MySQL 5.7 の場合
それでは、MySQL のコンテナを実行して動作確認を行います。
- MySQL 5.7.20
MySQL の Docker 公式イメージでは、/etc/mysql/conf.d 内の設定ファイルを適用し、/docker-entrypoint-initdb.d 内の SQL ファイルを実行するようになっています。
今回はこれを使って、先ほど用意しておいたレプリケーションの設定ファイルとユーザー作成 SQL を適用するように実行します。
a-1. MySQL コンテナ実行
$ docker run --name mysql1 --net=sample1 -e MYSQL_ROOT_PASSWORD=secret -d -v /home/vagrant/mysql/conf:/etc/mysql/conf.d -v /home/vagrant/mysql/init:/docker-entrypoint-initdb.d mysql
事前に実行しておいた Groovy コンテナ上で binlog_sample.groovy を実行します。 ユーザー名とパスワードはレプリケーション用のものを使用します。
a-2. binlog_sample.groovy 実行(Groovy コンテナ)
groovy@・・・:/work$ groovy binlog_sample.groovy mysql1 3306 repl pass Oct 22, 2017 4:46:02 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql1:3306 at mysql-bin.000003/154 (sid:65535, cid:3) Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000003', binlogPosition=154}} ・・・
この状態で以下の SQL を実行してみます。
CREATE DATABASE db1; USE db1; CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL); INSERT INTO tbl1 VALUES (1, 'a'); UPDATE tbl1 SET name = 'aa' WHERE id = 1; DELETE FROM tbl1 WHERE id = 1; CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL); START TRANSACTION; INSERT INTO tbl1 VALUES (1, 'a'); INSERT INTO tbl2 VALUES (2, 'b'); COMMIT;
上記 SQL 実行後の出力結果です。
a-3. binlog_sample.groovy 出力結果
・・・ Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=219, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=72, nextPosition=310, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='CREATE DATABASE db1'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=375, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=127, nextPosition=521, flags=0}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=586, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=657, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=706, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=748, flags=0}, data=WriteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[ [1, a] ]}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=779, flags=0}, data=XidEventData{xid=14}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=844, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=915, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=964, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=32, nextPosition=1015, flags=0}, data=UpdateRowsEventData{tableId=219, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[ {before=[1, a], after=[1, aa]} ]}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1046, flags=0}, data=XidEventData{xid=15}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1111, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=1182, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1231, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=24, nextPosition=1274, flags=0}, data=DeleteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[ [1, aa] ]}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1305, flags=0}, data=XidEventData{xid=16}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1370, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=127, nextPosition=1516, flags=0}, data=QueryEventData{threadId=4, executionTime=1, errorCode=0, database='db1', sql='CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}} Event{header=EventHeaderV4{timestamp=1508690819000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1581, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=1652, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1701, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1743, flags=0}, data=WriteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[ [1, a] ]}} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1792, flags=0}, data=TableMapEventData{tableId=220, database='db1', table='tbl2', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1834, flags=0}, data=WriteRowsEventData{tableId=220, includedColumns={0, 1}, rows=[ [2, b] ]}} Event{header=EventHeaderV4{timestamp=1508690819000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1865, flags=0}, data=XidEventData{xid=19}}
eventType を簡単にまとめると以下のようになりました。
SQL | eventType |
---|---|
CREATE | QUERY |
INSERT | EXT_WRITE_ROWS |
UPDATE | EXT_UPDATE_ROWS |
DELETE | EXT_DELETE_ROWS |
注意点として、EXT_XXX_ROWS の Event 内容にテーブル名は含まれておらず tableId
で判断する必要がありそうです。
tableId とテーブル名のマッピングは直前の TABLE_MAP
で実施されています。
また、バイナリログのフォーマット(以下)は RBR(行ベースレプリケーション) となっていました。
mysql> show global variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+
b. MariaDB 10.2 の場合
ついでに、MariaDB でも試してみます。
- MariaDB 10.2.9
設定は MySQL と同じものが使えるので、ここでは Docker イメージ名を mariadb に変えて実行するだけです。(以下ではコンテナ名も変えています)
b-1. MariaDB コンテナ実行
$ docker run --name mariadb1 --net=sample1 -e MYSQL_ROOT_PASSWORD=secret -d -v /home/vagrant/mysql/conf:/etc/mysql/conf.d -v /home/vagrant/mysql/init:/docker-entrypoint-initdb.d mariadb
接続先を mariadb1 (MariaDB) へ変えてスクリプトを実行します。
b-2. binlog_sample.groovy 実行(Groovy コンテナ)
groovy@・・・:/work$ groovy binlog_sample.groovy mariadb1 3306 repl pass ・・・
b-3. binlog_sample.groovy 出力結果
・・・ Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=384, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=66, nextPosition=469, flags=8}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='CREATE DATABASE db1'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=511, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=121, nextPosition=651, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=693, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=791, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl1 VALUES (1, 'a')'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=822, flags=0}, data=XidEventData{xid=12}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=864, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=87, nextPosition=970, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='UPDATE tbl1 SET name = 'aa' WHERE id = 1'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1001, flags=0}, data=XidEventData{xid=13}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1043, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=76, nextPosition=1138, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='DELETE FROM tbl1 WHERE id = 1'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1169, flags=0}, data=XidEventData{xid=14}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1211, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=121, nextPosition=1351, flags=0}, data=QueryEventData{threadId=10, executionTime=1, errorCode=0, database='db1', sql='CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}} Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1393, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=1491, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl1 VALUES (1, 'a')'}} Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=1589, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl2 VALUES (2, 'b')'}} Event{header=EventHeaderV4{timestamp=1508691101000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1620, flags=0}, data=XidEventData{xid=17}}
eventType を簡単にまとめると以下のようになりました。
SQL | eventType |
---|---|
CREATE | QUERY |
INSERT | QUERY |
UPDATE | QUERY |
DELETE | QUERY |
バイナリログのフォーマットは MBR(ミックスベースレプリケーション)となっていました。
MariaDB [(none)]> show global variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | MIXED | +---------------+-------+
Akka でステートマシンを処理
前回 の有限ステートマシン(FSM)の処理を Akka の Java 用 API を使って実装してみます。
ソースは http://github.com/fits/try_samples/tree/master/blog/20171016/
ステートマシンの実装
まずは、以下のステートマシンを実装します。
- 初期状態は Idle 状態
- Idle 状態で On イベントが発生すると Active 状態へ遷移
- Active 状態で Off イベントが発生すると Idle 状態へ遷移
現在の状態 | Off | On |
---|---|---|
Idle | Active | |
Active | Idle |
AbstractFSM<状態の型, データの型>
を extends する事でステートマシンを実装します。
状態以外にも状態のデータを指定できるので、Active 状態へ変化した回数をカウントするようにしてみました。
指定の状態で特定のイベントを処理するには when(現在の状態, FSMStateFunctionBuilder)
を使い、イベントに対する処理は matchEventEquals()
等を使います。
goTo(遷移先の状態)
で状態を遷移させ、その際に using(データ)
で状態遷移後のデータを指定できます。
処理対象外のイベントを受け取った場合の処理は whenUnhandled(FSMStateFunctionBuilder)
で指定でき、状態遷移の状況確認に onTransition
が使えます。
sample.groovy
@Grab('com.typesafe.akka:akka-actor_2.12:2.5.6') import akka.actor.AbstractFSM import akka.actor.ActorSystem import akka.actor.ActorRef import akka.actor.Props enum States { Idle, Active } enum Events { On, Off } class SampleStateMachine extends AbstractFSM<States, Integer> { { // 初期状態の設定 startWith(States.Idle, 0) // On イベントで Idle から Active 状態へ遷移 when(States.Idle, matchEventEquals(Events.On) { event, data -> // Active へ遷移 goTo(States.Active).using(data + 1) }) // Off イベントで Active から Idle 状態へ遷移 when(States.Active, matchEventEquals(Events.Off) { event, data -> // Idle へ遷移 goTo(States.Idle) }) // 処理されないイベント発生時 whenUnhandled( matchAnyEvent { event, data -> println "*** Unhandled event=${event}, data=${data}" stay() } ) // 状態遷移の発生時 onTransition { from, to -> println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}" } } } def system = ActorSystem.create() def actor = system.actorOf(Props.create(SampleStateMachine)) actor.tell(Events.On, ActorRef.noSender()) actor.tell(Events.Off, ActorRef.noSender()) actor.tell(Events.Off, ActorRef.noSender()) sleep 2000 system.terminate()
実行結果は以下の通りです。
実行結果
> groovy sample.groovy *** stateChanged: Idle -> Active, data=0, next data=1 *** stateChanged: Active -> Idle, data=1, next data=1 *** Unhandled event=Off, data=1
タイムアウト付きステートマシンの実装1
次は、タイムアウト時の遷移を追加してみます。
現在の状態 | Off | On | Timeout (2秒) |
---|---|---|---|
Idle | Active | ||
Active | Idle | Idle |
when で scala.concurrent.duration.FiniteDuration
を指定すると状態のタイムアウト ※ を指定できます。
※ ただし、このタイムアウトは その状態で何もメッセージを受信しなかった時間に対する タイムアウトのようです 状態の遷移が発生しなくても メッセージを受信する度にタイムアウトはリセットされます
タイムアウト発生時のイベント(メッセージ)は StateTimeout()
の戻り値と等しくなります。
timeout_sample1.groovy
・・・ import scala.concurrent.duration.Duration import java.util.concurrent.TimeUnit enum States { Idle, Active } enum Events { On, Off } class SampleStateMachine extends AbstractFSM<States, Integer> { { startWith(States.Idle, 0) when(States.Idle, matchEventEquals(Events.On) { event, data -> goTo(States.Active).using(data + 1) }) when(States.Active, Duration.create(2, TimeUnit.SECONDS), matchEventEquals(Events.Off) { event, data -> goTo(States.Idle) }.eventEquals(StateTimeout()) { event, data -> // タイムアウト時の処理 println "*** timeout: event=${event}, data=${data}" goTo(States.Idle) /* 以下でも可 self().tell(Events.Off, self()) stay() */ } ) ・・・ } } def system = ActorSystem.create() def actor = system.actorOf(Props.create(SampleStateMachine)) actor.tell(Events.On, ActorRef.noSender()) actor.tell(Events.Off, ActorRef.noSender()) actor.tell(Events.Off, ActorRef.noSender()) actor.tell(Events.On, ActorRef.noSender()) sleep 2500 system.terminate()
実行結果は以下の通りです。
実行結果
> groovy timeout_sample1.groovy *** stateChanged: Idle -> Active, data=0, next data=1 *** stateChanged: Active -> Idle, data=1, next data=1 *** Unhandled event=Off, data=1 *** stateChanged: Idle -> Active, data=1, next data=2 *** timeout: event=StateTimeout, data=2 *** stateChanged: Active -> Idle, data=2, next data=2
when で指定したタイムアウトはイベント(メッセージ)を受信するとリセットされる事を確認するため、以下のようにタイムアウト発生前に invalid-message という文字列を 2回 tell するようにしてみます。
timeout_sample1b.groovy (タイムアウトの検証)
・・・ def system = ActorSystem.create() def actor = system.actorOf(Props.create(SampleStateMachine)) ・・・ actor.tell(Events.On, ActorRef.noSender()) sleep 1500 // 1回目 actor.tell('invalid-message', ActorRef.noSender()) sleep 1500 // 2回目 actor.tell('invalid-message', ActorRef.noSender()) sleep 2500 system.terminate()
実行結果は、以下のように invalid-message という文字列を 2回受信した後にタイムアウトしました。
つまり、メッセージの受信でタイムアウトがリセットされていると考えられます。
実行結果(タイムアウトの検証)
> groovy timeout_sample1b.groovy ・・・ *** stateChanged: Idle -> Active, data=1, next data=2 *** Unhandled event=invalid-message, data=2 *** Unhandled event=invalid-message, data=2 *** timeout: event=StateTimeout, data=2 *** stateChanged: Active -> Idle, data=2, next data=2
タイムアウト付きステートマシンの実装2 (状態のタイムアウト)
メッセージの受信有無に左右されないタイムアウトを実現するには、when のタイムアウトを使わずに setTimer()
を使う事で実装できそうです。
timeout_sample2.groovy
・・・ class SampleStateMachine extends AbstractFSM<States, Integer> { { ・・・ when(States.Active, matchEventEquals(Events.Off) { event, data -> goTo(States.Idle) }.eventEquals(StateTimeout()) { event, data -> println "*** timeout: event=${event}, data=${data}" goTo(States.Idle) /* 以下でも可 self().tell(Events.Off, self()) stay() */ } ) ・・・ onTransition { from, to -> println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}" if (to == States.Active) { // Active 状態のタイムアウト設定 setTimer( 'active-timeout', StateTimeout(), Duration.create(2, TimeUnit.SECONDS) ) } else { // タイムアウトのキャンセル cancelTimer('active-timeout') } } } } def system = ActorSystem.create() def actor = system.actorOf(Props.create(SampleStateMachine)) ・・・ actor.tell(Events.On, ActorRef.noSender()) sleep 1500 actor.tell("invalid-message", ActorRef.noSender()) sleep 1500 actor.tell("invalid-message", ActorRef.noSender()) sleep 2500 system.terminate()
実行してみると、2回目の invalid-message を受信する前にタイムアウトが発生しており意図通りの動作となりました。
実行結果
> groovy timeout_sample2.groovy ・・・ *** stateChanged: Idle -> Active, data=1, next data=2 *** Unhandled event=invalid-message, data=2 *** timeout: event=StateTimeout, data=2 *** stateChanged: Active -> Idle, data=2, next data=2 *** Unhandled event=invalid-message, data=2
Spring Statemachine でステートマシンを処理
Spring Statemachine を使って単純な有限ステートマシン(FSM)を実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20171002/
はじめに
Spring Boot 2.0.0.M4 を使用して Kotlin で実装するため、以下のような Gradle ビルド定義を使いました。
build.gradle
buildscript { ext { kotlinVersion = '1.1.51' springBootVersion = '2.0.0.M4' } repositories { mavenCentral() maven { url "https://repo.spring.io/snapshot" } maven { url "https://repo.spring.io/milestone" } } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}") classpath("org.jetbrains.kotlin:kotlin-allopen:${kotlinVersion}") } } apply plugin: 'kotlin' apply plugin: 'kotlin-spring' apply plugin: 'org.springframework.boot' compileKotlin { kotlinOptions.jvmTarget = "1.8" } repositories { mavenCentral() maven { url "https://repo.spring.io/snapshot" } maven { url "https://repo.spring.io/milestone" } } dependencies { // JDK 9 でも実行できるようにバージョンを設定 compile("org.springframework.boot:spring-boot-starter:${springBootVersion}") // Spring Statemachine compile('org.springframework.statemachine:spring-statemachine-core:2.0.0.BUILD-SNAPSHOT') compile("org.jetbrains.kotlin:kotlin-stdlib-jre8:${kotlinVersion}") compile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}") }
上記は Spring Initializr で生成したものをベースに多少の変更 ※ を加えています。
※ 不要な設定を削除し、JDK 9 でも実行できるように compile(・・・) で spring-boot-starter のバージョンを設定 compile('org.springframework.boot:spring-boot-starter') のままでは JDK 9 で以下のようなエラーが発生したため(バージョン指定が欠ける) Could not find org.springframework.boot:spring-boot-starter:.
a. StateMachineBuilder 使用
Spring Statemachine では、有限ステートマシンを定義するために以下のような方法が用意されているようなので、まずは StateMachineBuilder を使ってみます。
実装するステートマシンは以下の通りです。
- 初期状態は Idle 状態
- Idle 状態で On イベントが発生すると Active 状態へ遷移
- Active 状態で Off イベントが発生すると Idle 状態へ遷移
現在の状態 | Off | On |
---|---|---|
Idle | Active | |
Active | Idle |
Spring Statemachine におけるステートマシンは StateMachine<状態の型, イベントの型>
として扱います。
今回は状態の型を States
、イベントの型を Events
とし、enum で定義しています。
StateMachineBuilder を使用する場合、builder()
で取得した StateMachineBuilder.Builder<状態の型, イベントの型>
に対してステートマシンの状態(初期状態など)や状態遷移等の設定を行います。
状態の設定は configureStates()
で StateMachineStateConfigurer
を取得し、更に withStates()
で取得した StateConfigurer
で設定します。
initial
で初期の状態を指定し、states
で全ての状態を指定します。
状態遷移の設定は configureTransitions()
で取得した StateMachineTransitionConfigurer
に対して行います。
別の状態へ遷移する場合は withExternal()
で取得した ExternalTransitionConfigurer
で設定します。
source(状態)
で遷移前の状態、target(状態)
で遷移後の状態を指定し event(イベント)
で遷移のきっかけとなるイベントを指定します。
その際に、何らかの処理を行う場合は action(処理)
で指定できます。
複数の状態遷移を繋げて書きたい場合は and()
を使います。
状態遷移等の状況確認には StateMachineListener
が使えます。
src/main/kotlin/sample/Application.kt
package sample import org.springframework.boot.CommandLineRunner import org.springframework.boot.SpringApplication import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.context.annotation.Bean import org.springframework.messaging.Message import org.springframework.statemachine.StateMachine import org.springframework.statemachine.config.StateMachineBuilder import org.springframework.statemachine.listener.StateMachineListenerAdapter import org.springframework.statemachine.state.State // 状態 enum class States { Idle, Active } // イベント enum class Events { On, Off } @SpringBootApplication class Application : CommandLineRunner { override fun run(vararg args: String?) { val machine = stateMachine() machine.addStateListener(SampleListener()) // ステートマシンの開始 machine.start() // Idle -> Active (stateChanged) machine.sendEvent(Events.On) // Active -> Idle (stateChanged) machine.sendEvent(Events.Off) // Idle 状態で Off しても何も起こらない (eventNotAccepted) machine.sendEvent(Events.Off) } // 有限ステートマシンの定義 @Bean fun stateMachine(): StateMachine<States, Events> { val builder = StateMachineBuilder.builder<States, Events>() // 状態の設定 builder.configureStates().withStates() .initial(States.Idle).states(States.values().toSet()) // 遷移の設定 builder.configureTransitions() // On イベントで Idle から Active 状態へ遷移 .withExternal().source(States.Idle).target(States.Active).event(Events.On) .and() // Off イベントで Active から Idle 状態へ遷移 .withExternal().source(States.Active).target(States.Idle).event(Events.Off) return builder.build() } } class SampleListener : StateMachineListenerAdapter<States, Events>() { // 状態遷移の発生時 override fun stateChanged(from: State<States, Events>?, to: State<States, Events>?) { println("*** stateChanged: ${from?.id} -> ${to?.id}") } // 受付不可なイベント発生時 override fun eventNotAccepted(event: Message<Events>?) { println("*** eventNotAccepted: ${event?.payload}") } } fun main(args: Array<String>) { SpringApplication.run(Application::class.java, *args) }
実行結果は以下の通りです。
実行結果
> gradle -q bootRun ・・・ *** stateChanged: null -> Idle ・・・ *** stateChanged: Idle -> Active *** stateChanged: Active -> Idle *** eventNotAccepted: Off ・・・ ・・・ o.s.s.support.LifecycleObjectSupport : destroy called
処理が終わるとプロセスは終了しました。
b. @StateMachineFactory アノテーション使用
次に、@StateMachineFactory
アノテーションを使ってステートマシンを定義します。
状態やイベントの型に enum を使っている場合は、EnumStateMachineConfigurerAdapter<状態の型, イベントの型>
を extends したクラスへ @StateMachineFactory を付与します。
この場合、@Autowired
対象の変数の型を StateMachineFactory<状態の型, イベントの型>
とします。
@StateMachine
アノテーションの場合も基本的に同じで、その場合は @Autowired 対象の型を StateMachine<状態の型, イベントの型>
とします。
ステートマシンの定義は、該当する configure(xxxConfigurer)
をオーバーライドして StateMachineBuilder と同じ様に設定するだけです。
ここでは、StateMachineBuilder のサンプルへ以下の機能を追加してみました。
start()
メソッドを呼び出さなくても開始するようにautoStartup(true)
を設定- Active 状態のまま 2秒経過すると Idle 状態へ戻る遷移を追加
状態遷移は以下のようになります。
現在の状態 | Off | On | Timeout (2秒) |
---|---|---|---|
Idle | Active | ||
Active | Idle | Idle |
ここでは、withInternal()
で timerOnce(ミリ秒)
と action(処理)
を組み合わせて、Active 状態が 2秒続いた(タイムアウトした)際に Off イベントを送信して Idle 状態へ遷移するようにしてみましたが、timerOnce(ミリ秒)
は withExternal()
でも使えます。
src/main/kotlin/sample/SampleStateMachineConfig.kt
package sample import org.springframework.statemachine.StateContext import org.springframework.statemachine.config.EnableStateMachineFactory import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter import org.springframework.statemachine.config.builders.StateMachineConfigurationConfigurer import org.springframework.statemachine.config.builders.StateMachineStateConfigurer import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer enum class States { Idle, Active } enum class Events { On, Off } // 有限ステートマシンの定義 @EnableStateMachineFactory class SampleStateMachineConfig : EnumStateMachineConfigurerAdapter<States, Events>() { override fun configure(config: StateMachineConfigurationConfigurer<States, Events>?) { config!!.withConfiguration() // 自動的に開始(start メソッドを呼び出す必要がなくなる) .autoStartup(true) } override fun configure(states: StateMachineStateConfigurer<States, Events>?) { states!!.withStates() .initial(States.Idle).states(States.values().toSet()) } override fun configure(transitions: StateMachineTransitionConfigurer<States, Events>?) { transitions!! .withExternal().source(States.Idle).target(States.Active).event(Events.On) .and() .withExternal().source(States.Active).target(States.Idle).event(Events.Off) .and() .withInternal().source(States.Active).timerOnce(2000).action(this::timeout) // 以下でも可 //.withExternal().source(States.Active).target(States.Idle).timerOnce(2000) } private fun timeout(ctx: StateContext<States, Events>) { println("*** timeout: ${ctx.source.id}") // Off イベント送信(Idle 状態へ戻す) ctx.stateMachine.sendEvent(Events.Off) } }
上記を @Autowired して使います。
autoStartup を有効化したので StateMachine の start()
を呼び出す必要はありません。
src/main/kotlin/sample/Application.kt
package sample import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.CommandLineRunner import org.springframework.boot.SpringApplication import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.messaging.Message import org.springframework.statemachine.config.StateMachineFactory import org.springframework.statemachine.listener.StateMachineListenerAdapter import org.springframework.statemachine.state.State @SpringBootApplication class Application : CommandLineRunner { @Autowired lateinit var stateMachineFactory: StateMachineFactory<States, Events> override fun run(vararg args: String?) { val machine = stateMachineFactory.stateMachine machine.addStateListener(SampleListener()) machine.sendEvent(Events.On) machine.sendEvent(Events.Off) machine.sendEvent(Events.Off) // Active 状態にして放置 machine.sendEvent(Events.On) // timerOnce を使うとプロセスが終了しなくなるため sleep は不要だった // Thread.sleep(2500) } } class SampleListener : StateMachineListenerAdapter<States, Events>() { override fun stateChanged(from: State<States, Events>?, to: State<States, Events>?) { println("*** stateChanged: ${from?.id} -> ${to?.id}") } override fun eventNotAccepted(event: Message<Events>?) { println("*** eventNotAccepted: ${event?.payload}") } } fun main(args: Array<String>) { SpringApplication.run(Application::class.java, *args) }
実行結果は以下の通りです。
実行結果
> gradle -q bootRun ・・・ *** stateChanged: Idle -> Active *** stateChanged: Active -> Idle *** eventNotAccepted: Off *** stateChanged: Idle -> Active ・・・ *** timeout: Active *** stateChanged: Active -> Idle
timerOnce
等を使うとプロセスが終了しなくなるようなので、Ctrl + c 等でプロセスを停止します。
Akka Streams で MQTT Broker へ接続
ローカルで実行した MQTT Broker(前回 参照)に対して Akka Streams の Java 用 API を使って Groovy で接続してみます。
Akka Streams 用の様々なコネクタを備えた Alpakka に MQTT Broker 用の Source や Sink が用意されているので、今回はこちらを使います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170925/
Publish 処理
まずは publish 処理です。
MqttConnectionSettings
から MqttSink
を作成し、MqttMessage
を渡せば MQTT Broker へメッセージを送信できます。
MqttConnectionSettings は MQTT Broker の接続先と clientId
、そして永続化の方法を指定して作成します。
clientId はクライアント毎に一意な値を指定します。 (clientId を null
にすると IllegalArgumentException: Null clientId
となりました)
MqttQoS ではメッセージ到達の QoS を指定します。
MqttQoS.atLeastOnce()
は少なくとも 1回(重複の可能性あり)の到達可能性を指定する事になります。
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4') @Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11') import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.javadsl.Source import akka.stream.alpakka.mqtt.MqttQoS import akka.stream.alpakka.mqtt.MqttConnectionSettings import akka.stream.alpakka.mqtt.MqttMessage import akka.stream.alpakka.mqtt.javadsl.MqttSink import akka.util.ByteString def topic = args[0] def clientId = args[1] def message = args[2] def system = ActorSystem.create() def mat = ActorMaterializer.create(system) def settings = MqttConnectionSettings.create( 'tcp://localhost:1883', clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence() ) def msg = MqttMessage.create(topic, ByteString.fromString(message)) Source.single(msg) .runWith(MqttSink.create(settings, MqttQoS.atLeastOnce()), mat) // メッセージ送信前に terminate しないようスリープで調整 sleep 1000 system.terminate()
Subscribe 処理
次に subscribe 処理です。
MqttConnectionSettings から MqttSourceSettings
を作り、MqttSource
を作成します。
MqttSourceSettings の withSubscriptions
で subscribe するトピック名と QoS を指定します。
mqtt_subscribe.groovy
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4') @Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11') import akka.actor.ActorSystem import akka.japi.Pair import akka.stream.ActorMaterializer import akka.stream.javadsl.Sink import akka.stream.alpakka.mqtt.MqttQoS import akka.stream.alpakka.mqtt.MqttSourceSettings import akka.stream.alpakka.mqtt.MqttConnectionSettings import akka.stream.alpakka.mqtt.javadsl.MqttSource def topic = args[0] def clientId = args[1] def system = ActorSystem.create() def mat = ActorMaterializer.create(system) def settings = MqttSourceSettings.create( MqttConnectionSettings.create( 'tcp://localhost:1883', clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence() ) ).withSubscriptions( // トピックと QoS の指定 Pair.create(topic, MqttQoS.atLeastOnce()) ) MqttSource.create(settings, 10).runWith(Sink.foreach { println it }, mat) println "subscribe : ${topic}" System.in.read() system.terminate()
上記の処理では、再接続の際にセッションがクリアされてしまうので、通信が切断している間のメッセージを後で受け取るような事はできません。
再接続の際にセッションが復元される Persistent Session を適用するには、以下のように withCleanSession(false)
とする必要があります。
なお、Persistent Session を適用するには同じ clientId を使って再接続する必要があります。
mqtt_subscribe2.groovy (Persistent Session 版)
・・・ def settings = MqttSourceSettings.create( MqttConnectionSettings.create( 'tcp://localhost:1883', clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence() ).withCleanSession(false) // Persistent Session ).withSubscriptions( Pair.create(topic, MqttQoS.atLeastOnce()) ) ・・・
動作確認
前回 の Moquette 組み込み実行スクリプトで MQTT Broker を起動しておきます。
MQTT Broker 実行
> groovy moquette_run.groovy ・・・ [main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883 ・・・ Server started, version 0.10
Subscribe 処理を実行します。
動作確認のため Persistent Session 版も実行していますが、(Publish 処理も含め) clientId へ異なる値をそれぞれ指定します。(ここでは subscribe1・2 と publish1 としています)
また、トピック名は sample とします。
(a) Subscribe1 実行
> groovy mqtt_subscribe.groovy sample subscribe1 subscribe: sample
(b) Subscribe2(Persistent Session 版)実行
> groovy mqtt_subscribe2.groovy sample subscribe2 subscribe: sample
この状態で “a” “ab” “abc” という 3つのメッセージを publish してみます。
Publish 実行
> groovy mqtt_publish.groovy sample publish1 a > groovy mqtt_publish.groovy sample publish1 ab > groovy mqtt_publish.groovy sample publish1 abc
Subscribe の結果は以下のようになりました。
(a) Subscribe1 状況
> groovy mqtt_subscribe.groovy sample subscribe1 subscribe: sample MqttMessage(sample,ByteString(97)) MqttMessage(sample,ByteString(97, 98)) MqttMessage(sample,ByteString(97, 98, 99))
(b) Subscribe2(Persistent Session 版)状況
> groovy mqtt_subscribe2.groovy sample subscribe2 subscribe: sample MqttMessage(sample,ByteString(97)) MqttMessage(sample,ByteString(97, 98)) MqttMessage(sample,ByteString(97, 98, 99))
ここで (a) と (b) の処理を一度終了しておき、その状態で publish してみます。
Publish 実行
> groovy mqtt_publish.groovy sample publish1 abcd > groovy mqtt_publish.groovy sample publish1 abcde
(a) と (b) を再実行すると、以下のように Persistent Session 版の方は停止中に publish されたメッセージを取得できました。
(a) Subscribe1 再実行
> groovy mqtt_subscribe.groovy sample subscribe1 subscribe: sample
(b) Subscribe2(Persistent Session 版)再実行
> groovy mqtt_subscribe2.groovy sample subscribe2 subscribe: sample MqttMessage(sample,ByteString(97, 98, 99, 100)) MqttMessage(sample,ByteString(97, 98, 99, 100, 101))
MQTT Broker をローカル実行
以下の MQTT Broker をそれぞれローカルで実行してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20170910/
Mosca
Mosca は Node.js 用の MQTT Broker です。
npm でインストールして mosca
コマンドで実行できます。
インストール例
> npm install mosca
実行例
> mosca -v +++.+++: ,+++ +++; '+++ +++. ++.+++.++ ++.++ ++,'+ `+',++ ++,++ +` +, +: .+ .+ +; +; '+ '+ +` +` +` +. +: ,+ `+ ++ +; '+ ;+ + +. +` +. +: ,+ `+ +' '+ + +. +` +. +: ,+ `+ :+. '+ +++++. +` +. +: ,+ `+ ++ '+ +++++. +` +. +: ,+ `+ ++ '+ + +. +` +. +: ,+ `+ +: +: '+ ;+ + +. +` +. +: .+ .+ +; +; '+ '+ + +. +` +. +: ++;++ ++'++ ++'+' + +. +` +. +: +++ +++. ,++' + +. {"pid":11260,"hostname":"host1","name":"mosca","level":30,"time":1504448625943,"msg":"server started","mqtt":1883,"v":1}
ログが JSON で出力されていますが、pino ※ を使えば以下のようにログを整形して出力してくれます。
※ mosca 2.5.2 を npm install すると pino もインストールされました
実行例 - pino 利用
> mosca -v | pino +++.+++: ,+++ +++; '+++ +++. ++.+++.++ ++.++ ++,'+ `+',++ ++,++ +` +, +: .+ .+ +; +; '+ '+ +` +` +` +. +: ,+ `+ ++ +; '+ ;+ + +. +` +. +: ,+ `+ +' '+ + +. +` +. +: ,+ `+ :+. '+ +++++. +` +. +: ,+ `+ ++ '+ +++++. +` +. +: ,+ `+ ++ '+ + +. +` +. +: ,+ `+ +: +: '+ ;+ + +. +` +. +: .+ .+ +; +; '+ '+ + +. +` +. +: ++;++ ++'++ ++'+' + +. +` +. +: +++ +++. ,++' + +. [2017-09-03T14:24:23.929Z] INFO (mosca/3124 on host1): server started mqtt: 1883
サーバー組み込み実行
Mosca を組み込み実行するコードは以下の通りです。
実際は new mosca.Server()
だけでサーバーが起動するのですが、そのままだとクライアントからの接続状況が分かり難いのでログ出力しています。
mosca_run.js
const mosca = require('mosca') const server = new mosca.Server() server.on('ready', () => console.log('server started')) server.on('clientConnected', client => console.log(`client connected: ${client.id}`)) server.on('published', (packet) => console.log(`published: ${JSON.stringify(packet)}`))
クライアント処理
MQTT.js をインストールして MQTT のクライアント処理を実装してみます。
MQTT.js インストール例
> npm install mqtt
まずは、指定のトピックへメッセージを publish する処理です。
publish_sample.js
const mqtt = require('mqtt') const client = mqtt.connect('mqtt://localhost') const topic = process.argv[2] const msg = process.argv[3] client.on('connect', () => { client.publish(topic, msg) client.end() })
次に、指定のトピックを subscribe する処理です。
subscribe_sample.js
const mqtt = require('mqtt') const client = mqtt.connect('mqtt://localhost') const topic = process.argv[2] client.on('connect', () => { client.subscribe(topic) }) client.on('message', (topic, msg) => { console.log(`topic: ${topic}, msg: ${msg}`) })
動作確認
まずは Mosca サーバーを実行しておきます。(mosca コマンドで実行しても可)
サーバー組み込み実行
> node mosca_run.js server started
次に data トピックを subscribe してみます。
subscribe 実行
> node subscribe_sample.js data
data トピックへ sample1 ~ 3 という文字列を publish してみます。
publish 実行
> node publish_sample.js data sample1 > node publish_sample.js data sample2 > node publish_sample.js data sample3
subscribe 側にメッセージが出力されました。
subscribe の結果
topic: data, msg: sample1 topic: data, msg: sample2 topic: data, msg: sample3
Moquette
Moquette は Java 用の MQTT Broker です。
distribution-0.10-bundle-tar.tar.gz をダウンロード・解凍した後、bin/moquette.bat や bin/moquette.sh で実行できるようですが、moquette.bat の内容に問題があって、そのままでは Java 8 で実行できませんでした。(## の行を削除するか rem を付けて、JAVA_OPTS の設定を削る等が必要でした)
サーバー組み込み実行
組み込み実行は io.moquette.server.Server
の main
メソッドを呼び出すだけです。
ただし、このままでは IllegalArgumentException: Can't locate file "null"
となってしまうので config/moquette.conf
ファイルを作成しておきます。(デフォルト設定を使うのなら中身は空でよい)
moquette_run.groovy
@GrabResolver(name = 'bintray', root = 'https://jcenter.bintray.com') @Grab('io.moquette:moquette-broker:0.10') @Grab('org.slf4j:slf4j-simple:1.7.25') import io.moquette.server.Server Server.main(args)
動作確認
Mosca と同様に動作確認を行ってみます。
サーバー組み込み実行
> groovy moquette_run.groovy ・・・ [main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883 [main] INFO io.moquette.server.netty.NettyAcceptor - Configuring Websocket MQTT transport [main] INFO io.moquette.server.netty.NettyAcceptor - Property websocket_port has been setted to disabled. Websocket MQTT will be disabled [main] INFO io.moquette.server.Server - Moquette server has been initialized successfully Server started, version 0.10
data トピックを subscribe してみます。
subscribe 実行
> node subscribe_sample.js data
data トピックへ sample1 ~ 3 という文字列を publish してみます。
publish 実行
> node publish_sample.js data sample1 > node publish_sample.js data sample2 > node publish_sample.js data sample3
subscribe 側にメッセージが出力されました。
subscribe の結果
topic: data, msg: sample1 topic: data, msg: sample2 topic: data, msg: sample3