MySQL Binary Log connector でバイナリログをイベント処理

MySQL Binary Log connector (mysql-binlog-connector-java) を使うと、Java プログラムで MySQL / MariaDB のバイナリログをイベント処理できます。

そのため、MySQL の CDC(Change Data Capture)として使えるかもしれません。

ソースは http://github.com/fits/try_samples/tree/master/blog/20171030/

Groovy で実装

MySQL へ接続してバイナリログの内容を取得するには BinaryLogClient を使います。

registerEventListener メソッドで EventListener 実装オブジェクトを登録しておくと、バイナリログの内容をデシリアライズした Event オブジェクトを引数として onEvent メソッドを呼び出してくれます。

BinaryLogClient のソース(listenForEventPackets() メソッドなど)を見てみると、バイナリログを順番にデシリアライズして(特定のイベントタイプをスキップするような処理は無さそう)、登録している EventListener の onEvent を順次呼び出しているだけのようなので、場合によっては処理性能に注意が必要かもしれません。

binlog_sample.groovy
@Grab('com.github.shyiko:mysql-binlog-connector-java:0.13.0')
import com.github.shyiko.mysql.binlog.BinaryLogClient

def host = args[0]
def port = args[1] as int
def user = args[2]
def pass = args[3]

def client = new BinaryLogClient(host, port, user, pass)

client.registerEventListener { ev ->
    // バイナリログの内容を処理
    println ev
}

client.connect()

動作確認

動作確認は Docker で行ってみます。

準備

BinaryLogClient で接続するために MySQL 側でレプリケーション用の設定を行います。

まずは、レプリケーションの設定ファイルを用意します。

/home/vagrant/mysql/conf/repl.cnf (レプリケーションの設定)
[mysqld]
log-bin=mysql-bin
server-id=1

次に、レプリケーション用の接続ユーザーを追加するための SQL ファイルも用意しておきます。

/home/vagrant/mysql/init/repl-user.sqlレプリケーション用のユーザー)
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO repl@'%' IDENTIFIED BY 'pass';

今回はコンテナ間の接続(DB への接続)に Docker のユーザー定義ネットワークを使います。

そのため、まずはブリッジネットワーク(sample1)を作成しておきます。

Docker ユーザー定義ネットワークの作成
$ docker network create --driver bridge sample1

/home/vagrant/groovy/binlog_sample.groovy ファイルを用意した後、sample1 のネットワークへ参加するように Groovy のコンテナを実行します。

Groovy コンテナ実行
$ docker run --rm -it --net=sample1 -v /home/vagrant/groovy:/work groovy bash

・・・
groovy@・・・:~$ cd /work

a. MySQL 5.7 の場合

それでは、MySQL のコンテナを実行して動作確認を行います。

MySQL の Docker 公式イメージでは、/etc/mysql/conf.d 内の設定ファイルを適用し、/docker-entrypoint-initdb.d 内の SQL ファイルを実行するようになっています。

今回はこれを使って、先ほど用意しておいたレプリケーションの設定ファイルとユーザー作成 SQL を適用するように実行します。

a-1. MySQL コンテナ実行
$ docker run --name mysql1 --net=sample1 -e MYSQL_ROOT_PASSWORD=secret -d -v /home/vagrant/mysql/conf:/etc/mysql/conf.d -v /home/vagrant/mysql/init:/docker-entrypoint-initdb.d mysql

事前に実行しておいた Groovy コンテナ上で binlog_sample.groovy を実行します。 ユーザー名とパスワードはレプリケーション用のものを使用します。

a-2. binlog_sample.groovy 実行(Groovy コンテナ)
groovy@・・・:/work$ groovy binlog_sample.groovy mysql1 3306 repl pass

Oct 22, 2017 4:46:02 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql1:3306 at mysql-bin.000003/154 (sid:65535, cid:3)
Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000003', binlogPosition=154}}
・・・

この状態で以下の SQL を実行してみます。

CREATE DATABASE db1;
USE db1;

CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL);

INSERT INTO tbl1 VALUES (1, 'a');
UPDATE tbl1 SET name = 'aa' WHERE id = 1;
DELETE FROM tbl1 WHERE id = 1;

CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL);

START TRANSACTION;

INSERT INTO tbl1 VALUES (1, 'a');
INSERT INTO tbl2 VALUES (2, 'b');

COMMIT;

上記 SQL 実行後の出力結果です。

a-3. binlog_sample.groovy 出力結果
・・・
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=219, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=72, nextPosition=310, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='CREATE DATABASE db1'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=375, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=127, nextPosition=521, flags=0}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=586, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=657, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=706, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=748, flags=0}, data=WriteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[
    [1, a]
]}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=779, flags=0}, data=XidEventData{xid=14}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=844, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=915, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=964, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=32, nextPosition=1015, flags=0}, data=UpdateRowsEventData{tableId=219, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
    {before=[1, a], after=[1, aa]}
]}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1046, flags=0}, data=XidEventData{xid=15}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1111, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=1182, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1231, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=24, nextPosition=1274, flags=0}, data=DeleteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[
    [1, aa]
]}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1305, flags=0}, data=XidEventData{xid=16}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1370, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=127, nextPosition=1516, flags=0}, data=QueryEventData{threadId=4, executionTime=1, errorCode=0, database='db1', sql='CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}}
Event{header=EventHeaderV4{timestamp=1508690819000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1581, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=1652, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1701, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1743, flags=0}, data=WriteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[
    [1, a]
]}}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1792, flags=0}, data=TableMapEventData{tableId=220, database='db1', table='tbl2', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1834, flags=0}, data=WriteRowsEventData{tableId=220, includedColumns={0, 1}, rows=[
    [2, b]
]}}
Event{header=EventHeaderV4{timestamp=1508690819000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1865, flags=0}, data=XidEventData{xid=19}}

eventType を簡単にまとめると以下のようになりました。

SQL eventType
CREATE QUERY
INSERT EXT_WRITE_ROWS
UPDATE EXT_UPDATE_ROWS
DELETE EXT_DELETE_ROWS

注意点として、EXT_XXX_ROWS の Event 内容にテーブル名は含まれておらず tableId で判断する必要がありそうです。 tableId とテーブル名のマッピングは直前の TABLE_MAP で実施されています。

