MySQL Binary Log connector でバイナリログをイベント処理
MySQL Binary Log connector (mysql-binlog-connector-java) を使うと、Java プログラムで MySQL / MariaDB のバイナリログをイベント処理できます。
そのため、MySQL の CDC(Change Data Capture)として使えるかもしれません。
ソースは http://github.com/fits/try_samples/tree/master/blog/20171030/
Groovy で実装
MySQL へ接続してバイナリログの内容を取得するには BinaryLogClient
を使います。
registerEventListener
メソッドで EventListener
実装オブジェクトを登録しておくと、バイナリログの内容をデシリアライズした Event
オブジェクトを引数として onEvent
メソッドを呼び出してくれます。
BinaryLogClient のソース(listenForEventPackets() メソッドなど)を見てみると、バイナリログを順番にデシリアライズして(特定のイベントタイプをスキップするような処理は無さそう)、登録している EventListener の onEvent を順次呼び出しているだけのようなので、場合によっては処理性能に注意が必要かもしれません。
binlog_sample.groovy
@Grab('com.github.shyiko:mysql-binlog-connector-java:0.13.0') import com.github.shyiko.mysql.binlog.BinaryLogClient def host = args[0] def port = args[1] as int def user = args[2] def pass = args[3] def client = new BinaryLogClient(host, port, user, pass) client.registerEventListener { ev -> // バイナリログの内容を処理 println ev } client.connect()
動作確認
動作確認は Docker で行ってみます。
準備
BinaryLogClient で接続するために MySQL 側でレプリケーション用の設定を行います。
まずは、レプリケーションの設定ファイルを用意します。
/home/vagrant/mysql/conf/repl.cnf (レプリケーションの設定)
[mysqld] log-bin=mysql-bin server-id=1
次に、レプリケーション用の接続ユーザーを追加するための SQL ファイルも用意しておきます。
/home/vagrant/mysql/init/repl-user.sql (レプリケーション用のユーザー)
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO repl@'%' IDENTIFIED BY 'pass';
今回はコンテナ間の接続(DB への接続)に Docker のユーザー定義ネットワークを使います。
そのため、まずはブリッジネットワーク(sample1)を作成しておきます。
Docker ユーザー定義ネットワークの作成
$ docker network create --driver bridge sample1
/home/vagrant/groovy/binlog_sample.groovy ファイルを用意した後、sample1 のネットワークへ参加するように Groovy のコンテナを実行します。
Groovy コンテナ実行
$ docker run --rm -it --net=sample1 -v /home/vagrant/groovy:/work groovy bash ・・・ groovy@・・・:~$ cd /work
a. MySQL 5.7 の場合
それでは、MySQL のコンテナを実行して動作確認を行います。
- MySQL 5.7.20
MySQL の Docker 公式イメージでは、/etc/mysql/conf.d 内の設定ファイルを適用し、/docker-entrypoint-initdb.d 内の SQL ファイルを実行するようになっています。
今回はこれを使って、先ほど用意しておいたレプリケーションの設定ファイルとユーザー作成 SQL を適用するように実行します。
a-1. MySQL コンテナ実行
$ docker run --name mysql1 --net=sample1 -e MYSQL_ROOT_PASSWORD=secret -d -v /home/vagrant/mysql/conf:/etc/mysql/conf.d -v /home/vagrant/mysql/init:/docker-entrypoint-initdb.d mysql
事前に実行しておいた Groovy コンテナ上で binlog_sample.groovy を実行します。 ユーザー名とパスワードはレプリケーション用のものを使用します。
a-2. binlog_sample.groovy 実行(Groovy コンテナ)
groovy@・・・:/work$ groovy binlog_sample.groovy mysql1 3306 repl pass Oct 22, 2017 4:46:02 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql1:3306 at mysql-bin.000003/154 (sid:65535, cid:3) Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000003', binlogPosition=154}} ・・・
この状態で以下の SQL を実行してみます。
CREATE DATABASE db1; USE db1; CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL); INSERT INTO tbl1 VALUES (1, 'a'); UPDATE tbl1 SET name = 'aa' WHERE id = 1; DELETE FROM tbl1 WHERE id = 1; CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL); START TRANSACTION; INSERT INTO tbl1 VALUES (1, 'a'); INSERT INTO tbl2 VALUES (2, 'b'); COMMIT;
上記 SQL 実行後の出力結果です。
a-3. binlog_sample.groovy 出力結果
・・・ Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=219, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=72, nextPosition=310, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='CREATE DATABASE db1'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=375, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=127, nextPosition=521, flags=0}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=586, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=657, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=706, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=748, flags=0}, data=WriteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[ [1, a] ]}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=779, flags=0}, data=XidEventData{xid=14}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=844, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=915, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=964, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=32, nextPosition=1015, flags=0}, data=UpdateRowsEventData{tableId=219, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[ {before=[1, a], after=[1, aa]} ]}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1046, flags=0}, data=XidEventData{xid=15}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1111, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=1182, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1231, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=24, nextPosition=1274, flags=0}, data=DeleteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[ [1, aa] ]}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1305, flags=0}, data=XidEventData{xid=16}} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1370, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=127, nextPosition=1516, flags=0}, data=QueryEventData{threadId=4, executionTime=1, errorCode=0, database='db1', sql='CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}} Event{header=EventHeaderV4{timestamp=1508690819000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1581, flags=0}, data=null} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=1652, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1701, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1743, flags=0}, data=WriteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[ [1, a] ]}} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1792, flags=0}, data=TableMapEventData{tableId=220, database='db1', table='tbl2', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}} Event{header=EventHeaderV4{timestamp=1508690816000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1834, flags=0}, data=WriteRowsEventData{tableId=220, includedColumns={0, 1}, rows=[ [2, b] ]}} Event{header=EventHeaderV4{timestamp=1508690819000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1865, flags=0}, data=XidEventData{xid=19}}
eventType を簡単にまとめると以下のようになりました。
SQL | eventType |
---|---|
CREATE | QUERY |
INSERT | EXT_WRITE_ROWS |
UPDATE | EXT_UPDATE_ROWS |
DELETE | EXT_DELETE_ROWS |
注意点として、EXT_XXX_ROWS の Event 内容にテーブル名は含まれておらず tableId
で判断する必要がありそうです。
tableId とテーブル名のマッピングは直前の TABLE_MAP
で実施されています。
また、バイナリログのフォーマット(以下)は RBR(行ベースレプリケーション) となっていました。
mysql> show global variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+
b. MariaDB 10.2 の場合
ついでに、MariaDB でも試してみます。
- MariaDB 10.2.9
設定は MySQL と同じものが使えるので、ここでは Docker イメージ名を mariadb に変えて実行するだけです。(以下ではコンテナ名も変えています)
b-1. MariaDB コンテナ実行
$ docker run --name mariadb1 --net=sample1 -e MYSQL_ROOT_PASSWORD=secret -d -v /home/vagrant/mysql/conf:/etc/mysql/conf.d -v /home/vagrant/mysql/init:/docker-entrypoint-initdb.d mariadb
接続先を mariadb1 (MariaDB) へ変えてスクリプトを実行します。
b-2. binlog_sample.groovy 実行(Groovy コンテナ)
groovy@・・・:/work$ groovy binlog_sample.groovy mariadb1 3306 repl pass ・・・
b-3. binlog_sample.groovy 出力結果
・・・ Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=384, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=66, nextPosition=469, flags=8}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='CREATE DATABASE db1'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=511, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=121, nextPosition=651, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=693, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=791, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl1 VALUES (1, 'a')'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=822, flags=0}, data=XidEventData{xid=12}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=864, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=87, nextPosition=970, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='UPDATE tbl1 SET name = 'aa' WHERE id = 1'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1001, flags=0}, data=XidEventData{xid=13}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1043, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=76, nextPosition=1138, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='DELETE FROM tbl1 WHERE id = 1'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1169, flags=0}, data=XidEventData{xid=14}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1211, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}} Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=121, nextPosition=1351, flags=0}, data=QueryEventData{threadId=10, executionTime=1, errorCode=0, database='db1', sql='CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}} Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1393, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}} Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=1491, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl1 VALUES (1, 'a')'}} Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=1589, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl2 VALUES (2, 'b')'}} Event{header=EventHeaderV4{timestamp=1508691101000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1620, flags=0}, data=XidEventData{xid=17}}
eventType を簡単にまとめると以下のようになりました。
SQL | eventType |
---|---|
CREATE | QUERY |
INSERT | QUERY |
UPDATE | QUERY |
DELETE | QUERY |
バイナリログのフォーマットは MBR(ミックスベースレプリケーション)となっていました。
MariaDB [(none)]> show global variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | MIXED | +---------------+-------+
Akka でステートマシンを処理
前回 の有限ステートマシン(FSM)の処理を Akka の Java 用 API を使って実装してみます。
ソースは http://github.com/fits/try_samples/tree/master/blog/20171016/
ステートマシンの実装
まずは、以下のステートマシンを実装します。
- 初期状態は Idle 状態
- Idle 状態で On イベントが発生すると Active 状態へ遷移
- Active 状態で Off イベントが発生すると Idle 状態へ遷移
現在の状態 | Off | On |
---|---|---|
Idle | Active | |
Active | Idle |
AbstractFSM<状態の型, データの型>
を extends する事でステートマシンを実装します。
状態以外にも状態のデータを指定できるので、Active 状態へ変化した回数をカウントするようにしてみました。
指定の状態で特定のイベントを処理するには when(現在の状態, FSMStateFunctionBuilder)
を使い、イベントに対する処理は matchEventEquals()
等を使います。
goTo(遷移先の状態)
で状態を遷移させ、その際に using(データ)
で状態遷移後のデータを指定できます。
処理対象外のイベントを受け取った場合の処理は whenUnhandled(FSMStateFunctionBuilder)
で指定でき、状態遷移の状況確認に onTransition
が使えます。
sample.groovy
@Grab('com.typesafe.akka:akka-actor_2.12:2.5.6') import akka.actor.AbstractFSM import akka.actor.ActorSystem import akka.actor.ActorRef import akka.actor.Props enum States { Idle, Active } enum Events { On, Off } class SampleStateMachine extends AbstractFSM<States, Integer> { { // 初期状態の設定 startWith(States.Idle, 0) // On イベントで Idle から Active 状態へ遷移 when(States.Idle, matchEventEquals(Events.On) { event, data -> // Active へ遷移 goTo(States.Active).using(data + 1) }) // Off イベントで Active から Idle 状態へ遷移 when(States.Active, matchEventEquals(Events.Off) { event, data -> // Idle へ遷移 goTo(States.Idle) }) // 処理されないイベント発生時 whenUnhandled( matchAnyEvent { event, data -> println "*** Unhandled event=${event}, data=${data}" stay() } ) // 状態遷移の発生時 onTransition { from, to -> println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}" } } } def system = ActorSystem.create() def actor = system.actorOf(Props.create(SampleStateMachine)) actor.tell(Events.On, ActorRef.noSender()) actor.tell(Events.Off, ActorRef.noSender()) actor.tell(Events.Off, ActorRef.noSender()) sleep 2000 system.terminate()
実行結果は以下の通りです。
実行結果
> groovy sample.groovy *** stateChanged: Idle -> Active, data=0, next data=1 *** stateChanged: Active -> Idle, data=1, next data=1 *** Unhandled event=Off, data=1
タイムアウト付きステートマシンの実装1
次は、タイムアウト時の遷移を追加してみます。
現在の状態 | Off | On | Timeout (2秒) |
---|---|---|---|
Idle | Active | ||
Active | Idle | Idle |
when で scala.concurrent.duration.FiniteDuration
を指定すると状態のタイムアウト ※ を指定できます。
※ ただし、このタイムアウトは その状態で何もメッセージを受信しなかった時間に対する タイムアウトのようです 状態の遷移が発生しなくても メッセージを受信する度にタイムアウトはリセットされます
タイムアウト発生時のイベント(メッセージ)は StateTimeout()
の戻り値と等しくなります。
timeout_sample1.groovy
・・・ import scala.concurrent.duration.Duration import java.util.concurrent.TimeUnit enum States { Idle, Active } enum Events { On, Off } class SampleStateMachine extends AbstractFSM<States, Integer> { { startWith(States.Idle, 0) when(States.Idle, matchEventEquals(Events.On) { event, data -> goTo(States.Active).using(data + 1) }) when(States.Active, Duration.create(2, TimeUnit.SECONDS), matchEventEquals(Events.Off) { event, data -> goTo(States.Idle) }.eventEquals(StateTimeout()) { event, data -> // タイムアウト時の処理 println "*** timeout: event=${event}, data=${data}" goTo(States.Idle) /* 以下でも可 self().tell(Events.Off, self()) stay() */ } ) ・・・ } } def system = ActorSystem.create() def actor = system.actorOf(Props.create(SampleStateMachine)) actor.tell(Events.On, ActorRef.noSender()) actor.tell(Events.Off, ActorRef.noSender()) actor.tell(Events.Off, ActorRef.noSender()) actor.tell(Events.On, ActorRef.noSender()) sleep 2500 system.terminate()
実行結果は以下の通りです。
実行結果
> groovy timeout_sample1.groovy *** stateChanged: Idle -> Active, data=0, next data=1 *** stateChanged: Active -> Idle, data=1, next data=1 *** Unhandled event=Off, data=1 *** stateChanged: Idle -> Active, data=1, next data=2 *** timeout: event=StateTimeout, data=2 *** stateChanged: Active -> Idle, data=2, next data=2
when で指定したタイムアウトはイベント(メッセージ)を受信するとリセットされる事を確認するため、以下のようにタイムアウト発生前に invalid-message という文字列を 2回 tell するようにしてみます。
timeout_sample1b.groovy (タイムアウトの検証)
・・・ def system = ActorSystem.create() def actor = system.actorOf(Props.create(SampleStateMachine)) ・・・ actor.tell(Events.On, ActorRef.noSender()) sleep 1500 // 1回目 actor.tell('invalid-message', ActorRef.noSender()) sleep 1500 // 2回目 actor.tell('invalid-message', ActorRef.noSender()) sleep 2500 system.terminate()
実行結果は、以下のように invalid-message という文字列を 2回受信した後にタイムアウトしました。
つまり、メッセージの受信でタイムアウトがリセットされていると考えられます。
実行結果(タイムアウトの検証)
> groovy timeout_sample1b.groovy ・・・ *** stateChanged: Idle -> Active, data=1, next data=2 *** Unhandled event=invalid-message, data=2 *** Unhandled event=invalid-message, data=2 *** timeout: event=StateTimeout, data=2 *** stateChanged: Active -> Idle, data=2, next data=2
タイムアウト付きステートマシンの実装2 (状態のタイムアウト)
メッセージの受信有無に左右されないタイムアウトを実現するには、when のタイムアウトを使わずに setTimer()
を使う事で実装できそうです。
timeout_sample2.groovy
・・・ class SampleStateMachine extends AbstractFSM<States, Integer> { { ・・・ when(States.Active, matchEventEquals(Events.Off) { event, data -> goTo(States.Idle) }.eventEquals(StateTimeout()) { event, data -> println "*** timeout: event=${event}, data=${data}" goTo(States.Idle) /* 以下でも可 self().tell(Events.Off, self()) stay() */ } ) ・・・ onTransition { from, to -> println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}" if (to == States.Active) { // Active 状態のタイムアウト設定 setTimer( 'active-timeout', StateTimeout(), Duration.create(2, TimeUnit.SECONDS) ) } else { // タイムアウトのキャンセル cancelTimer('active-timeout') } } } } def system = ActorSystem.create() def actor = system.actorOf(Props.create(SampleStateMachine)) ・・・ actor.tell(Events.On, ActorRef.noSender()) sleep 1500 actor.tell("invalid-message", ActorRef.noSender()) sleep 1500 actor.tell("invalid-message", ActorRef.noSender()) sleep 2500 system.terminate()
実行してみると、2回目の invalid-message を受信する前にタイムアウトが発生しており意図通りの動作となりました。
実行結果
> groovy timeout_sample2.groovy ・・・ *** stateChanged: Idle -> Active, data=1, next data=2 *** Unhandled event=invalid-message, data=2 *** timeout: event=StateTimeout, data=2 *** stateChanged: Active -> Idle, data=2, next data=2 *** Unhandled event=invalid-message, data=2
Akka Streams で MQTT Broker へ接続
ローカルで実行した MQTT Broker(前回 参照)に対して Akka Streams の Java 用 API を使って Groovy で接続してみます。
Akka Streams 用の様々なコネクタを備えた Alpakka に MQTT Broker 用の Source や Sink が用意されているので、今回はこちらを使います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170925/
Publish 処理
まずは publish 処理です。
MqttConnectionSettings
から MqttSink
を作成し、MqttMessage
を渡せば MQTT Broker へメッセージを送信できます。
MqttConnectionSettings は MQTT Broker の接続先と clientId
、そして永続化の方法を指定して作成します。
clientId はクライアント毎に一意な値を指定します。 (clientId を null
にすると IllegalArgumentException: Null clientId
となりました)
MqttQoS ではメッセージ到達の QoS を指定します。
MqttQoS.atLeastOnce()
は少なくとも 1回(重複の可能性あり)の到達可能性を指定する事になります。
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4') @Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11') import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.javadsl.Source import akka.stream.alpakka.mqtt.MqttQoS import akka.stream.alpakka.mqtt.MqttConnectionSettings import akka.stream.alpakka.mqtt.MqttMessage import akka.stream.alpakka.mqtt.javadsl.MqttSink import akka.util.ByteString def topic = args[0] def clientId = args[1] def message = args[2] def system = ActorSystem.create() def mat = ActorMaterializer.create(system) def settings = MqttConnectionSettings.create( 'tcp://localhost:1883', clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence() ) def msg = MqttMessage.create(topic, ByteString.fromString(message)) Source.single(msg) .runWith(MqttSink.create(settings, MqttQoS.atLeastOnce()), mat) // メッセージ送信前に terminate しないようスリープで調整 sleep 1000 system.terminate()
Subscribe 処理
次に subscribe 処理です。
MqttConnectionSettings から MqttSourceSettings
を作り、MqttSource
を作成します。
MqttSourceSettings の withSubscriptions
で subscribe するトピック名と QoS を指定します。
mqtt_subscribe.groovy
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4') @Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11') import akka.actor.ActorSystem import akka.japi.Pair import akka.stream.ActorMaterializer import akka.stream.javadsl.Sink import akka.stream.alpakka.mqtt.MqttQoS import akka.stream.alpakka.mqtt.MqttSourceSettings import akka.stream.alpakka.mqtt.MqttConnectionSettings import akka.stream.alpakka.mqtt.javadsl.MqttSource def topic = args[0] def clientId = args[1] def system = ActorSystem.create() def mat = ActorMaterializer.create(system) def settings = MqttSourceSettings.create( MqttConnectionSettings.create( 'tcp://localhost:1883', clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence() ) ).withSubscriptions( // トピックと QoS の指定 Pair.create(topic, MqttQoS.atLeastOnce()) ) MqttSource.create(settings, 10).runWith(Sink.foreach { println it }, mat) println "subscribe : ${topic}" System.in.read() system.terminate()
上記の処理では、再接続の際にセッションがクリアされてしまうので、通信が切断している間のメッセージを後で受け取るような事はできません。
再接続の際にセッションが復元される Persistent Session を適用するには、以下のように withCleanSession(false)
とする必要があります。
なお、Persistent Session を適用するには同じ clientId を使って再接続する必要があります。
mqtt_subscribe2.groovy (Persistent Session 版)
・・・ def settings = MqttSourceSettings.create( MqttConnectionSettings.create( 'tcp://localhost:1883', clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence() ).withCleanSession(false) // Persistent Session ).withSubscriptions( Pair.create(topic, MqttQoS.atLeastOnce()) ) ・・・
動作確認
前回 の Moquette 組み込み実行スクリプトで MQTT Broker を起動しておきます。
MQTT Broker 実行
> groovy moquette_run.groovy ・・・ [main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883 ・・・ Server started, version 0.10
Subscribe 処理を実行します。
動作確認のため Persistent Session 版も実行していますが、(Publish 処理も含め) clientId へ異なる値をそれぞれ指定します。(ここでは subscribe1・2 と publish1 としています)
また、トピック名は sample とします。
(a) Subscribe1 実行
> groovy mqtt_subscribe.groovy sample subscribe1 subscribe: sample
(b) Subscribe2(Persistent Session 版)実行
> groovy mqtt_subscribe2.groovy sample subscribe2 subscribe: sample
この状態で “a” “ab” “abc” という 3つのメッセージを publish してみます。
Publish 実行
> groovy mqtt_publish.groovy sample publish1 a > groovy mqtt_publish.groovy sample publish1 ab > groovy mqtt_publish.groovy sample publish1 abc
Subscribe の結果は以下のようになりました。
(a) Subscribe1 状況
> groovy mqtt_subscribe.groovy sample subscribe1 subscribe: sample MqttMessage(sample,ByteString(97)) MqttMessage(sample,ByteString(97, 98)) MqttMessage(sample,ByteString(97, 98, 99))
(b) Subscribe2(Persistent Session 版)状況
> groovy mqtt_subscribe2.groovy sample subscribe2 subscribe: sample MqttMessage(sample,ByteString(97)) MqttMessage(sample,ByteString(97, 98)) MqttMessage(sample,ByteString(97, 98, 99))
ここで (a) と (b) の処理を一度終了しておき、その状態で publish してみます。
Publish 実行
> groovy mqtt_publish.groovy sample publish1 abcd > groovy mqtt_publish.groovy sample publish1 abcde
(a) と (b) を再実行すると、以下のように Persistent Session 版の方は停止中に publish されたメッセージを取得できました。
(a) Subscribe1 再実行
> groovy mqtt_subscribe.groovy sample subscribe1 subscribe: sample
(b) Subscribe2(Persistent Session 版)再実行
> groovy mqtt_subscribe2.groovy sample subscribe2 subscribe: sample MqttMessage(sample,ByteString(97, 98, 99, 100)) MqttMessage(sample,ByteString(97, 98, 99, 100, 101))
JanusGraph でグラフ操作 - Groovy
TinkerPop の API と互換性があり Cassandra 等をストレージとして使用できる JanusGraph というグラフデータベースがあります。
今回は、「TinkerPop でグラフ操作 - Groovy」 のサンプルコードを JanusGraph 用に変更してみます。
ソースは http://github.com/fits/try_samples/tree/master/blog/20170814/
はじめに
今回はグラフデータの保存先として Cassandra を使うため、Cassandra を事前に実行しておきます。
a. 設定ファイル
TinkerPop の org.apache.tinkerpop.gremlin.structure.util.GraphFactory
を使用する場合、gremlin.graph
へ org.janusgraph.core.JanusGraphFactory
を設定し、JanusGraph 用の設定(storage.backend
等)を加えれば JanusGraph で使えます。
Cassandra を使う場合は storage.backend
へ cassandra
を設定します。
setting.properties
gremlin.graph=org.janusgraph.core.JanusGraphFactory storage.backend=cassandra storage.hostname=127.0.0.1
b. グラフデータ作成
JanusGraph は TinkerPop API と互換性があるため 前回 の処理内容を特に変える必要はなく、@Grapes
の依存ライブラリ構成を JanusGraph 用に変えるだけです。
add-data.groovy
// 前回との違いは @Grapes の内容のみ @Grapes([ @Grab('org.janusgraph:janusgraph-cassandra:0.1.1'), @Grab('org.slf4j:slf4j-simple:1.7.25'), @GrabExclude('xml-apis#xml-apis;1.3.04'), @GrabExclude('com.github.jeremyh#jBCrypt;jbcrypt-0.4'), @GrabExclude('org.slf4j#slf4j-log4j12'), @GrabExclude('ch.qos.logback#logback-classic'), @GrabExclude('org.codehaus.groovy:groovy-swing'), @GrabExclude('org.codehaus.groovy:groovy-xml'), @GrabExclude('org.codehaus.groovy:groovy-jsr223') ]) import org.apache.tinkerpop.gremlin.structure.util.GraphFactory def conf = args[0] def addNode = { g, type, id, name = id -> def res = g.addVertex(type) res.property('oid', id) res.property('name', name) res } def createData = { g -> def p = addNode(g, 'Principals', 'principals') def u1 = addNode(g, 'User', 'user1') def u2 = addNode(g, 'User', 'user2') def ad = addNode(g, 'User', 'admin') def g1 = addNode(g, 'Group', 'group1') [u1, u2, ad, g1].each { it.addEdge('PART_OF', p) } u2.addEdge('PART_OF', g1) def r = addNode(g, 'Resources', 'resources') def s1 = addNode(g, 'Service', 'service1') def s2 = addNode(g, 'Service', 'service2') def s2o1 = addNode(g, 'Operation', 'service2.get', 'get') def s2o2 = addNode(g, 'Operation', 'service2.post', 'post') [s2o1, s2o2].each { s2.addEdge('METHOD', it) } [s1, s2].each { r.addEdge('RESOURCE', it) } u1.addEdge('PERMIT', s1) g1.addEdge('PERMIT', s2o2) ad.addEdge('PERMIT', r) } GraphFactory.open(conf).withAutoCloseable { g -> g.tx().withAutoCloseable { tx -> createData(g) tx.commit() } }
実行結果
> groovy add-data.groovy setting.properties ・・・ [main] INFO com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor - AddHost: 127.0.0.1 [main] INFO org.janusgraph.diskstorage.Backend - Initiated backend operations thread pool of size 8 [main] INFO org.janusgraph.diskstorage.log.kcvs.KCVSLog - Loaded unidentified ReadMarker start time 2017-08-12T14:17:38.078Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@3bdb2c78
c. 経路の探索
こちらも @Grapes の内容を変えるだけです。(確認のためログ出力するようにしています)
find-data.groovy
// 前回との違いは @Grapes の内容のみ @Grapes([ @Grab('org.janusgraph:janusgraph-cassandra:0.1.1'), //@Grab('org.slf4j:slf4j-nop:1.7.25'), @Grab('org.slf4j:slf4j-simple:1.7.25'), @GrabExclude('xml-apis#xml-apis;1.3.04'), @GrabExclude('com.github.jeremyh#jBCrypt;jbcrypt-0.4'), @GrabExclude('org.slf4j#slf4j-log4j12'), @GrabExclude('ch.qos.logback#logback-classic'), @GrabExclude('org.codehaus.groovy:groovy-swing'), @GrabExclude('org.codehaus.groovy:groovy-xml'), @GrabExclude('org.codehaus.groovy:groovy-jsr223') ]) import org.apache.tinkerpop.gremlin.structure.util.GraphFactory import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__ def conf = args[0] def start = args[1] def end = args[2] def toStr = { "${it.label()}[${it.id()}]{${it.properties().join(', ')}}" } GraphFactory.open(conf).withAutoCloseable { g -> g.tx().withAutoCloseable { def p = g.traversal().V() .has('oid', start) .repeat(__.outE().as('e').inV()) .until(__.has('oid', end)) .where(__.select('e').unfold().hasLabel('PERMIT')) .path() p.each { println it.objects().collect(toStr).join(' -> ') } } }
実行結果
> groovy find-data.groovy setting.properties user2 service2.post ・・・ [main] WARN org.janusgraph.graphdb.transaction.StandardJanusGraphTx - Query requires iterating over all vertices [(oid = user2)]. For better performance, use indexes ・・・ User[4304]{vp[oid->user2], vp[name->user2]} -> PART_OF[4d6-3bk-4r9-3ao]{} -> Group[4272]{vp[oid->group1], vp[name->group1]} -> PERMIT[6c6-3ao-9hx-37c]{} -> Operation[4152]{vp[oid->service2.post], vp[name->post]}
インデックスを使った方が性能的に望ましいとの警告ログが出力されました。
d. インデックスの作成
警告ログを解消するためにインデックスを使ってみます。
TinkerPop で(汎用的な)インデックス作成 API を見つけられなかったので、インデックス作成に関しては JanusGraph の専用 API を使う必要がありそうです。
JanusGraph のインデックス機能には以下の 2通りがあり、指定のプロパティ値を持つノードやエッジを見つける用途には (a) を使えば良さそうです。
- (a) Graph Indexes
- (b) Vertex-centric Indexes
そして、(a) では以下のようなインデックスを使えます。
名称 | 概要 | 生成メソッド名 |
---|---|---|
Composite Index | 外部のインデックスエンジンを使わずに高速 | buildCompositeIndex |
Mixed Index | 外部のインデックスエンジン(Elasticsearch 等)を使って数値の範囲検索や全文検索などが可能 | buildMixedIndex |
今回は oid プロパティに対して (a) の Composite Index を作ってみます。
(a) Graph Indexes は JanusGraphManagement
の buildIndex
メソッドで作成します。
その戻り値 IndexBuilder
の buildCompositeIndex
メソッドで Composite Index となります。 (ユニークインデックスとする場合は unique
メソッドを呼び出します)
ここで、インデックスを作成しただけでは、既存データのインデックス化は行われないようなので、インデックス作成後に既存データのインデックス化も行うようにしています。
create_index.groovy
@Grapes([ @Grab('org.janusgraph:janusgraph-cassandra:0.1.1'), @Grab('org.slf4j:slf4j-simple:1.7.25'), @GrabExclude('xml-apis#xml-apis;1.3.04'), @GrabExclude('com.github.jeremyh#jBCrypt;jbcrypt-0.4'), @GrabExclude('org.slf4j#slf4j-log4j12'), @GrabExclude('ch.qos.logback#logback-classic'), @GrabExclude('org.codehaus.groovy:groovy-swing'), @GrabExclude('org.codehaus.groovy:groovy-xml'), @GrabExclude('org.codehaus.groovy:groovy-jsr223') ]) import org.janusgraph.core.JanusGraphFactory import org.janusgraph.core.schema.SchemaAction import org.apache.tinkerpop.gremlin.structure.Vertex def conf = args[0] def graph = JanusGraphFactory.open(conf) graph.withAutoCloseable { g -> // JanusGraphManagement の取得 def manage = g.openManagement() def oidKey = manage.getPropertyKey('oid') // インデックスが未作成の場合にインデックスを作成 if (manage.getGraphIndex('oidIndex') == null) { // oid を対象とした Composite Index の作成 manage.buildIndex('oidIndex', Vertex) .addKey(oidKey) .buildCompositeIndex() manage.commit() // インデックス作成の完了待ち manage.awaitGraphIndexStatus(g, 'oidIndex').call() } manage = g.openManagement() // 既存データのインデックス化 manage.updateIndex(manage.getGraphIndex('oidIndex'), SchemaAction.REINDEX).get() manage.commit() }
インデックス作成結果
> groovy create_index.groovy setting.properties ・・・ [pool-19-thread-1] INFO org.janusgraph.graphdb.database.management.ManagementSystem$UpdateStatusTrigger - Set status REGISTERED on schema element oidIndex with property keys [] [pool-19-thread-1] INFO org.janusgraph.graphdb.database.management.ManagementLogger - Received all acknowledgements for eviction [1] ・・・ [Thread-3] INFO com.netflix.astyanax.thrift.ThriftKeyspaceImpl - Detected partitioner org.apache.cassandra.dht.Murmur3Partitioner for keyspace janusgraph [Thread-7] INFO org.janusgraph.graphdb.olap.job.IndexRepairJob - Found index oidIndex [Thread-3] INFO org.janusgraph.graphdb.database.management.ManagementSystem - Index update job successful for [oidIndex]
これで警告ログは出力されなくなりました。
経路探索の実行結果
> groovy find-data.groovy setting.properties user2 service2.post ・・・ User[4304]{vp[oid->user2], vp[name->user2]} -> PART_OF[4d6-3bk-4r9-3ao]{} -> Group[4272]{vp[oid->group1], vp[name->group1]} -> PERMIT[6c6-3ao-9hx-37c]{} -> Operation[4152]{vp[oid->service2.post], vp[name->post]}
TinkerPop でグラフ操作 - Groovy
前回、Neo4j の Cypher を使って実施したグラフ操作を Apache TinkerPop を使って Groovy (@Grab
を使用)で実装してみました。
- Groovy 2.5.0 beta1
- Apache TinkerPop 3.2
Apache TinkerPop はグラフ処理のためのフレームワークで、Neo4j 等の様々なグラフ DB ※ に対して共通のインターフェースを提供します。
※ グラフ DB だけではなく、 Cassandra、HBase、DynamoDB 等をサポートする ライブラリも提供されています
ソースは http://github.com/fits/try_samples/tree/master/blog/20170718/
a. 設定ファイル
TinkerPop にはインメモリーの TinkerGraph が用意されていますが、前回と同様に Neo4j を使う事にします。
ただ、前回と違って Neo4j をサーバー起動せずに組み込み実行します。
TinkerPop には Graph
オブジェクトを汎用的に生成する手段として org.apache.tinkerpop.gremlin.structure.util.GraphFactory
が用意されています。
GraphFactory.open(<設定ファイル>)
を使えば、依存ライブラリと設定ファイルを差し替えて DB を切り替える事もできそうなので、今回はこの方法を使います。
Neo4j を組み込み利用する場合、gremlin.graph
に org.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph
を設定して gremlin.neo4j.directory
に DB ファイルを出力するディレクトリを指定します。 (Neo4j をサーバー起動した場合の data/databases/graph.db)
setting.properties
gremlin.graph=org.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph gremlin.neo4j.directory=neo4jdb
b. グラフデータ作成
前回 と同様のデータを作成する処理を実装してみます。
Groovy で実行する際の注意点として、neo4j-tinkerpop-api-impl 等は依存ライブラリとして Groovy 2.4.11 のライブラリを含んでおり、このバージョン以外の groovy コマンドで実行すると org.codehaus.groovy.control.MultipleCompilationErrorsException
が発生してしまいます。
そこで今回は、Groovy 2.5.0 beta1 で実行できるように @GrabExclude
を使って groovy-xml
等を除くようにしています。
グラフの操作は GraphFactory.open()
で取得した Graph
オブジェクトに対して実施します。
ノードの追加は addVertex
、エッジの追加は addEdge
メソッドで行う事ができ、property
メソッドで任意の属性を設定できます。
トランザクションは tx
メソッドで開始します。※
※ TinkerGraph のように tx メソッドをサポートしていないものもありますので (その場合に tx() を呼び出すとエラーになる) 実際は tx のサポート有無をチェックしてから 呼び出すようにした方が安全だと思います (例) if (g.features().graph().supportsTransactions()) { g.tx().withAutoCloseable { t -> ・・・ } }
add-data.groovy
@Grapes([ @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'), @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'), @Grab('org.slf4j:slf4j-nop:1.7.25'), @GrabExclude('org.codehaus.groovy:groovy-xml'), @GrabExclude('org.codehaus.groovy:groovy-swing'), @GrabExclude('org.codehaus.groovy:groovy-jsr223') ]) import org.apache.tinkerpop.gremlin.structure.util.GraphFactory def conf = args[0] def addNode = { g, type, id, name = id -> def res = g.addVertex(type) res.property('oid', id) res.property('name', name) res } def createData = { g -> def p = addNode(g, 'Principals', 'principals') def u1 = addNode(g, 'User', 'user1') def u2 = addNode(g, 'User', 'user2') def ad = addNode(g, 'User', 'admin') def g1 = addNode(g, 'Group', 'group1') [u1, u2, ad, g1].each { it.addEdge('PART_OF', p) } u2.addEdge('PART_OF', g1) def r = addNode(g, 'Resources', 'resources') def s1 = addNode(g, 'Service', 'service1') def s2 = addNode(g, 'Service', 'service2') def s2o1 = addNode(g, 'Operation', 'service2.get', 'get') def s2o2 = addNode(g, 'Operation', 'service2.post', 'post') [s2o1, s2o2].each { s2.addEdge('METHOD', it) } [s1, s2].each { r.addEdge('RESOURCE', it) } u1.addEdge('PERMIT', s1) g1.addEdge('PERMIT', s2o2) ad.addEdge('PERMIT', r) } GraphFactory.open(conf).withAutoCloseable { g -> g.tx().withAutoCloseable { tx -> createData(g) tx.commit() } }
withAutoCloseable
は AutoCloseable
のリソースをクローズするための Groovy の機能です(TinkerPop の API ではありません)
実行結果
> groovy add-data.groovy setting.properties
c. 経路の探索
前回 の経路探索の処理を TinkerPop の API で実装してみます。
traversal().V()
でノードを対象とした GraphTraversal<Vertex,Vertex>
を取得でき、has(<プロパティ名>, <プロパティ値>)
等を使ってノードの条件を指定できます。
(c-1) 複数エッジ(条件なし)
まず、エッジは気にせずに指定ノードから指定ノードまでのパス(経路)を取得する処理を実装してみます。
終点のノードまで複数のノードで繋がっている場合は repeat(・・・).until(<終点ノードの条件>)
で探せます。
エッジの条件を指定しない場合は repeat(__.out()).until(・・・)
で取得できます。(__
はクラス名です)
パスを取得するには path()
を使います。
Path
からは objects()
でパスに含まれるノード(やエッジ)を List<Object>
で取得できます。
なお、以下の処理でトランザクションは必要ありませんが、一応トランザクションを使っています。 (JanusGraph をマルチスレッドで使うケースではトランザクションが必要になったので)
find-data-simple.groovy
@Grapes([ @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'), @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'), @Grab('org.slf4j:slf4j-nop:1.7.25'), @GrabExclude('org.codehaus.groovy:groovy-xml'), @GrabExclude('org.codehaus.groovy:groovy-swing'), @GrabExclude('org.codehaus.groovy:groovy-jsr223') ]) import org.apache.tinkerpop.gremlin.structure.util.GraphFactory import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__ def conf = args[0] def start = args[1] def end = args[2] def toStr = { "${it.label()}[${it.id()}]{${it.properties().join(', ')}}" } GraphFactory.open(conf).withAutoCloseable { g -> g.tx().withAutoCloseable { def p = g.traversal().V() .has('oid', start) // 始点ノードの条件 .repeat(__.out()) .until(__.has('oid', end)) // 終点ノードの条件 .path() p.each { println it.objects().collect(toStr).join(' -> ') } } }
実行結果
user2 から service2.post へのパスを取得してみます。
> groovy find-data-simple.groovy setting.properties user2 service2.post User[2]{vp[oid->user2], vp[name->user2]} -> Group[4]{vp[oid->group1], vp[name->group1]} -> Operation[9]{vp[oid->service2.post], vp[name->post]}
__.out()
を使った事で、パスにはノードの情報だけが含まれエッジの情報を含んでいません。
(c-2) 複数エッジ(条件あり)
エッジの条件を指定するには repeat
でエッジも指定します。
A ノードと B ノードが C エッジで繋がっている (A)-[C]->(B)
のような状態で、A の __.outE()
が C エッジで、C エッジの inV()
が B ノードとなります。
そのため、repeat(__.outE().as('e').inV())
とすれば、複数の外向きエッジで繋がっているノードを検索する事ができます。
ここで as(<ステップラベル名>)
を使って、該当するエッジにラベル名を付けておき、where
での条件判定(PERMIT エッジを含むかどうか)に使います。
__.select('e')
の結果は GraphTraversal<Vertex, ArrayList<Edge>>
のようになるので __.select('e').hasLabel(・・・)
とはできません。 (ClassCastException になります)
unfold()
で GraphTraversal<Vertex, ArrayListGraphTraversal<Vertex, Edge>
にして hasLabel
を使えば、該当のラベルを持つエッジを含んでいるかどうかを条件判定できます。
find-data.groovy
・・・ GraphFactory.open(conf).withAutoCloseable { g -> g.tx().withAutoCloseable { def p = g.traversal().V() .has('oid', start) // 始点ノードの条件 .repeat(__.outE().as('e').inV()) .until(__.has('oid', end)) // 終点ノードの条件 .where(__.select('e').unfold().hasLabel('PERMIT')) // PERMIT エッジを含んでいるかどうかの判定 .path() p.each { println it.objects().collect(toStr).join(' -> ') } } }
実行結果
> groovy find-data.groovy setting.properties user2 service2.post User[2]{vp[oid->user2], vp[name->user2]} -> PART_OF[4]{} -> Group[4]{vp[oid->group1], vp[name->group1]} -> PERMIT[10]{} -> Operation[9]{vp[oid->service2.post], vp[name->post]}
__.outE()
を使った事でエッジの情報もパスに含まれるようになりました。
Groovy で Kafka を組み込み実行
Groovy で Apache Kafka を組み込み実行してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170410/
Kafka 組み込み実行
Kafka の実行には ZooKeeper が必要なため、ZooKeeper と Kafka の両方を組み込み実行します。
ZooKeeperServerMain を実行 (initializeAndRun
) すると処理をブロックしてしまい、後続の処理を実行できないので、今回は別スレッドで実行するようにしました。
initializeAndRun メソッドは以下のように配列の要素数によって引数の解釈が異なるようです。
引数の数 | 引数の内容 |
---|---|
1つ | 設定ファイルのパス |
2つ以上 | ポート番号, データディレクトリ, tickTime, maxClientCnxns |
kafka_embed.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0') @Grab('org.apache.zookeeper:zookeeper:3.5.2-alpha') import kafka.server.KafkaServerStartable import org.apache.zookeeper.server.ZooKeeperServerMain def zkPort = '2181' def zkDir = 'zk-tmp' def kafkaDir = 'kafka-logs' def zk = new ZooKeeperServerMain() Thread.start { // ZooKeeper の実行 zk.initializeAndRun([zkPort, zkDir] as String[]) } def kafkaProps = new Properties() kafkaProps.setProperty('zookeeper.connect', "localhost:${zkPort}") kafkaProps.setProperty('log.dir', kafkaDir) def kafka = KafkaServerStartable.fromProps(kafkaProps) // Kafka の実行 kafka.startup() println 'startup ...' System.in.read() kafka.shutdown() zk.shutdown()
Groovy のデフォルト設定ではメモリ不足で起動に失敗したため、JAVA_OPTS
環境変数で最大メモリサイズを変更して実行します。
実行
> set JAVA_OPTS=-Xmx512m > groovy kafka_embed.groovy startup ・・・
備考
ZooKeeper を組み込み実行するには、Apache Curator の org.apache.curator.test.TestingServer
を使う方法もあります。
TestingServer の close 時に ZooKeeper のデータディレクトリを削除しないようにするには InstanceSpec
を使います。 (コンストラクタの第5引数を false にすると削除しなくなる)
kafka_embed_curator.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0') @Grapes([ @Grab('org.apache.curator:curator-test:3.3.0'), @GrabExclude('io.netty#netty:3.7.0.Final') ]) import kafka.server.KafkaServerStartable import org.apache.curator.test.TestingServer import org.apache.curator.test.InstanceSpec def zkPort = 2181 def zkDir = 'zk-tmp' def kafkaDir = 'kafka-logs' // close 時にデータディレクトリを残すように false を指定 def spec = new InstanceSpec(new File(zkDir), zkPort, -1, -1, false, -1) def props = new Properties() props.setProperty('zookeeper.connect', "localhost:${zkPort}") props.setProperty('log.dir', kafkaDir) // 第2引数を true にするとコンストラクタ内で start メソッドを実行する new TestingServer(spec, false).withCloseable { zk -> zk.start() def kafka = KafkaServerStartable.fromProps(props) kafka.startup() println 'startup ...' System.in.read() kafka.shutdown() zk.stop() }
Kafka クライアント
ついでに、Kafka の各種クライアント処理も Groovy で実装してみます。
a. KafkaProducer でメッセージ送信
まずは KafkaProducer を使った Kafka へのメッセージ送信処理です。
メッセージはトピックへ送信する事になり、トピックが未作成の場合は自動的に作成されます。(kafka.admin.TopicCommand
等でトピックを事前に作成しておく事も可能)
bootstrap.servers
で接続先の Kafka を指定します。
kafka_client_producer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0') @Grab('org.slf4j:slf4j-simple:1.7.24') import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord def topic = args[0] def key = args[1] def value = args[2] def props = new Properties() props.setProperty('bootstrap.servers', 'localhost:9092') props.setProperty('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer') props.setProperty('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer') new KafkaProducer(props).withCloseable { producer -> def res = producer.send(new ProducerRecord(topic, key, value)) println "***** result: ${res.get()}" }
KafkaProducer 実行例
> groovy kafka_client_producer.groovy sample1 a 123 ・・・ ***** result: sample1-0@0 ・・・ > groovy kafka_client_producer.groovy sample1 b 456 ・・・ ***** result: sample1-0@1 ・・・
b. KafkaConsumer でメッセージ受信
次に KafkaConsumer でメッセージを受信してみます。
トピックを subscribe
する事でメッセージを受信します。
デフォルトでは subscribe 後に送信されたメッセージを受信する事になります。
送信済みのメッセージも受信するには auto.offset.reset
へ earliest
を設定します。
Kafka では group.id
で指定したグループ ID 毎にメッセージの Offset (どのメッセージまでを受信したか) が管理されます。
複数のクライアントが同一グループ ID に属している場合は、その中の 1つがメッセージを受信する事になるようです。
kafka_client_consumer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0') @Grab('org.slf4j:slf4j-simple:1.7.24') import org.apache.kafka.clients.consumer.KafkaConsumer import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors def topic = args[0] def group = args[1] def props = new Properties() props.setProperty('bootstrap.servers', 'localhost:9092') props.setProperty('group.id', group) props.setProperty('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer') props.setProperty('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer') props.setProperty('auto.offset.reset', 'earliest') def stopLatch = new CountDownLatch(1) def es = Executors.newSingleThreadExecutor() es.submit { new KafkaConsumer(props).withCloseable { consumer -> consumer.subscribe([topic]) while(stopLatch.count > 0) { def records = consumer.poll(1000) records.each { println "***** result: ${it}" } } } } System.in.read() stopLatch.countDown() es.shutdown()
KafkaConsumer 実行例
> groovy kafka_client_consumer.groovy sample1 g1 ・・・ ***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 0, CreateTime = 1491758385860, checksum = 1240240547, serialized key size = 1, serialized value size = 3, key = a, value = 123) ***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 1, CreateTime = 1491758400116, checksum = 728766236, serialized key size = 1, serialized value size = 3, key = b, value = 456)
c. KafkaStreams でメッセージ受信
KafkaStreams を使ってメッセージを受信します。
グループ ID は application.id
で指定するようです。
kafka_client_stream.groovy
@Grab('org.apache.kafka:kafka-streams:0.10.2.0') @Grab('org.slf4j:slf4j-simple:1.7.24') import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.kstream.KStreamBuilder import org.apache.kafka.common.serialization.Serdes def topic = args[0] def group = args[1] def props = new Properties() props.put('application.id', group) props.put('bootstrap.servers', 'localhost:9092') props.put('key.serde', Serdes.String().class) props.put('value.serde', Serdes.String().class) def builder = new KStreamBuilder() builder.stream(topic).print() def streams = new KafkaStreams(builder, props) streams.start() System.in.read() streams.close()
動作確認のため、未使用のグループ ID を使って実行します。
KafkaStreams 実行例
> groovy kafka_client_stream.groovy sample1 g2 ・・・ [KSTREAM-SOURCE-0000000000]: a , 123 [KSTREAM-SOURCE-0000000000]: b , 456
d. KafkaConsumerGroupService で Offset 確認
KafkaConsumerGroupService を使ってグループ ID 毎の Offset を確認してみます。
kafka_group_offset.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0') import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService def group = args[0] def params = ['--bootstrap-server', 'localhost:9092', '--group', group] as String[] def opts = new ConsumerGroupCommandOptions(params) def svc = new KafkaConsumerGroupService(opts) def res = svc.describeGroup() res._2.foreach { it.foreach { st -> println "topic = ${st.topic.value}, offset = ${st.offset.value}, partition = ${st.partition.value}" } } svc.close()
KafkaConsumerGroupService 実行例
> groovy kafka_group_offset.groovy g1 topic = sample1, offset = 2, partition = 0
Groovy で Apache Flink を使用
「Groovy で Apache Spark を使用」と同様の処理を Apache Flink で試してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170311/
サンプルスクリプト
今回はローカルで実行するだけなので ExecutionEnvironment.createLocalEnvironment()
で取得した LocalEnvironment
を使用します。
map メソッドの引数へ Groovy のクロージャを使ったところ、org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. ・・・
となってしまい駄目でしたので、MapFunction
の実装クラスを定義しました。
その場合、MapFunction の型引数をきちんと指定する必要があります。(そうしないと InvalidTypesException
が発生)
なお、flink-clients_2.10 を使用する場合、scala-library の @Grab 定義は不要でした。(flink-clients_2.11 の場合のみ scala-library が必要)
money_count.groovy
@Grapes([ @Grab('org.apache.flink:flink-java:1.2.0'), @GrabExclude('io.netty#netty;3.7.0.Final') ]) @Grab('org.apache.flink:flink-clients_2.11:1.2.0') @Grab('org.scala-lang:scala-library:2.11.8') @Grab('org.jboss.netty:netty:3.2.10.Final') import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.tuple.Tuple2 import groovy.transform.CompileStatic // @CompileStatic は必須ではない(無くても動作する) @CompileStatic class ToTuple implements MapFunction<String, Tuple2<String, Integer>> { Tuple2 map(String v) { new Tuple2(v, 1) } } def env = ExecutionEnvironment.createLocalEnvironment() env.readTextFile(args[0]).map(new ToTuple()).groupBy(0).sum(1).print()
groupBy
メソッドではグルーピング対象とする項目を、sum
メソッドでは合計する項目を数値で指定します。
実行
Groovy のデフォルト設定では java.lang.IllegalArgumentException: Size of total memory must be positive.
が発生しましたので、JAVA_OPTS
環境変数で最大メモリサイズ (-Xmx) を変更して実行します。
実行結果
> set JAVA_OPTS=-Xmx512m > groovy money_count.groovy input_sample.txt Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1033779003] 03/08/2017 00:56:11 Job execution switched to status RUNNING. ・・・ (10000,2) (10,2) (100,2) (50,1) (500,1) (1,2) (1000,3) (2000,1) (5,3)
input_sample.txt の内容は以下の通りです。
input_sample.txt
100 1 5 50 500 1000 10000 1000 1 10 5 5 10 100 1000 10000 2000