また、バイナリログのフォーマット(以下)は RBR(行ベースレプリケーション) となっていました。

mysql> show global variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

b. MariaDB 10.2 の場合

ついでに、MariaDB でも試してみます。

設定は MySQL と同じものが使えるので、ここでは Docker イメージ名を mariadb に変えて実行するだけです。(以下ではコンテナ名も変えています)

b-1. MariaDB コンテナ実行
$ docker run --name mariadb1 --net=sample1 -e MYSQL_ROOT_PASSWORD=secret -d -v /home/vagrant/mysql/conf:/etc/mysql/conf.d -v /home/vagrant/mysql/init:/docker-entrypoint-initdb.d mariadb

接続先を mariadb1 (MariaDB) へ変えてスクリプトを実行します。

b-2. binlog_sample.groovy 実行(Groovy コンテナ)
groovy@・・・:/work$ groovy binlog_sample.groovy mariadb1 3306 repl pass

・・・

MySQL と同じ SQL を実行した後の出力結果です。

b-3. binlog_sample.groovy 出力結果
・・・
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=384, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=66, nextPosition=469, flags=8}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='CREATE DATABASE db1'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=511, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=121, nextPosition=651, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=693, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=791, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl1 VALUES (1, 'a')'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=822, flags=0}, data=XidEventData{xid=12}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=864, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=87, nextPosition=970, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='UPDATE tbl1 SET name = 'aa' WHERE id = 1'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1001, flags=0}, data=XidEventData{xid=13}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1043, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=76, nextPosition=1138, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='DELETE FROM tbl1 WHERE id = 1'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1169, flags=0}, data=XidEventData{xid=14}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1211, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=121, nextPosition=1351, flags=0}, data=QueryEventData{threadId=10, executionTime=1, errorCode=0, database='db1', sql='CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}}
Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1393, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=1491, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl1 VALUES (1, 'a')'}}
Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=1589, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl2 VALUES (2, 'b')'}}
Event{header=EventHeaderV4{timestamp=1508691101000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1620, flags=0}, data=XidEventData{xid=17}}

eventType を簡単にまとめると以下のようになりました。

SQL eventType
CREATE QUERY
INSERT QUERY
UPDATE QUERY
DELETE QUERY

バイナリログのフォーマットは MBRミックスベースレプリケーション)となっていました。

MariaDB [(none)]> show global variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | MIXED |
+---------------+-------+

Akka でステートマシンを処理

前回 の有限ステートマシン(FSM)の処理を Akka の JavaAPI を使って実装してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20171016/

ステートマシンの実装

まずは、以下のステートマシンを実装します。

  • 初期状態は Idle 状態
  • Idle 状態で On イベントが発生すると Active 状態へ遷移
  • Active 状態で Off イベントが発生すると Idle 状態へ遷移
現在の状態 Off On
Idle Active
Active Idle

AbstractFSM<状態の型, データの型> を extends する事でステートマシンを実装します。

状態以外にも状態のデータを指定できるので、Active 状態へ変化した回数をカウントするようにしてみました。

指定の状態で特定のイベントを処理するには when(現在の状態, FSMStateFunctionBuilder) を使い、イベントに対する処理は matchEventEquals() 等を使います。

goTo(遷移先の状態) で状態を遷移させ、その際に using(データ) で状態遷移後のデータを指定できます。

処理対象外のイベントを受け取った場合の処理は whenUnhandled(FSMStateFunctionBuilder) で指定でき、状態遷移の状況確認に onTransition が使えます。

sample.groovy
@Grab('com.typesafe.akka:akka-actor_2.12:2.5.6')
import akka.actor.AbstractFSM
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.actor.Props

enum States { Idle, Active }
enum Events { On, Off }

class SampleStateMachine extends AbstractFSM<States, Integer> {
    {
        // 初期状態の設定
        startWith(States.Idle, 0)

        // On イベントで Idle から Active 状態へ遷移
        when(States.Idle, matchEventEquals(Events.On) { event, data ->
            // Active へ遷移
            goTo(States.Active).using(data + 1)
        })

        // Off イベントで Active から Idle 状態へ遷移
        when(States.Active, matchEventEquals(Events.Off) { event, data ->
            // Idle へ遷移
            goTo(States.Idle)
        })

        // 処理されないイベント発生時
        whenUnhandled(
            matchAnyEvent { event, data ->
                println "*** Unhandled event=${event}, data=${data}"
                stay()
            }
        )

        // 状態遷移の発生時
        onTransition { from, to -> 
            println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}"
        }
    }
}

def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

actor.tell(Events.On, ActorRef.noSender())
actor.tell(Events.Off, ActorRef.noSender())

actor.tell(Events.Off, ActorRef.noSender())

sleep 2000

system.terminate()

実行結果は以下の通りです。

実行結果
> groovy sample.groovy

*** stateChanged: Idle -> Active, data=0, next data=1
*** stateChanged: Active -> Idle, data=1, next data=1
*** Unhandled event=Off, data=1

タイムアウト付きステートマシンの実装1

次は、タイムアウト時の遷移を追加してみます。

現在の状態 Off On Timeout (2秒)
Idle Active
Active Idle Idle

when で scala.concurrent.duration.FiniteDuration を指定すると状態のタイムアウト ※ を指定できます。

 ※ ただし、このタイムアウトは
    その状態で何もメッセージを受信しなかった時間に対する
    タイムアウトのようです

    状態の遷移が発生しなくても
    メッセージを受信する度にタイムアウトはリセットされます

タイムアウト発生時のイベント(メッセージ)は StateTimeout() の戻り値と等しくなります。

timeout_sample1.groovy
・・・
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit

enum States { Idle, Active }
enum Events { On, Off }

class SampleStateMachine extends AbstractFSM<States, Integer> {
    {
        startWith(States.Idle, 0)

        when(States.Idle, matchEventEquals(Events.On) { event, data ->
            goTo(States.Active).using(data + 1)
        })

        when(States.Active, Duration.create(2, TimeUnit.SECONDS), 
            matchEventEquals(Events.Off) { event, data ->
                goTo(States.Idle)
            }.eventEquals(StateTimeout()) { event, data ->
                // タイムアウト時の処理
                println "*** timeout: event=${event}, data=${data}"

                goTo(States.Idle)
            /*  以下でも可
                self().tell(Events.Off, self())
                stay()
            */
            }
        )
        ・・・
    }
}

def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

actor.tell(Events.On, ActorRef.noSender())
actor.tell(Events.Off, ActorRef.noSender())

actor.tell(Events.Off, ActorRef.noSender())

actor.tell(Events.On, ActorRef.noSender())

sleep 2500

system.terminate()

実行結果は以下の通りです。

実行結果
> groovy timeout_sample1.groovy

*** stateChanged: Idle -> Active, data=0, next data=1
*** stateChanged: Active -> Idle, data=1, next data=1
*** Unhandled event=Off, data=1
*** stateChanged: Idle -> Active, data=1, next data=2
*** timeout: event=StateTimeout, data=2
*** stateChanged: Active -> Idle, data=2, next data=2

when で指定したタイムアウトはイベント(メッセージ)を受信するとリセットされる事を確認するため、以下のようにタイムアウト発生前に invalid-message という文字列を 2回 tell するようにしてみます。

timeout_sample1b.groovy (タイムアウトの検証)
・・・
def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

・・・

actor.tell(Events.On, ActorRef.noSender())

sleep 1500

// 1回目
actor.tell('invalid-message', ActorRef.noSender())

sleep 1500

// 2回目
actor.tell('invalid-message', ActorRef.noSender())

sleep 2500

system.terminate()

実行結果は、以下のように invalid-message という文字列を 2回受信した後にタイムアウトしました。

つまり、メッセージの受信でタイムアウトがリセットされていると考えられます。

実行結果(タイムアウトの検証)
> groovy timeout_sample1b.groovy

・・・
*** stateChanged: Idle -> Active, data=1, next data=2
*** Unhandled event=invalid-message, data=2
*** Unhandled event=invalid-message, data=2
*** timeout: event=StateTimeout, data=2
*** stateChanged: Active -> Idle, data=2, next data=2

タイムアウト付きステートマシンの実装2 (状態のタイムアウト

メッセージの受信有無に左右されないタイムアウトを実現するには、when のタイムアウトを使わずに setTimer() を使う事で実装できそうです。

timeout_sample2.groovy
・・・
class SampleStateMachine extends AbstractFSM<States, Integer> {
    {
        ・・・
        when(States.Active, 
            matchEventEquals(Events.Off) { event, data ->
                goTo(States.Idle)
            }.eventEquals(StateTimeout()) { event, data ->
                println "*** timeout: event=${event}, data=${data}"

                goTo(States.Idle)
            /* 以下でも可
                self().tell(Events.Off, self())
                stay()
            */
            }
        )
        ・・・
        onTransition { from, to -> 
            println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}"

            if (to == States.Active) {
                // Active 状態のタイムアウト設定
                setTimer(
                    'active-timeout', 
                    StateTimeout(), 
                    Duration.create(2, TimeUnit.SECONDS)
                )
            }
            else {
                // タイムアウトのキャンセル
                cancelTimer('active-timeout')
            }
        }
    }
}

def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

・・・

actor.tell(Events.On, ActorRef.noSender())

sleep 1500

actor.tell("invalid-message", ActorRef.noSender())

sleep 1500

actor.tell("invalid-message", ActorRef.noSender())

sleep 2500

system.terminate()

実行してみると、2回目の invalid-message を受信する前にタイムアウトが発生しており意図通りの動作となりました。

実行結果
> groovy timeout_sample2.groovy

・・・
*** stateChanged: Idle -> Active, data=1, next data=2
*** Unhandled event=invalid-message, data=2
*** timeout: event=StateTimeout, data=2
*** stateChanged: Active -> Idle, data=2, next data=2
*** Unhandled event=invalid-message, data=2

Akka Streams で MQTT Broker へ接続

ローカルで実行した MQTT Broker(前回 参照)に対して Akka Streams の JavaAPI を使って Groovy で接続してみます。

Akka Streams 用の様々なコネクタを備えた Alpakka に MQTT Broker 用の Source や Sink が用意されているので、今回はこちらを使います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170925/

Publish 処理

まずは publish 処理です。

MqttConnectionSettings から MqttSink を作成し、MqttMessage を渡せば MQTT Broker へメッセージを送信できます。

MqttConnectionSettings は MQTT Broker の接続先と clientId、そして永続化の方法を指定して作成します。

clientId はクライアント毎に一意な値を指定します。 (clientId を null にすると IllegalArgumentException: Null clientId となりました)

MqttQoS ではメッセージ到達の QoS を指定します。 MqttQoS.atLeastOnce() は少なくとも 1回(重複の可能性あり)の到達可能性を指定する事になります。

@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4')
@Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11')
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Source
import akka.stream.alpakka.mqtt.MqttQoS
import akka.stream.alpakka.mqtt.MqttConnectionSettings
import akka.stream.alpakka.mqtt.MqttMessage
import akka.stream.alpakka.mqtt.javadsl.MqttSink
import akka.util.ByteString

def topic = args[0]
def clientId = args[1]
def message = args[2]

def system = ActorSystem.create()
def mat = ActorMaterializer.create(system)

def settings = MqttConnectionSettings.create(
    'tcp://localhost:1883',
    clientId,
    new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
)

def msg = MqttMessage.create(topic, ByteString.fromString(message))

Source.single(msg)
    .runWith(MqttSink.create(settings, MqttQoS.atLeastOnce()), mat)

// メッセージ送信前に terminate しないようスリープで調整
sleep 1000

system.terminate()

Subscribe 処理

次に subscribe 処理です。

MqttConnectionSettings から MqttSourceSettings を作り、MqttSource を作成します。

MqttSourceSettings の withSubscriptions で subscribe するトピック名と QoS を指定します。

mqtt_subscribe.groovy
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4')
@Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11')
import akka.actor.ActorSystem
import akka.japi.Pair
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Sink
import akka.stream.alpakka.mqtt.MqttQoS
import akka.stream.alpakka.mqtt.MqttSourceSettings
import akka.stream.alpakka.mqtt.MqttConnectionSettings
import akka.stream.alpakka.mqtt.javadsl.MqttSource

def topic = args[0]
def clientId = args[1]

def system = ActorSystem.create()
def mat = ActorMaterializer.create(system)

def settings = MqttSourceSettings.create(
    MqttConnectionSettings.create(
        'tcp://localhost:1883',
        clientId,
        new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
    )
).withSubscriptions(
    // トピックと QoS の指定
    Pair.create(topic, MqttQoS.atLeastOnce())
)

MqttSource.create(settings, 10).runWith(Sink.foreach { println it }, mat)

println "subscribe : ${topic}"

System.in.read()

system.terminate()

上記の処理では、再接続の際にセッションがクリアされてしまうので、通信が切断している間のメッセージを後で受け取るような事はできません。

再接続の際にセッションが復元される Persistent Session を適用するには、以下のように withCleanSession(false) とする必要があります。

なお、Persistent Session を適用するには同じ clientId を使って再接続する必要があります。

mqtt_subscribe2.groovy (Persistent Session 版)
・・・

def settings = MqttSourceSettings.create(
    MqttConnectionSettings.create(
        'tcp://localhost:1883',
        clientId,
        new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
    ).withCleanSession(false) // Persistent Session
).withSubscriptions(
    Pair.create(topic, MqttQoS.atLeastOnce())
)

・・・

動作確認

前回 の Moquette 組み込み実行スクリプトで MQTT Broker を起動しておきます。

MQTT Broker 実行
> groovy moquette_run.groovy

・・・
[main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883
・・・
Server started, version 0.10

Subscribe 処理を実行します。

動作確認のため Persistent Session 版も実行していますが、(Publish 処理も含め) clientId へ異なる値をそれぞれ指定します。(ここでは subscribe1・2 と publish1 としています)

また、トピック名は sample とします。

(a) Subscribe1 実行
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
(b) Subscribe2(Persistent Session 版)実行
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample

この状態で “a” “ab” “abc” という 3つのメッセージを publish してみます。

Publish 実行
> groovy mqtt_publish.groovy sample publish1 a
> groovy mqtt_publish.groovy sample publish1 ab
> groovy mqtt_publish.groovy sample publish1 abc

Subscribe の結果は以下のようになりました。

(a) Subscribe1 状況
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
MqttMessage(sample,ByteString(97))
MqttMessage(sample,ByteString(97, 98))
MqttMessage(sample,ByteString(97, 98, 99))
(b) Subscribe2(Persistent Session 版)状況
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample
MqttMessage(sample,ByteString(97))
MqttMessage(sample,ByteString(97, 98))
MqttMessage(sample,ByteString(97, 98, 99))

ここで (a) と (b) の処理を一度終了しておき、その状態で publish してみます。

Publish 実行
> groovy mqtt_publish.groovy sample publish1 abcd
> groovy mqtt_publish.groovy sample publish1 abcde

(a) と (b) を再実行すると、以下のように Persistent Session 版の方は停止中に publish されたメッセージを取得できました。

(a) Subscribe1 再実行
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
(b) Subscribe2(Persistent Session 版)再実行
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample
MqttMessage(sample,ByteString(97, 98, 99, 100))
MqttMessage(sample,ByteString(97, 98, 99, 100, 101))

JanusGraph でグラフ操作 - Groovy

TinkerPop の API と互換性があり Cassandra 等をストレージとして使用できる JanusGraph というグラフデータベースがあります。

今回は、「TinkerPop でグラフ操作 - Groovy」 のサンプルコードを JanusGraph 用に変更してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20170814/

はじめに

今回はグラフデータの保存先として Cassandra を使うため、Cassandra を事前に実行しておきます。

a. 設定ファイル

TinkerPop の org.apache.tinkerpop.gremlin.structure.util.GraphFactory を使用する場合、gremlin.graphorg.janusgraph.core.JanusGraphFactory を設定し、JanusGraph 用の設定(storage.backend 等)を加えれば JanusGraph で使えます。

Cassandra を使う場合は storage.backendcassandra を設定します。

setting.properties
gremlin.graph=org.janusgraph.core.JanusGraphFactory

storage.backend=cassandra
storage.hostname=127.0.0.1

b. グラフデータ作成

JanusGraph は TinkerPop API と互換性があるため 前回 の処理内容を特に変える必要はなく、@Grapes の依存ライブラリ構成を JanusGraph 用に変えるだけです。

add-data.groovy
// 前回との違いは @Grapes の内容のみ
@Grapes([
    @Grab('org.janusgraph:janusgraph-cassandra:0.1.1'),
    @Grab('org.slf4j:slf4j-simple:1.7.25'),
    @GrabExclude('xml-apis#xml-apis;1.3.04'),
    @GrabExclude('com.github.jeremyh#jBCrypt;jbcrypt-0.4'),
    @GrabExclude('org.slf4j#slf4j-log4j12'),
    @GrabExclude('ch.qos.logback#logback-classic'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory

def conf = args[0]

def addNode = { g, type, id, name = id ->
    def res = g.addVertex(type)

    res.property('oid', id)
    res.property('name', name)

    res
}

def createData = { g -> 
    def p = addNode(g, 'Principals', 'principals')

    def u1 = addNode(g, 'User', 'user1')
    def u2 = addNode(g, 'User', 'user2')
    def ad = addNode(g, 'User', 'admin')

    def g1 = addNode(g, 'Group', 'group1')

    [u1, u2, ad, g1].each {
        it.addEdge('PART_OF', p)
    }

    u2.addEdge('PART_OF', g1)

    def r = addNode(g, 'Resources', 'resources')

    def s1 = addNode(g, 'Service', 'service1')

    def s2 = addNode(g, 'Service', 'service2')
    def s2o1 = addNode(g, 'Operation', 'service2.get', 'get')
    def s2o2 = addNode(g, 'Operation', 'service2.post', 'post')

    [s2o1, s2o2].each {
        s2.addEdge('METHOD', it)
    }

    [s1, s2].each {
        r.addEdge('RESOURCE', it)
    }

    u1.addEdge('PERMIT', s1)
    g1.addEdge('PERMIT', s2o2)
    ad.addEdge('PERMIT', r)
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable { tx ->
        createData(g)

        tx.commit()
    }
}
実行結果
> groovy add-data.groovy setting.properties

・・・
[main] INFO com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor - AddHost: 127.0.0.1
[main] INFO org.janusgraph.diskstorage.Backend - Initiated backend operations thread pool of size 8
[main] INFO org.janusgraph.diskstorage.log.kcvs.KCVSLog - Loaded unidentified ReadMarker start time 2017-08-12T14:17:38.078Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@3bdb2c78

c. 経路の探索

こちらも @Grapes の内容を変えるだけです。(確認のためログ出力するようにしています)

find-data.groovy
// 前回との違いは @Grapes の内容のみ
@Grapes([
    @Grab('org.janusgraph:janusgraph-cassandra:0.1.1'),
    //@Grab('org.slf4j:slf4j-nop:1.7.25'),
    @Grab('org.slf4j:slf4j-simple:1.7.25'),
    @GrabExclude('xml-apis#xml-apis;1.3.04'),
    @GrabExclude('com.github.jeremyh#jBCrypt;jbcrypt-0.4'),
    @GrabExclude('org.slf4j#slf4j-log4j12'),
    @GrabExclude('ch.qos.logback#logback-classic'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__

def conf = args[0]
def start = args[1]
def end = args[2]

def toStr = {
    "${it.label()}[${it.id()}]{${it.properties().join(', ')}}"
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable {
        def p = g.traversal().V()
            .has('oid', start)
            .repeat(__.outE().as('e').inV())
            .until(__.has('oid', end))
            .where(__.select('e').unfold().hasLabel('PERMIT'))
            .path()

        p.each {
            println it.objects().collect(toStr).join(' -> ')
        }
    }
}
実行結果
> groovy find-data.groovy setting.properties user2 service2.post

・・・
[main] WARN org.janusgraph.graphdb.transaction.StandardJanusGraphTx - Query requires iterating over all vertices [(oid = user2)]. For better performance, use indexes
・・・
User[4304]{vp[oid->user2], vp[name->user2]} -> PART_OF[4d6-3bk-4r9-3ao]{} -> Group[4272]{vp[oid->group1], vp[name->group1]} -> PERMIT[6c6-3ao-9hx-37c]{} -> Operation[4152]{vp[oid->service2.post], vp[name->post]}

インデックスを使った方が性能的に望ましいとの警告ログが出力されました。

d. インデックスの作成

警告ログを解消するためにインデックスを使ってみます。

TinkerPop で(汎用的な)インデックス作成 API を見つけられなかったので、インデックス作成に関しては JanusGraph の専用 API を使う必要がありそうです。

JanusGraph のインデックス機能には以下の 2通りがあり、指定のプロパティ値を持つノードやエッジを見つける用途には (a) を使えば良さそうです。

  • (a) Graph Indexes
  • (b) Vertex-centric Indexes

そして、(a) では以下のようなインデックスを使えます。

名称 概要 生成メソッド名
Composite Index 外部のインデックスエンジンを使わずに高速 buildCompositeIndex
Mixed Index 外部のインデックスエンジン(Elasticsearch 等)を使って数値の範囲検索や全文検索などが可能 buildMixedIndex

今回は oid プロパティに対して (a) の Composite Index を作ってみます。

(a) Graph Indexes は JanusGraphManagementbuildIndex メソッドで作成します。 その戻り値 IndexBuilderbuildCompositeIndex メソッドで Composite Index となります。 (ユニークインデックスとする場合は unique メソッドを呼び出します)

ここで、インデックスを作成しただけでは、既存データのインデックス化は行われないようなので、インデックス作成後に既存データのインデックス化も行うようにしています。

create_index.groovy
@Grapes([
    @Grab('org.janusgraph:janusgraph-cassandra:0.1.1'),
    @Grab('org.slf4j:slf4j-simple:1.7.25'),
    @GrabExclude('xml-apis#xml-apis;1.3.04'),
    @GrabExclude('com.github.jeremyh#jBCrypt;jbcrypt-0.4'),
    @GrabExclude('org.slf4j#slf4j-log4j12'),
    @GrabExclude('ch.qos.logback#logback-classic'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.janusgraph.core.JanusGraphFactory
import org.janusgraph.core.schema.SchemaAction
import org.apache.tinkerpop.gremlin.structure.Vertex

def conf = args[0]

def graph = JanusGraphFactory.open(conf)

graph.withAutoCloseable { g ->
    // JanusGraphManagement の取得
    def manage = g.openManagement()

    def oidKey = manage.getPropertyKey('oid')

    // インデックスが未作成の場合にインデックスを作成
    if (manage.getGraphIndex('oidIndex') == null) {
        // oid を対象とした Composite Index の作成
        manage.buildIndex('oidIndex', Vertex)
            .addKey(oidKey)
            .buildCompositeIndex()

        manage.commit()
        // インデックス作成の完了待ち
        manage.awaitGraphIndexStatus(g, 'oidIndex').call()
    }

    manage = g.openManagement()
    // 既存データのインデックス化
    manage.updateIndex(manage.getGraphIndex('oidIndex'), SchemaAction.REINDEX).get()

    manage.commit()
}
インデックス作成結果
> groovy create_index.groovy setting.properties

・・・
[pool-19-thread-1] INFO org.janusgraph.graphdb.database.management.ManagementSystem$UpdateStatusTrigger - Set status REGISTERED on schema element oidIndex with property keys []
[pool-19-thread-1] INFO org.janusgraph.graphdb.database.management.ManagementLogger - Received all acknowledgements for eviction [1]
・・・
[Thread-3] INFO com.netflix.astyanax.thrift.ThriftKeyspaceImpl - Detected partitioner org.apache.cassandra.dht.Murmur3Partitioner for keyspace janusgraph
[Thread-7] INFO org.janusgraph.graphdb.olap.job.IndexRepairJob - Found index oidIndex
[Thread-3] INFO org.janusgraph.graphdb.database.management.ManagementSystem - Index update job successful for [oidIndex]

これで警告ログは出力されなくなりました。

経路探索の実行結果
> groovy find-data.groovy setting.properties user2 service2.post

・・・
User[4304]{vp[oid->user2], vp[name->user2]} -> PART_OF[4d6-3bk-4r9-3ao]{} -> Group[4272]{vp[oid->group1], vp[name->group1]} -> PERMIT[6c6-3ao-9hx-37c]{} -> Operation[4152]{vp[oid->service2.post], vp[name->post]}

TinkerPop でグラフ操作 - Groovy

前回、Neo4j の Cypher を使って実施したグラフ操作を Apache TinkerPop を使って Groovy (@Grab を使用)で実装してみました。

Apache TinkerPop はグラフ処理のためのフレームワークで、Neo4j 等の様々なグラフ DB ※ に対して共通のインターフェースを提供します。

 ※ グラフ DB だけではなく、
    Cassandra、HBase、DynamoDB 等をサポートする
    ライブラリも提供されています

ソースは http://github.com/fits/try_samples/tree/master/blog/20170718/

a. 設定ファイル

TinkerPop にはインメモリーの TinkerGraph が用意されていますが、前回と同様に Neo4j を使う事にします。

ただ、前回と違って Neo4j をサーバー起動せずに組み込み実行します。

TinkerPop には Graph オブジェクトを汎用的に生成する手段として org.apache.tinkerpop.gremlin.structure.util.GraphFactory が用意されています。

GraphFactory.open(<設定ファイル>) を使えば、依存ライブラリと設定ファイルを差し替えて DB を切り替える事もできそうなので、今回はこの方法を使います。

Neo4j を組み込み利用する場合、gremlin.graphorg.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph を設定して gremlin.neo4j.directory に DB ファイルを出力するディレクトリを指定します。 (Neo4j をサーバー起動した場合の data/databases/graph.db)

setting.properties
gremlin.graph=org.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph
gremlin.neo4j.directory=neo4jdb

b. グラフデータ作成

前回 と同様のデータを作成する処理を実装してみます。

Groovy で実行する際の注意点として、neo4j-tinkerpop-api-impl 等は依存ライブラリとして Groovy 2.4.11 のライブラリを含んでおり、このバージョン以外の groovy コマンドで実行すると org.codehaus.groovy.control.MultipleCompilationErrorsException が発生してしまいます。

そこで今回は、Groovy 2.5.0 beta1 で実行できるように @GrabExclude を使って groovy-xml 等を除くようにしています。

グラフの操作は GraphFactory.open() で取得した Graph オブジェクトに対して実施します。

ノードの追加は addVertex、エッジの追加は addEdge メソッドで行う事ができ、property メソッドで任意の属性を設定できます。

トランザクションtx メソッドで開始します。※

 ※ TinkerGraph のように tx メソッドをサポートしていないものもありますので
    (その場合に tx() を呼び出すとエラーになる)
    実際は tx のサポート有無をチェックしてから
    呼び出すようにした方が安全だと思います

    (例)
        if (g.features().graph().supportsTransactions()) {
            g.tx().withAutoCloseable { t ->
                ・・・
            }
        }
add-data.groovy
@Grapes([
    @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'),
    @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'),
    @Grab('org.slf4j:slf4j-nop:1.7.25'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory

def conf = args[0]

def addNode = { g, type, id, name = id ->
    def res = g.addVertex(type)

    res.property('oid', id)
    res.property('name', name)

    res
}

def createData = { g -> 
    def p = addNode(g, 'Principals', 'principals')

    def u1 = addNode(g, 'User', 'user1')
    def u2 = addNode(g, 'User', 'user2')
    def ad = addNode(g, 'User', 'admin')

    def g1 = addNode(g, 'Group', 'group1')

    [u1, u2, ad, g1].each {
        it.addEdge('PART_OF', p)
    }

    u2.addEdge('PART_OF', g1)

    def r = addNode(g, 'Resources', 'resources')

    def s1 = addNode(g, 'Service', 'service1')

    def s2 = addNode(g, 'Service', 'service2')
    def s2o1 = addNode(g, 'Operation', 'service2.get', 'get')
    def s2o2 = addNode(g, 'Operation', 'service2.post', 'post')

    [s2o1, s2o2].each {
        s2.addEdge('METHOD', it)
    }

    [s1, s2].each {
        r.addEdge('RESOURCE', it)
    }

    u1.addEdge('PERMIT', s1)
    g1.addEdge('PERMIT', s2o2)
    ad.addEdge('PERMIT', r)
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable { tx ->
        createData(g)

        tx.commit()
    }
}

withAutoCloseableAutoCloseable のリソースをクローズするための Groovy の機能です(TinkerPop の API ではありません)

実行結果
> groovy add-data.groovy setting.properties

c. 経路の探索

前回 の経路探索の処理を TinkerPop の API で実装してみます。

traversal().V() でノードを対象とした GraphTraversal<Vertex,Vertex> を取得でき、has(<プロパティ名>, <プロパティ値>) 等を使ってノードの条件を指定できます。

(c-1) 複数エッジ(条件なし)

まず、エッジは気にせずに指定ノードから指定ノードまでのパス(経路)を取得する処理を実装してみます。

終点のノードまで複数のノードで繋がっている場合は repeat(・・・).until(<終点ノードの条件>) で探せます。

エッジの条件を指定しない場合は repeat(__.out()).until(・・・) で取得できます。(__ はクラス名です)

パスを取得するには path() を使います。 Path からは objects() でパスに含まれるノード(やエッジ)を List<Object> で取得できます。

なお、以下の処理でトランザクションは必要ありませんが、一応トランザクションを使っています。 (JanusGraph をマルチスレッドで使うケースではトランザクションが必要になったので)

find-data-simple.groovy
@Grapes([
    @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'),
    @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'),
    @Grab('org.slf4j:slf4j-nop:1.7.25'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__

def conf = args[0]
def start = args[1]
def end = args[2]

def toStr = {
    "${it.label()}[${it.id()}]{${it.properties().join(', ')}}"
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable {
        def p = g.traversal().V()
            .has('oid', start) // 始点ノードの条件
            .repeat(__.out())
            .until(__.has('oid', end)) // 終点ノードの条件
            .path()

        p.each {
            println it.objects().collect(toStr).join(' -> ')
        }
    }
}
実行結果

user2 から service2.post へのパスを取得してみます。

> groovy find-data-simple.groovy setting.properties user2 service2.post

User[2]{vp[oid->user2], vp[name->user2]} -> Group[4]{vp[oid->group1], vp[name->group1]} -> Operation[9]{vp[oid->service2.post], vp[name->post]}

__.out() を使った事で、パスにはノードの情報だけが含まれエッジの情報を含んでいません。

(c-2) 複数エッジ(条件あり)

エッジの条件を指定するには repeat でエッジも指定します。

A ノードと B ノードが C エッジで繋がっている (A)-[C]->(B) のような状態で、A の __.outE() が C エッジで、C エッジの inV() が B ノードとなります。

そのため、repeat(__.outE().as('e').inV()) とすれば、複数の外向きエッジで繋がっているノードを検索する事ができます。

ここで as(<ステップラベル名>) を使って、該当するエッジにラベル名を付けておき、where での条件判定(PERMIT エッジを含むかどうか)に使います。

__.select('e') の結果は GraphTraversal<Vertex, ArrayList<Edge>> のようになるので __.select('e').hasLabel(・・・) とはできません。 (ClassCastException になります)

unfold() で GraphTraversal<Vertex, ArrayList> を GraphTraversal<Vertex, Edge> にして hasLabel を使えば、該当のラベルを持つエッジを含んでいるかどうかを条件判定できます。

find-data.groovy
・・・
GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable {
        def p = g.traversal().V()
            .has('oid', start) // 始点ノードの条件
            .repeat(__.outE().as('e').inV())
            .until(__.has('oid', end)) // 終点ノードの条件
            .where(__.select('e').unfold().hasLabel('PERMIT')) // PERMIT エッジを含んでいるかどうかの判定
            .path()

        p.each {
            println it.objects().collect(toStr).join(' -> ')
        }
    }
}
実行結果
> groovy find-data.groovy setting.properties user2 service2.post

User[2]{vp[oid->user2], vp[name->user2]} -> PART_OF[4]{} -> Group[4]{vp[oid->group1], vp[name->group1]} -> PERMIT[10]{} -> Operation[9]{vp[oid->service2.post], vp[name->post]}

__.outE() を使った事でエッジの情報もパスに含まれるようになりました。

Groovy で Kafka を組み込み実行

Groovy で Apache Kafka を組み込み実行してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170410/

Kafka 組み込み実行

Kafka の実行には ZooKeeper が必要なため、ZooKeeper と Kafka の両方を組み込み実行します。

ZooKeeperServerMain を実行 (initializeAndRun) すると処理をブロックしてしまい、後続の処理を実行できないので、今回は別スレッドで実行するようにしました。

initializeAndRun メソッドは以下のように配列の要素数によって引数の解釈が異なるようです。

引数の数 引数の内容
1つ 設定ファイルのパス
2つ以上 ポート番号, データディレクトリ, tickTime, maxClientCnxns
kafka_embed.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0')
@Grab('org.apache.zookeeper:zookeeper:3.5.2-alpha')
import kafka.server.KafkaServerStartable
import org.apache.zookeeper.server.ZooKeeperServerMain

def zkPort = '2181'
def zkDir = 'zk-tmp'
def kafkaDir = 'kafka-logs'

def zk = new ZooKeeperServerMain()

Thread.start {
    // ZooKeeper の実行
    zk.initializeAndRun([zkPort, zkDir] as String[])
}

def kafkaProps = new Properties()
kafkaProps.setProperty('zookeeper.connect', "localhost:${zkPort}")
kafkaProps.setProperty('log.dir', kafkaDir)

def kafka = KafkaServerStartable.fromProps(kafkaProps)
// Kafka の実行
kafka.startup()

println 'startup ...'

System.in.read()

kafka.shutdown()
zk.shutdown()

Groovy のデフォルト設定ではメモリ不足で起動に失敗したため、JAVA_OPTS 環境変数で最大メモリサイズを変更して実行します。

実行
> set JAVA_OPTS=-Xmx512m
> groovy kafka_embed.groovy

startup ・・・

備考

ZooKeeper を組み込み実行するには、Apache Curatororg.apache.curator.test.TestingServer を使う方法もあります。

TestingServer の close 時に ZooKeeper のデータディレクトリを削除しないようにするには InstanceSpec を使います。 (コンストラクタの第5引数を false にすると削除しなくなる)

kafka_embed_curator.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0')
@Grapes([
    @Grab('org.apache.curator:curator-test:3.3.0'),
    @GrabExclude('io.netty#netty:3.7.0.Final')
])
import kafka.server.KafkaServerStartable
import org.apache.curator.test.TestingServer
import org.apache.curator.test.InstanceSpec

def zkPort = 2181
def zkDir = 'zk-tmp'
def kafkaDir = 'kafka-logs'

// close 時にデータディレクトリを残すように false を指定
def spec = new InstanceSpec(new File(zkDir), zkPort, -1, -1, false, -1)

def props = new Properties()
props.setProperty('zookeeper.connect', "localhost:${zkPort}")
props.setProperty('log.dir', kafkaDir)

// 第2引数を true にするとコンストラクタ内で start メソッドを実行する
new TestingServer(spec, false).withCloseable { zk ->
    zk.start()

    def kafka = KafkaServerStartable.fromProps(props)

    kafka.startup()

    println 'startup ...'

    System.in.read()

    kafka.shutdown()

    zk.stop()
}

Kafka クライアント

ついでに、Kafka の各種クライアント処理も Groovy で実装してみます。

a. KafkaProducer でメッセージ送信

まずは KafkaProducer を使った Kafka へのメッセージ送信処理です。

メッセージはトピックへ送信する事になり、トピックが未作成の場合は自動的に作成されます。(kafka.admin.TopicCommand 等でトピックを事前に作成しておく事も可能)

bootstrap.servers で接続先の Kafka を指定します。

kafka_client_producer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0')
@Grab('org.slf4j:slf4j-simple:1.7.24')
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

def topic = args[0]
def key = args[1]
def value = args[2]

def props = new Properties()

props.setProperty('bootstrap.servers', 'localhost:9092')
props.setProperty('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer')
props.setProperty('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer')

new KafkaProducer(props).withCloseable { producer ->

    def res = producer.send(new ProducerRecord(topic, key, value))

    println "***** result: ${res.get()}"
}
KafkaProducer 実行例
> groovy kafka_client_producer.groovy sample1 a 123

・・・
***** result: sample1-0@0
・・・

> groovy kafka_client_producer.groovy sample1 b 456

・・・
***** result: sample1-0@1
・・・

b. KafkaConsumer でメッセージ受信

次に KafkaConsumer でメッセージを受信してみます。

トピックを subscribe する事でメッセージを受信します。

デフォルトでは subscribe 後に送信されたメッセージを受信する事になります。 送信済みのメッセージも受信するには auto.offset.resetearliest を設定します。

Kafka では group.id で指定したグループ ID 毎にメッセージの Offset (どのメッセージまでを受信したか) が管理されます。

複数のクライアントが同一グループ ID に属している場合は、その中の 1つがメッセージを受信する事になるようです。

kafka_client_consumer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0')
@Grab('org.slf4j:slf4j-simple:1.7.24')
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors

def topic = args[0]
def group = args[1]

def props = new Properties()

props.setProperty('bootstrap.servers', 'localhost:9092')
props.setProperty('group.id', group)
props.setProperty('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer')
props.setProperty('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer')
props.setProperty('auto.offset.reset', 'earliest')

def stopLatch = new CountDownLatch(1)

def es = Executors.newSingleThreadExecutor()

es.submit {
    new KafkaConsumer(props).withCloseable { consumer ->

        consumer.subscribe([topic])

        while(stopLatch.count > 0) {
            def records = consumer.poll(1000)

            records.each {
                println "***** result: ${it}"
            }
        }
    }
}

System.in.read()

stopLatch.countDown()

es.shutdown()
KafkaConsumer 実行例
> groovy kafka_client_consumer.groovy sample1 g1

・・・
***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 0, CreateTime = 1491758385860, checksum = 1240240547, serialized key size = 1, serialized value size = 3, key = a, value = 123)
***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 1, CreateTime = 1491758400116, checksum = 728766236, serialized key size = 1, serialized value size = 3, key = b, value = 456)

c. KafkaStreams でメッセージ受信

KafkaStreams を使ってメッセージを受信します。

グループ ID は application.id で指定するようです。

kafka_client_stream.groovy
@Grab('org.apache.kafka:kafka-streams:0.10.2.0')
@Grab('org.slf4j:slf4j-simple:1.7.24')
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.common.serialization.Serdes

def topic = args[0]
def group = args[1]

def props = new Properties()

props.put('application.id', group)
props.put('bootstrap.servers', 'localhost:9092')
props.put('key.serde', Serdes.String().class)
props.put('value.serde', Serdes.String().class)

def builder = new KStreamBuilder()
builder.stream(topic).print()

def streams = new KafkaStreams(builder, props)

streams.start()

System.in.read()

streams.close()

動作確認のため、未使用のグループ ID を使って実行します。

KafkaStreams 実行例
> groovy kafka_client_stream.groovy sample1 g2

・・・
[KSTREAM-SOURCE-0000000000]: a , 123
[KSTREAM-SOURCE-0000000000]: b , 456

d. KafkaConsumerGroupService で Offset 確認

KafkaConsumerGroupService を使ってグループ ID 毎の Offset を確認してみます。

kafka_group_offset.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0')
import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService

def group = args[0]

def params = ['--bootstrap-server', 'localhost:9092', '--group', group] as String[]

def opts = new ConsumerGroupCommandOptions(params)

def svc = new KafkaConsumerGroupService(opts)

def res = svc.describeGroup()

res._2.foreach {
    it.foreach { st ->
        println "topic = ${st.topic.value}, offset = ${st.offset.value}, partition = ${st.partition.value}"
    }
}

svc.close()
KafkaConsumerGroupService 実行例
> groovy kafka_group_offset.groovy g1

topic = sample1, offset = 2, partition = 0

Groovy で Apache Flink を使用

Groovy で Apache Spark を使用」と同様の処理を Apache Flink で試してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170311/

サンプルスクリプト

今回はローカルで実行するだけなので ExecutionEnvironment.createLocalEnvironment() で取得した LocalEnvironment を使用します。

map メソッドの引数へ Groovy のクロージャを使ったところ、org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. ・・・ となってしまい駄目でしたので、MapFunction の実装クラスを定義しました。

その場合、MapFunction の型引数をきちんと指定する必要があります。(そうしないと InvalidTypesException が発生)

なお、flink-clients_2.10 を使用する場合、scala-library の @Grab 定義は不要でした。(flink-clients_2.11 の場合のみ scala-library が必要)

money_count.groovy
@Grapes([
    @Grab('org.apache.flink:flink-java:1.2.0'),
    @GrabExclude('io.netty#netty;3.7.0.Final')
])
@Grab('org.apache.flink:flink-clients_2.11:1.2.0')
@Grab('org.scala-lang:scala-library:2.11.8')
@Grab('org.jboss.netty:netty:3.2.10.Final')
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.Tuple2
import groovy.transform.CompileStatic

// @CompileStatic は必須ではない(無くても動作する)
@CompileStatic
class ToTuple implements MapFunction<String, Tuple2<String, Integer>> {
    Tuple2 map(String v) {
        new Tuple2(v, 1)
    }
}

def env = ExecutionEnvironment.createLocalEnvironment()

env.readTextFile(args[0]).map(new ToTuple()).groupBy(0).sum(1).print()

groupBy メソッドではグルーピング対象とする項目を、sum メソッドでは合計する項目を数値で指定します。

実行

Groovy のデフォルト設定では java.lang.IllegalArgumentException: Size of total memory must be positive. が発生しましたので、JAVA_OPTS 環境変数で最大メモリサイズ (-Xmx) を変更して実行します。

実行結果
> set JAVA_OPTS=-Xmx512m
> groovy money_count.groovy input_sample.txt

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1033779003]
03/08/2017 00:56:11     Job execution switched to status RUNNING.
・・・
(10000,2)
(10,2)
(100,2)
(50,1)
(500,1)
(1,2)
(1000,3)
(2000,1)
(5,3)

input_sample.txt の内容は以下の通りです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000