Akka Streams で MQTT Broker へ接続

ローカルで実行した MQTT Broker(前回 参照)に対して Akka Streams の JavaAPI を使って Groovy で接続してみます。

Akka Streams 用の様々なコネクタを備えた Alpakka に MQTT Broker 用の Source や Sink が用意されているので、今回はこちらを使います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170925/

Publish 処理

まずは publish 処理です。

MqttConnectionSettings から MqttSink を作成し、MqttMessage を渡せば MQTT Broker へメッセージを送信できます。

MqttConnectionSettings は MQTT Broker の接続先と clientId、そして永続化の方法を指定して作成します。

clientId はクライアント毎に一意な値を指定します。 (clientId を null にすると IllegalArgumentException: Null clientId となりました)

MqttQoS ではメッセージ到達の QoS を指定します。 MqttQoS.atLeastOnce() は少なくとも 1回(重複の可能性あり)の到達可能性を指定する事になります。

@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4')
@Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11')
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Source
import akka.stream.alpakka.mqtt.MqttQoS
import akka.stream.alpakka.mqtt.MqttConnectionSettings
import akka.stream.alpakka.mqtt.MqttMessage
import akka.stream.alpakka.mqtt.javadsl.MqttSink
import akka.util.ByteString

def topic = args[0]
def clientId = args[1]
def message = args[2]

def system = ActorSystem.create()
def mat = ActorMaterializer.create(system)

def settings = MqttConnectionSettings.create(
    'tcp://localhost:1883',
    clientId,
    new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
)

def msg = MqttMessage.create(topic, ByteString.fromString(message))

Source.single(msg)
    .runWith(MqttSink.create(settings, MqttQoS.atLeastOnce()), mat)

// メッセージ送信前に terminate しないようスリープで調整
sleep 1000

system.terminate()

Subscribe 処理

次に subscribe 処理です。

MqttConnectionSettings から MqttSourceSettings を作り、MqttSource を作成します。

MqttSourceSettings の withSubscriptions で subscribe するトピック名と QoS を指定します。

mqtt_subscribe.groovy
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4')
@Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11')
import akka.actor.ActorSystem
import akka.japi.Pair
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Sink
import akka.stream.alpakka.mqtt.MqttQoS
import akka.stream.alpakka.mqtt.MqttSourceSettings
import akka.stream.alpakka.mqtt.MqttConnectionSettings
import akka.stream.alpakka.mqtt.javadsl.MqttSource

def topic = args[0]
def clientId = args[1]

def system = ActorSystem.create()
def mat = ActorMaterializer.create(system)

def settings = MqttSourceSettings.create(
    MqttConnectionSettings.create(
        'tcp://localhost:1883',
        clientId,
        new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
    )
).withSubscriptions(
    // トピックと QoS の指定
    Pair.create(topic, MqttQoS.atLeastOnce())
)

MqttSource.create(settings, 10).runWith(Sink.foreach { println it }, mat)

println "subscribe : ${topic}"

System.in.read()

system.terminate()

上記の処理では、再接続の際にセッションがクリアされてしまうので、通信が切断している間のメッセージを後で受け取るような事はできません。

再接続の際にセッションが復元される Persistent Session を適用するには、以下のように withCleanSession(false) とする必要があります。

なお、Persistent Session を適用するには同じ clientId を使って再接続する必要があります。

mqtt_subscribe2.groovy (Persistent Session 版)
・・・

def settings = MqttSourceSettings.create(
    MqttConnectionSettings.create(
        'tcp://localhost:1883',
        clientId,
        new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
    ).withCleanSession(false) // Persistent Session
).withSubscriptions(
    Pair.create(topic, MqttQoS.atLeastOnce())
)

・・・

動作確認

前回 の Moquette 組み込み実行スクリプトで MQTT Broker を起動しておきます。

MQTT Broker 実行
> groovy moquette_run.groovy

・・・
[main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883
・・・
Server started, version 0.10

Subscribe 処理を実行します。

動作確認のため Persistent Session 版も実行していますが、(Publish 処理も含め) clientId へ異なる値をそれぞれ指定します。(ここでは subscribe1・2 と publish1 としています)

また、トピック名は sample とします。

(a) Subscribe1 実行
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
(b) Subscribe2(Persistent Session 版)実行
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample

この状態で “a” “ab” “abc” という 3つのメッセージを publish してみます。

Publish 実行
> groovy mqtt_publish.groovy sample publish1 a
> groovy mqtt_publish.groovy sample publish1 ab
> groovy mqtt_publish.groovy sample publish1 abc

Subscribe の結果は以下のようになりました。

(a) Subscribe1 状況
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
MqttMessage(sample,ByteString(97))
MqttMessage(sample,ByteString(97, 98))
MqttMessage(sample,ByteString(97, 98, 99))
(b) Subscribe2(Persistent Session 版)状況
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample
MqttMessage(sample,ByteString(97))
MqttMessage(sample,ByteString(97, 98))
MqttMessage(sample,ByteString(97, 98, 99))

ここで (a) と (b) の処理を一度終了しておき、その状態で publish してみます。

Publish 実行
> groovy mqtt_publish.groovy sample publish1 abcd
> groovy mqtt_publish.groovy sample publish1 abcde

(a) と (b) を再実行すると、以下のように Persistent Session 版の方は停止中に publish されたメッセージを取得できました。

(a) Subscribe1 再実行
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
(b) Subscribe2(Persistent Session 版)再実行
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample
MqttMessage(sample,ByteString(97, 98, 99, 100))
MqttMessage(sample,ByteString(97, 98, 99, 100, 101))

MQTT Broker をローカル実行

以下の MQTT Broker をそれぞれローカルで実行してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20170910/

Mosca

Mosca は Node.js 用の MQTT Broker です。

npm でインストールして mosca コマンドで実行できます。

インストール例
> npm install mosca
実行例
> mosca -v

       +++.+++:   ,+++    +++;   '+++    +++.
      ++.+++.++   ++.++  ++,'+  `+',++  ++,++
      +`  +,  +: .+  .+  +;  +; '+  '+  +`  +`
      +`  +.  +: ,+  `+  ++  +; '+  ;+  +   +.
      +`  +.  +: ,+  `+   +'    '+      +   +.
      +`  +.  +: ,+  `+   :+.   '+      +++++.
      +`  +.  +: ,+  `+    ++   '+      +++++.
      +`  +.  +: ,+  `+     ++  '+      +   +.
      +`  +.  +: ,+  `+  +:  +: '+  ;+  +   +.
      +`  +.  +: .+  .+  +;  +; '+  '+  +   +.
      +`  +.  +:  ++;++  ++'++   ++'+'  +   +.
      +`  +.  +:   +++    +++.   ,++'   +   +.
{"pid":11260,"hostname":"host1","name":"mosca","level":30,"time":1504448625943,"msg":"server started","mqtt":1883,"v":1}

ログが JSON で出力されていますが、pino ※ を使えば以下のようにログを整形して出力してくれます。

 ※ mosca 2.5.2 を npm install すると pino もインストールされました
実行例 - pino 利用
> mosca -v | pino

       +++.+++:   ,+++    +++;   '+++    +++.
      ++.+++.++   ++.++  ++,'+  `+',++  ++,++
      +`  +,  +: .+  .+  +;  +; '+  '+  +`  +`
      +`  +.  +: ,+  `+  ++  +; '+  ;+  +   +.
      +`  +.  +: ,+  `+   +'    '+      +   +.
      +`  +.  +: ,+  `+   :+.   '+      +++++.
      +`  +.  +: ,+  `+    ++   '+      +++++.
      +`  +.  +: ,+  `+     ++  '+      +   +.
      +`  +.  +: ,+  `+  +:  +: '+  ;+  +   +.
      +`  +.  +: .+  .+  +;  +; '+  '+  +   +.
      +`  +.  +:  ++;++  ++'++   ++'+'  +   +.
      +`  +.  +:   +++    +++.   ,++'   +   +.
[2017-09-03T14:24:23.929Z] INFO (mosca/3124 on host1): server started
    mqtt: 1883

サーバー組み込み実行

Mosca を組み込み実行するコードは以下の通りです。

実際は new mosca.Server() だけでサーバーが起動するのですが、そのままだとクライアントからの接続状況が分かり難いのでログ出力しています。

mosca_run.js
const mosca = require('mosca')

const server = new mosca.Server()

server.on('ready', () => console.log('server started'))

server.on('clientConnected', client => 
    console.log(`client connected: ${client.id}`))

server.on('published', (packet) => 
    console.log(`published: ${JSON.stringify(packet)}`))

クライアント処理

MQTT.js をインストールして MQTT のクライアント処理を実装してみます。

MQTT.js インストール例
> npm install mqtt

まずは、指定のトピックへメッセージを publish する処理です。

publish_sample.js
const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://localhost')

const topic = process.argv[2]
const msg = process.argv[3]

client.on('connect', () => {
    client.publish(topic, msg)

    client.end()
})

次に、指定のトピックを subscribe する処理です。

subscribe_sample.js
const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://localhost')

const topic = process.argv[2]

client.on('connect', () => {
    client.subscribe(topic)
})

client.on('message', (topic, msg) => {
    console.log(`topic: ${topic}, msg: ${msg}`)
})

動作確認

まずは Mosca サーバーを実行しておきます。(mosca コマンドで実行しても可)

サーバー組み込み実行
> node mosca_run.js

server started

次に data トピックを subscribe してみます。

subscribe 実行
> node subscribe_sample.js data

data トピックへ sample1 ~ 3 という文字列を publish してみます。

publish 実行
> node publish_sample.js data sample1
> node publish_sample.js data sample2
> node publish_sample.js data sample3

subscribe 側にメッセージが出力されました。

subscribe の結果
topic: data, msg: sample1
topic: data, msg: sample2
topic: data, msg: sample3

Moquette

Moquette は Java 用の MQTT Broker です。

distribution-0.10-bundle-tar.tar.gz をダウンロード・解凍した後、bin/moquette.bat や bin/moquette.sh で実行できるようですが、moquette.bat の内容に問題があって、そのままでは Java 8 で実行できませんでした。(## の行を削除するか rem を付けて、JAVA_OPTS の設定を削る等が必要でした)

サーバー組み込み実行

組み込み実行は io.moquette.server.Servermain メソッドを呼び出すだけです。

ただし、このままでは IllegalArgumentException: Can't locate file "null" となってしまうので config/moquette.conf ファイルを作成しておきます。(デフォルト設定を使うのなら中身は空でよい)

moquette_run.groovy
@GrabResolver(name = 'bintray', root = 'https://jcenter.bintray.com')
@Grab('io.moquette:moquette-broker:0.10')
@Grab('org.slf4j:slf4j-simple:1.7.25')
import io.moquette.server.Server

Server.main(args)

動作確認

Mosca と同様に動作確認を行ってみます。

サーバー組み込み実行
> groovy moquette_run.groovy

・・・
[main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883
[main] INFO io.moquette.server.netty.NettyAcceptor - Configuring Websocket MQTT transport
[main] INFO io.moquette.server.netty.NettyAcceptor - Property websocket_port has been setted to disabled. Websocket MQTT will be disabled
[main] INFO io.moquette.server.Server - Moquette server has been initialized successfully
Server started, version 0.10

data トピックを subscribe してみます。

subscribe 実行
> node subscribe_sample.js data

data トピックへ sample1 ~ 3 という文字列を publish してみます。

publish 実行
> node publish_sample.js data sample1
> node publish_sample.js data sample2
> node publish_sample.js data sample3

subscribe 側にメッセージが出力されました。

subscribe の結果
topic: data, msg: sample1
topic: data, msg: sample2
topic: data, msg: sample3

MXNet で iris を分類

MXNet を使って、階層型ニューラルネットによる iris の分類を試してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20170821/

準備

MXNet は様々なプログラミング言語用の Docker イメージを提供しているので、今回は Python 用の mxnet/python をそのまま使います。

Docker イメージの pull
$ docker pull mxnet/python

(1) iris データセット

Keras で iris を分類」 では sklearn の iris データセットを使いましたが、今回は numpy を使って iris.data ファイルを処理してデータセットを作成します。

iris データセット (iris.data) は https://archive.ics.uci.edu/ml/datasets/iris からダウンロードし配置(今回は /vagrant/work)しておきます。

品種(Iris-setosa 等)は質的データ(カテゴリカルデータ)ですが、このままだと MXNet では扱えなさそうだったので数値化するようにしました。

numpy で該当するような関数が見当たらなかったので、品種をリスト化して該当する要素のインデックスを取得する方法で数値化するようにしてみました。 (map 処理も無さそうだったので vectorize で代用しています)

iris データセットの作成処理
import numpy as np

# 品種の数値化(Iris-setosa -> 0, Iris-versicolor -> 1, Iris-virginica -> 2)
def categorical(ds):
    ct = np.unique(ds).tolist()
    return np.vectorize(lambda x: ct.index(x))(ds)

iris = np.loadtxt('iris.data', delimiter = ',', dtype = [
    ('sepal-length', 'f4'), ('sepal-width', 'f4'), 
    ('petal-length', 'f4'), ('petal-width', 'f4'), 
    ('species', 'S15')
])

iris_data = np.c_[
    iris['sepal-length'], 
    iris['sepal-width'], 
    iris['petal-length'], 
    iris['petal-width']
]

iris_label = categorical(iris['species'])

これで iris_data と iris_label は以下のような内容になります。

iris_data の内容
[[ 5.0999999   3.5         1.39999998  0.2       ]
 [ 4.9000001   3.          1.39999998  0.2       ]
 [ 4.69999981  3.20000005  1.29999995  0.2       ]
 [ 4.5999999   3.0999999   1.5         0.2       ]
 ・・・
 ・・・]
iris_label の内容
[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2]

(2) 学習と評価

それでは iris データセットを学習用と評価用に分割して学習と評価をそれぞれ実行してみます。

MXNet では学習や評価に使うデータは mxnet.io.DataIter で表現し、numpy の ndarray を使う場合は mx.io.NDArrayIter を使えば良さそうです。

学習・評価用のデータ分割してくれるような mxnet.io.DataIter は見当たらなかったので、「Keras で iris を分類」 と同様に random.permutation で 0 ~ 149 の数値をランダムに配置した配列を作成して、学習・評価用にデータ分割しました。

また、MXNet には以下のような API が用意されていますが、今回は Module API の方を使う事にします。

Module API では mx.symFullyConnected 等を使ってニューラルネットの構成を定義して mx.mod.Module でモジュールを作成、fit で学習、score で評価を実施できます。

/vagrant/work/iris_sample.py
import mxnet as mx
import numpy as np

train_test_rate = 0.7

def categorical(ds):
    ct = np.unique(ds).tolist()
    return np.vectorize(lambda x: ct.index(x))(ds)

iris = np.loadtxt('iris.data', delimiter = ',', dtype = [
    ('sepal-length', 'f4'), ('sepal-width', 'f4'), 
    ('petal-length', 'f4'), ('petal-width', 'f4'), 
    ('species', 'S15')
])

iris_data = np.c_[
    iris['sepal-length'], 
    iris['sepal-width'], 
    iris['petal-length'], 
    iris['petal-width']
]

iris_label = categorical(iris['species'])

data_size = len(iris)
train_size = int(data_size * train_test_rate)

perm = np.random.permutation(data_size)
train_perm = perm[0:train_size]
test_perm = perm[train_size:]

# 学習用データセット
train_iter = mx.io.NDArrayIter(iris_data[train_perm], iris_label[train_perm])
# 評価用データセット
test_iter = mx.io.NDArrayIter(iris_data[test_perm], iris_label[test_perm])

# ニューラルネットの構成定義
data = mx.sym.Variable('data')

net = mx.sym.FullyConnected(data = data, name = 'fc1', num_hidden = 5)
net = mx.sym.Activation(data = net, name = 'relu1', act_type = 'relu')

net = mx.sym.FullyConnected(data = net, name = 'fc2', num_hidden = 3)
net = mx.sym.SoftmaxOutput(data = net, name = 'softmax')

# モジュールの作成
mod = mx.mod.Module(net)

# 学習
mod.fit(train_iter, num_epoch = 20)

# 評価
res = mod.score(test_iter, mx.metric.Accuracy())

print(res)

mxnet/python の Docker コンテナを起動して実行します。

実行例
$ docker run -it --rm -v /vagrant/work:/work mxnet/python

・・・# cd /work
・・・# python iris_sample.py
[('accuracy', 0.9555555555555556)]

JanusGraph でグラフ操作 - Groovy

TinkerPop の API と互換性があり Cassandra 等をストレージとして使用できる JanusGraph というグラフデータベースがあります。

今回は、「TinkerPop でグラフ操作 - Groovy」 のサンプルコードを JanusGraph 用に変更してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20170814/

はじめに

今回はグラフデータの保存先として Cassandra を使うため、Cassandra を事前に実行しておきます。

a. 設定ファイル

TinkerPop の org.apache.tinkerpop.gremlin.structure.util.GraphFactory を使用する場合、gremlin.graphorg.janusgraph.core.JanusGraphFactory を設定し、JanusGraph 用の設定(storage.backend 等)を加えれば JanusGraph で使えます。

Cassandra を使う場合は storage.backendcassandra を設定します。

setting.properties
gremlin.graph=org.janusgraph.core.JanusGraphFactory

storage.backend=cassandra
storage.hostname=127.0.0.1

b. グラフデータ作成

JanusGraph は TinkerPop API と互換性があるため 前回 の処理内容を特に変える必要はなく、@Grapes の依存ライブラリ構成を JanusGraph 用に変えるだけです。

add-data.groovy
// 前回との違いは @Grapes の内容のみ
@Grapes([
    @Grab('org.janusgraph:janusgraph-cassandra:0.1.1'),
    @Grab('org.slf4j:slf4j-simple:1.7.25'),
    @GrabExclude('xml-apis#xml-apis;1.3.04'),
    @GrabExclude('com.github.jeremyh#jBCrypt;jbcrypt-0.4'),
    @GrabExclude('org.slf4j#slf4j-log4j12'),
    @GrabExclude('ch.qos.logback#logback-classic'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory

def conf = args[0]

def addNode = { g, type, id, name = id ->
    def res = g.addVertex(type)

    res.property('oid', id)
    res.property('name', name)

    res
}

def createData = { g -> 
    def p = addNode(g, 'Principals', 'principals')

    def u1 = addNode(g, 'User', 'user1')
    def u2 = addNode(g, 'User', 'user2')
    def ad = addNode(g, 'User', 'admin')

    def g1 = addNode(g, 'Group', 'group1')

    [u1, u2, ad, g1].each {
        it.addEdge('PART_OF', p)
    }

    u2.addEdge('PART_OF', g1)

    def r = addNode(g, 'Resources', 'resources')

    def s1 = addNode(g, 'Service', 'service1')

    def s2 = addNode(g, 'Service', 'service2')
    def s2o1 = addNode(g, 'Operation', 'service2.get', 'get')
    def s2o2 = addNode(g, 'Operation', 'service2.post', 'post')

    [s2o1, s2o2].each {
        s2.addEdge('METHOD', it)
    }

    [s1, s2].each {
        r.addEdge('RESOURCE', it)
    }

    u1.addEdge('PERMIT', s1)
    g1.addEdge('PERMIT', s2o2)
    ad.addEdge('PERMIT', r)
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable { tx ->
        createData(g)

        tx.commit()
    }
}
実行結果
> groovy add-data.groovy setting.properties

・・・
[main] INFO com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor - AddHost: 127.0.0.1
[main] INFO org.janusgraph.diskstorage.Backend - Initiated backend operations thread pool of size 8
[main] INFO org.janusgraph.diskstorage.log.kcvs.KCVSLog - Loaded unidentified ReadMarker start time 2017-08-12T14:17:38.078Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@3bdb2c78

c. 経路の探索

こちらも @Grapes の内容を変えるだけです。(確認のためログ出力するようにしています)

find-data.groovy
// 前回との違いは @Grapes の内容のみ
@Grapes([
    @Grab('org.janusgraph:janusgraph-cassandra:0.1.1'),
    //@Grab('org.slf4j:slf4j-nop:1.7.25'),
    @Grab('org.slf4j:slf4j-simple:1.7.25'),
    @GrabExclude('xml-apis#xml-apis;1.3.04'),
    @GrabExclude('com.github.jeremyh#jBCrypt;jbcrypt-0.4'),
    @GrabExclude('org.slf4j#slf4j-log4j12'),
    @GrabExclude('ch.qos.logback#logback-classic'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__

def conf = args[0]
def start = args[1]
def end = args[2]

def toStr = {
    "${it.label()}[${it.id()}]{${it.properties().join(', ')}}"
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable {
        def p = g.traversal().V()
            .has('oid', start)
            .repeat(__.outE().as('e').inV())
            .until(__.has('oid', end))
            .where(__.select('e').unfold().hasLabel('PERMIT'))
            .path()

        p.each {
            println it.objects().collect(toStr).join(' -> ')
        }
    }
}
実行結果
> groovy find-data.groovy setting.properties user2 service2.post

・・・
[main] WARN org.janusgraph.graphdb.transaction.StandardJanusGraphTx - Query requires iterating over all vertices [(oid = user2)]. For better performance, use indexes
・・・
User[4304]{vp[oid->user2], vp[name->user2]} -> PART_OF[4d6-3bk-4r9-3ao]{} -> Group[4272]{vp[oid->group1], vp[name->group1]} -> PERMIT[6c6-3ao-9hx-37c]{} -> Operation[4152]{vp[oid->service2.post], vp[name->post]}

インデックスを使った方が性能的に望ましいとの警告ログが出力されました。

d. インデックスの作成

警告ログを解消するためにインデックスを使ってみます。

TinkerPop で(汎用的な)インデックス作成 API を見つけられなかったので、インデックス作成に関しては JanusGraph の専用 API を使う必要がありそうです。

JanusGraph のインデックス機能には以下の 2通りがあり、指定のプロパティ値を持つノードやエッジを見つける用途には (a) を使えば良さそうです。

  • (a) Graph Indexes
  • (b) Vertex-centric Indexes

そして、(a) では以下のようなインデックスを使えます。

名称 概要 生成メソッド名
Composite Index 外部のインデックスエンジンを使わずに高速 buildCompositeIndex
Mixed Index 外部のインデックスエンジン(Elasticsearch 等)を使って数値の範囲検索や全文検索などが可能 buildMixedIndex

今回は oid プロパティに対して (a) の Composite Index を作ってみます。

(a) Graph Indexes は JanusGraphManagementbuildIndex メソッドで作成します。 その戻り値 IndexBuilderbuildCompositeIndex メソッドで Composite Index となります。 (ユニークインデックスとする場合は unique メソッドを呼び出します)

ここで、インデックスを作成しただけでは、既存データのインデックス化は行われないようなので、インデックス作成後に既存データのインデックス化も行うようにしています。

create_index.groovy
@Grapes([
    @Grab('org.janusgraph:janusgraph-cassandra:0.1.1'),
    @Grab('org.slf4j:slf4j-simple:1.7.25'),
    @GrabExclude('xml-apis#xml-apis;1.3.04'),
    @GrabExclude('com.github.jeremyh#jBCrypt;jbcrypt-0.4'),
    @GrabExclude('org.slf4j#slf4j-log4j12'),
    @GrabExclude('ch.qos.logback#logback-classic'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.janusgraph.core.JanusGraphFactory
import org.janusgraph.core.schema.SchemaAction
import org.apache.tinkerpop.gremlin.structure.Vertex

def conf = args[0]

def graph = JanusGraphFactory.open(conf)

graph.withAutoCloseable { g ->
    // JanusGraphManagement の取得
    def manage = g.openManagement()

    def oidKey = manage.getPropertyKey('oid')

    // インデックスが未作成の場合にインデックスを作成
    if (manage.getGraphIndex('oidIndex') == null) {
        // oid を対象とした Composite Index の作成
        manage.buildIndex('oidIndex', Vertex)
            .addKey(oidKey)
            .buildCompositeIndex()

        manage.commit()
        // インデックス作成の完了待ち
        manage.awaitGraphIndexStatus(g, 'oidIndex').call()
    }

    manage = g.openManagement()
    // 既存データのインデックス化
    manage.updateIndex(manage.getGraphIndex('oidIndex'), SchemaAction.REINDEX).get()

    manage.commit()
}
インデックス作成結果
> groovy create_index.groovy setting.properties

・・・
[pool-19-thread-1] INFO org.janusgraph.graphdb.database.management.ManagementSystem$UpdateStatusTrigger - Set status REGISTERED on schema element oidIndex with property keys []
[pool-19-thread-1] INFO org.janusgraph.graphdb.database.management.ManagementLogger - Received all acknowledgements for eviction [1]
・・・
[Thread-3] INFO com.netflix.astyanax.thrift.ThriftKeyspaceImpl - Detected partitioner org.apache.cassandra.dht.Murmur3Partitioner for keyspace janusgraph
[Thread-7] INFO org.janusgraph.graphdb.olap.job.IndexRepairJob - Found index oidIndex
[Thread-3] INFO org.janusgraph.graphdb.database.management.ManagementSystem - Index update job successful for [oidIndex]

これで警告ログは出力されなくなりました。

経路探索の実行結果
> groovy find-data.groovy setting.properties user2 service2.post

・・・
User[4304]{vp[oid->user2], vp[name->user2]} -> PART_OF[4d6-3bk-4r9-3ao]{} -> Group[4272]{vp[oid->group1], vp[name->group1]} -> PERMIT[6c6-3ao-9hx-37c]{} -> Operation[4152]{vp[oid->service2.post], vp[name->post]}

TinkerPop でグラフ操作 - Kotlin

前回 の処理を Kotlin で実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20170724/

a. ビルド定義・設定ファイル

今回は Gradle のサブプロジェクトとして実行します。

build.gradle (Gradle ビルド定義ファイル)
buildscript {
    ext.kotlin_version = '1.1.3-2'

    repositories {
        jcenter()
    }

    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlin_version}"
    }
}
// サブプロジェクト共通設定
subprojects {
    apply plugin: 'kotlin'
    apply plugin: 'application'

    mainClassName = 'AppKt'

    sourceCompatibility = JavaVersion.VERSION_1_8

    compileKotlin {
        kotlinOptions.jvmTarget = '1.8'
    }

    repositories {
        jcenter()
    }

    dependencies {
        compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:${kotlin_version}"
        compile 'org.apache.tinkerpop:neo4j-gremlin:3.2.5'

        runtime 'org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'
        runtime 'org.slf4j:slf4j-nop:1.7.25'
    }

    run {
        if (project.hasProperty('args')) {
            args project.args.split(' ')
        }
    }
}

サブプロジェクトは以下の通り

settings.gradle (Gradle 設定ファイル)
include 'add-data'
include 'find-data'

TinkerPop の設定ファイルは基本的に 前回 と同じものを使いますが、Gradle のサブプロジェクトとして実行する関係でディレクトリのパスを変えています。

conf/setting.properties (TinkerPop 設定ファイル)
gremlin.graph=org.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph
gremlin.neo4j.directory=../neo4jdb

b. グラフデータ作成

処理内容は 前回 と同じです。

add-data/src/main/kotlin/App.kt
import org.apache.tinkerpop.gremlin.structure.Graph
import org.apache.tinkerpop.gremlin.structure.Vertex
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory

fun main(args: Array<String>) {
    val conf = args[0]

    GraphFactory.open(conf).use { g ->
        g.tx().use { tx ->
            createData(g)

            tx.commit()
        }
    }
}

fun createData(g: Graph) {
    val p = addNode(g, "Principals", "principals")

    val u1 = addNode(g, "User", "user1")
    val u2 = addNode(g, "User", "user2")
    val ad = addNode(g, "User", "admin")

    val g1 = addNode(g, "Group", "group1")

    listOf(u1, u2, ad, g1).forEach {
        it.addEdge("PART_OF", p)
    }

    u2.addEdge("PART_OF", g1)

    val r = addNode(g, "Resources", "resources")

    val s1 = addNode(g, "Service", "service1")

    val s2 = addNode(g, "Service", "service2")
    val s2o1 = addNode(g, "Operation", "service2.get", "get")
    val s2o2 = addNode(g, "Operation", "service2.post", "post")

    listOf(s2o1, s2o2).forEach {
        s2.addEdge("METHOD", it)
    }

    listOf(s1, s2).forEach {
        r.addEdge("RESOURCE", it)
    }

    u1.addEdge("PERMIT", s1)
    g1.addEdge("PERMIT", s2o2)
    ad.addEdge("PERMIT", r)
}

fun addNode(g: Graph, label: String, id: String, name: String = id): Vertex {
    val node = g.addVertex(label)

    node.property("oid", id)
    node.property("name", name)

    return node
}

add-data サブプロジェクトを実行するため :add-data:run とします。

実行
> gradle :add-data:run -q -Pargs="../conf/setting.properties"

c. 経路の探索

Kotlin では __as予約語なので、`as` のようにエスケープする必要があります。

また、Groovy と違って outEhas では型を指定する必要がありました。

find-data/src/main/kotlin/App.kt
import org.apache.tinkerpop.gremlin.structure.Vertex
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.`__`.*
import org.apache.tinkerpop.gremlin.structure.Element

fun main(args: Array<String>) {
    val conf = args[0]
    val start = args[1]
    val end = args[2]

    GraphFactory.open(conf).use { g ->
        g.tx().use {
            val p = g.traversal().V()
                    .has("oid", start)
                    .repeat(outE<Vertex>().`as`("e").inV())
                    .until(has<Vertex>("oid", end))
                    .where(select<Vertex, Vertex>("e").unfold<Vertex>().hasLabel("PERMIT"))
                    .path()

            p.forEach {
                println(it.objects().asSequence().map(::toStr).joinToString(" -> "))
            }
        }
    }
}

fun toStr(n: Any) = when(n) {
    is Element -> "${n.label()}[${n.id()}]{${n.properties<String>().asSequence().joinToString(", ")}}"
    else -> ""
}
実行結果
> gradle :find-data:run -q -Pargs="../conf/setting.properties user2 service2.post"

User[2]{vp[oid->user2], vp[name->user2]} -> PART_OF[4]{} -> Group[4]{vp[oid->group1], vp[name->group1]} -> PERMIT[10]{} -> Operation[9]{vp[oid->service2.post], vp[name->post]}

TinkerPop でグラフ操作 - Groovy

前回、Neo4j の Cypher を使って実施したグラフ操作を Apache TinkerPop を使って Groovy (@Grab を使用)で実装してみました。

Apache TinkerPop はグラフ処理のためのフレームワークで、Neo4j 等の様々なグラフ DB ※ に対して共通のインターフェースを提供します。

 ※ グラフ DB だけではなく、
    Cassandra、HBase、DynamoDB 等をサポートする
    ライブラリも提供されています

ソースは http://github.com/fits/try_samples/tree/master/blog/20170718/

a. 設定ファイル

TinkerPop にはインメモリーの TinkerGraph が用意されていますが、前回と同様に Neo4j を使う事にします。

ただ、前回と違って Neo4j をサーバー起動せずに組み込み実行します。

TinkerPop には Graph オブジェクトを汎用的に生成する手段として org.apache.tinkerpop.gremlin.structure.util.GraphFactory が用意されています。

GraphFactory.open(<設定ファイル>) を使えば、依存ライブラリと設定ファイルを差し替えて DB を切り替える事もできそうなので、今回はこの方法を使います。

Neo4j を組み込み利用する場合、gremlin.graphorg.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph を設定して gremlin.neo4j.directory に DB ファイルを出力するディレクトリを指定します。 (Neo4j をサーバー起動した場合の data/databases/graph.db)

setting.properties
gremlin.graph=org.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph
gremlin.neo4j.directory=neo4jdb

b. グラフデータ作成

前回 と同様のデータを作成する処理を実装してみます。

Groovy で実行する際の注意点として、neo4j-tinkerpop-api-impl 等は依存ライブラリとして Groovy 2.4.11 のライブラリを含んでおり、このバージョン以外の groovy コマンドで実行すると org.codehaus.groovy.control.MultipleCompilationErrorsException が発生してしまいます。

そこで今回は、Groovy 2.5.0 beta1 で実行できるように @GrabExclude を使って groovy-xml 等を除くようにしています。

グラフの操作は GraphFactory.open() で取得した Graph オブジェクトに対して実施します。

ノードの追加は addVertex、エッジの追加は addEdge メソッドで行う事ができ、property メソッドで任意の属性を設定できます。

トランザクションtx メソッドで開始します。※

 ※ TinkerGraph のように tx メソッドをサポートしていないものもありますので
    (その場合に tx() を呼び出すとエラーになる)
    実際は tx のサポート有無をチェックしてから
    呼び出すようにした方が安全だと思います

    (例)
        if (g.features().graph().supportsTransactions()) {
            g.tx().withAutoCloseable { t ->
                ・・・
            }
        }
add-data.groovy
@Grapes([
    @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'),
    @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'),
    @Grab('org.slf4j:slf4j-nop:1.7.25'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory

def conf = args[0]

def addNode = { g, type, id, name = id ->
    def res = g.addVertex(type)

    res.property('oid', id)
    res.property('name', name)

    res
}

def createData = { g -> 
    def p = addNode(g, 'Principals', 'principals')

    def u1 = addNode(g, 'User', 'user1')
    def u2 = addNode(g, 'User', 'user2')
    def ad = addNode(g, 'User', 'admin')

    def g1 = addNode(g, 'Group', 'group1')

    [u1, u2, ad, g1].each {
        it.addEdge('PART_OF', p)
    }

    u2.addEdge('PART_OF', g1)

    def r = addNode(g, 'Resources', 'resources')

    def s1 = addNode(g, 'Service', 'service1')

    def s2 = addNode(g, 'Service', 'service2')
    def s2o1 = addNode(g, 'Operation', 'service2.get', 'get')
    def s2o2 = addNode(g, 'Operation', 'service2.post', 'post')

    [s2o1, s2o2].each {
        s2.addEdge('METHOD', it)
    }

    [s1, s2].each {
        r.addEdge('RESOURCE', it)
    }

    u1.addEdge('PERMIT', s1)
    g1.addEdge('PERMIT', s2o2)
    ad.addEdge('PERMIT', r)
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable { tx ->
        createData(g)

        tx.commit()
    }
}

withAutoCloseableAutoCloseable のリソースをクローズするための Groovy の機能です(TinkerPop の API ではありません)

実行結果
> groovy add-data.groovy setting.properties

c. 経路の探索

前回 の経路探索の処理を TinkerPop の API で実装してみます。

traversal().V() でノードを対象とした GraphTraversal<Vertex,Vertex> を取得でき、has(<プロパティ名>, <プロパティ値>) 等を使ってノードの条件を指定できます。

(c-1) 複数エッジ(条件なし)

まず、エッジは気にせずに指定ノードから指定ノードまでのパス(経路)を取得する処理を実装してみます。

終点のノードまで複数のノードで繋がっている場合は repeat(・・・).until(<終点ノードの条件>) で探せます。

エッジの条件を指定しない場合は repeat(__.out()).until(・・・) で取得できます。(__ はクラス名です)

パスを取得するには path() を使います。 Path からは objects() でパスに含まれるノード(やエッジ)を List<Object> で取得できます。

なお、以下の処理でトランザクションは必要ありませんが、一応トランザクションを使っています。 (JanusGraph をマルチスレッドで使うケースではトランザクションが必要になったので)

find-data-simple.groovy
@Grapes([
    @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'),
    @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'),
    @Grab('org.slf4j:slf4j-nop:1.7.25'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__

def conf = args[0]
def start = args[1]
def end = args[2]

def toStr = {
    "${it.label()}[${it.id()}]{${it.properties().join(', ')}}"
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable {
        def p = g.traversal().V()
            .has('oid', start) // 始点ノードの条件
            .repeat(__.out())
            .until(__.has('oid', end)) // 終点ノードの条件
            .path()

        p.each {
            println it.objects().collect(toStr).join(' -> ')
        }
    }
}
実行結果

user2 から service2.post へのパスを取得してみます。

> groovy find-data-simple.groovy setting.properties user2 service2.post

User[2]{vp[oid->user2], vp[name->user2]} -> Group[4]{vp[oid->group1], vp[name->group1]} -> Operation[9]{vp[oid->service2.post], vp[name->post]}

__.out() を使った事で、パスにはノードの情報だけが含まれエッジの情報を含んでいません。

(c-2) 複数エッジ(条件あり)

エッジの条件を指定するには repeat でエッジも指定します。

A ノードと B ノードが C エッジで繋がっている (A)-[C]->(B) のような状態で、A の __.outE() が C エッジで、C エッジの inV() が B ノードとなります。

そのため、repeat(__.outE().as('e').inV()) とすれば、複数の外向きエッジで繋がっているノードを検索する事ができます。

ここで as(<ステップラベル名>) を使って、該当するエッジにラベル名を付けておき、where での条件判定(PERMIT エッジを含むかどうか)に使います。

__.select('e') の結果は GraphTraversal<Vertex, ArrayList<Edge>> のようになるので __.select('e').hasLabel(・・・) とはできません。 (ClassCastException になります)

unfold() で GraphTraversal<Vertex, ArrayList> を GraphTraversal<Vertex, Edge> にして hasLabel を使えば、該当のラベルを持つエッジを含んでいるかどうかを条件判定できます。

find-data.groovy
・・・
GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable {
        def p = g.traversal().V()
            .has('oid', start) // 始点ノードの条件
            .repeat(__.outE().as('e').inV())
            .until(__.has('oid', end)) // 終点ノードの条件
            .where(__.select('e').unfold().hasLabel('PERMIT')) // PERMIT エッジを含んでいるかどうかの判定
            .path()

        p.each {
            println it.objects().collect(toStr).join(' -> ')
        }
    }
}
実行結果
> groovy find-data.groovy setting.properties user2 service2.post

User[2]{vp[oid->user2], vp[name->user2]} -> PART_OF[4]{} -> Group[4]{vp[oid->group1], vp[name->group1]} -> PERMIT[10]{} -> Operation[9]{vp[oid->service2.post], vp[name->post]}

__.outE() を使った事でエッジの情報もパスに含まれるようになりました。

アクセス制御リストをグラフDBで構築 - Neo4j

柔軟性のあるアクセス制御を考えた際に、アクセス制御リスト(ACL)を有向グラフで表現すればどうだろうかと思い、グラフDBの Neo4j を使って試してみました。

概要

アクセス制御リストを有向グラフで表現し、アクセスの許可・拒否を以下のように判定する事にします。

  • アクセスの主体からアクセス対象へのパス(経路)が存在すればそのアクセスを許可する

例えば、user1 が service1 へアクセスできる事(アクセス許可が与えられている)を以下のように表現する事にします。

(a)

f:id:fits:20170703224725p:plain

ただ、これだと自由度が高すぎると思うのでルールを加えます。

  • アクセスの主体となる全てのノードは Principals ノードへ所属させる
  • アクセスの対象となる全てのノードは Resources ノードへ所属させる
  • アクセスの主体同士と対象同士でエッジ(辺)の向きを調整

これに基づいてノードやエッジをいくつか追加してみた結果が以下です。

(b)

f:id:fits:20170703224809p:plain

admin ユーザーは Resources に属する全てのリソース(service1, service2, service2 の post や get)へアクセスでき、group1 へ所属する user2 は service2.post へアクセスできる事を表現しています。

Neo4j で検証

それでは実際に Neo4j を使って検証してみます。

Neo4j ではエッジ(辺)に該当する部分をリレーションシップと呼ぶようですが、以降もエッジと呼ぶことにします。

(1) Neo4j サーバー起動

まずは Neo4j のサーバーを起動しておきます。 Neo4j の bin ディレクトリに neo4j コマンドがあるので、これを使って起動します。

今回は neo4j console で Neo4j サーバーを起動しますが、neo4j start で実行する方法もあります。

Neo4j サーバー起動
> neo4j console

サーバーが起動した後、http://localhost:7474/ へ Web ブラウザでアクセスすると管理画面が表示されるので、初期パスワードに neo4j と入力して初回のログイン処理を行っておきます。

管理画面では Cypher と呼ばれるクエリ言語を使ってデータを操作できるようになっているので、これより Cypher でデータ操作していきます。

(2) グラフデータの作成

先ほどの (a) のグラフ(user1 と service1 ノードを PERMIT エッジで繋ぐ)は以下のような Cypher クエリで作成できます。

1. (a) のデータ作成 Cypher クエリ
CREATE (u:User{oid:"user1",name:"user1"})-[r:PERMIT]->(s:Service{oid:"service1",name:"service1"})
RETURN u,r,s

f:id:fits:20170703224725p:plain

CREATE でノードやエッジを作成でき、ノードの内容は (変数名:ラベル{プロパティ名:値, ・・・})、エッジの内容は [変数名:関連名{プロパティ名:値, ・・・}] のように指定できるようです。

A から B ノードへ向いたエッジを作成する際は (A)-[エッジ]->(B) のようにします

RETURN を使うと指定した変数の内容が返され、管理画面では RETURN の内容を SVG で表示してくれます。※

 ※ Cypher クエリ実行毎の左メニューの "Graph" を選択すると SVG のグラフ表示で、
    "Code" を選択すると Response の JSON を確認できます

ノードやエッジには一意の id が付与されますが、ノードを指定するのに不便なので、今回は oid という独自プロパティを設定するようにしました。

管理画面では name プロパティの値を表示するようなので name も設定しています。

次に、(a) の状態に対してノードやエッジを追加して (b) の状態にしていきます。

今度はいくつかのステップに分けてデータを追加します。

まずは principals, resources, admin を作成し、user2 と PART_OF エッジで繋いだ group1 を作成しました。

2. principals, resources, admin, user2, group1 の追加
CREATE (:Principals{name:"principals"}),(:Resources{name:"resources"})
CREATE (:User{oid:"admin",name:"admin"})
CREATE (:User{oid:"user2",name:"user2"})-[:PART_OF]->(:Group{oid:"group1",name:"group1"})

次は get と post というメソッドを持った service2 を追加します。 ここでは service2 を作った後で、METHOD エッジで繋がった get と post を作成するようにしてみました。

3. get と post を持つ service2 の追加
CREATE (s2:Service{oid:"service2",name:"service2"})
MERGE (s2)-[:METHOD]->(:Operation{oid:"service2.get",name:"get"})
MERGE (s2)-[:METHOD]->(:Operation{oid:"service2.post",name:"post"})

次に PERMIT エッジを作成していくつかのノードを繋ぎます。

4. group1 から service2.post への PERMIT を設定
MATCH (g1:Group{oid:"group1"}),(m1:Operation{oid:"service2.post"})
MERGE (g1)-[:PERMIT]->(m1)
5. admin から resources への PERMIT を設定
MATCH (au:User{oid:"admin"}),(r:Resources)
MERGE (au)-[:PERMIT]->(r)

これまでに作成した全ての User と Group が principals へ所属するように、全ての Service が resources へ所属するようにエッジで繋ぎます。

6. principals と resources へのエッジをそれぞれ設定
MATCH (u:User),(g:Group),(s:Service),(p:Principals),(r:Resources)
MERGE (u)-[:PART_OF]->(p)
MERGE (g)-[:PART_OF]->(p)
MERGE (r)-[:RESOURCE]->(s)

最後に全てのデータを表示します。

全表示

MATCH (a) RETURN a

f:id:fits:20170703224809p:plain

(3) アクセス許可の判定(経路の探索)

このデータに対して、どのような Cypher クエリを使って経路探索(= アクセス許可の確認)すればよいのか検証していきます。

単独エッジ

まずは、単純に user1 から service1 へ PERMIT で繋がっているかどうかは、以下のクエリで確認できます。

user1 から service1 へ PERMIT で直接繋がっているものを取得
MATCH (u:User{oid:"user1"})-[:PERMIT]->(s:Service{oid:"service1"})
RETURN u,s

f:id:fits:20170703224957p:plain

このクエリでは PERMIT エッジで直接繋がっているケースにしか対応できないので汎用的には使えません。

複数エッジ(条件なし)

次に、PERMIT で繋がっているかどうかは気にしないで、user2 から service2.post へのパス(経路)を取得してみます。

エッジの条件を [*] とする事で、複数のエッジで繋がっているパスを取得できます。

また、MATCH で指定したパスを変数(以下の path)へ設定して、これを RETURN するようにすればパスの内容を取得できます。

user2 から service2.post へのパスを取得
MATCH path=(:User{oid:"user2"})-[*]->(:Operation{oid:"service2.post"})
RETURN path

f:id:fits:20170703225057p:plain

複数エッジ(条件あり)

上記のクエリに対して、PERMIT のエッジを少なくとも 1つ含むという条件を加えるには whereany が使えます。

relationships(変数) を使って変数の中からエッジのみを抽出する事が可能です。 (nodes(変数) を使えばノードだけを抽出できる)

type(変数) を使えばラベル名を取得できますので、これを使えば PERMIT かどうかの判定が可能です。

user2 から service2.post へのパス(PERMIT を含む)を取得
MATCH path=(:User{oid:"user2"})-[*]->(:Operation{oid:"service2.post"})
WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT")
RETURN path

f:id:fits:20170703225057p:plain

このクエリなら汎用的に使えそうです。

ノードのラベル(今回の User, Service 等)を特定する必要が無ければ、以下のようにプロパティの条件だけを指定する事も可能です。

admin から service1 へのパス(PERMIT を含む)を取得
MATCH path=({oid:"admin"})-[*]->({oid:"service1"})
WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT")
RETURN path

f:id:fits:20170703225200p:plain

user1 と service1 もこの方法で取得できます。

user1 から service1 へのパス(PERMIT を含む)を取得
MATCH path=(:User{oid:"user1"})-[*]->({oid:"service1"})
WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT")
RETURN path

f:id:fits:20170703225231p:plain

最後に、パスが繋がっていない場合も確認しておきます。 当然ながら結果は空となりました。

user2 から service1 へのパス(PERMIT を含む)を取得
MATCH path=(u:User{oid:"user2"})-[*]->({oid:"service1"})
WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT")
RETURN path

f:id:fits:20170703225256p:plain

備考

relationships を使う以外に [rs *] のように変数を使う方法でも同じ結果が得られます。

user2 から service2.post へのパス(PERMIT を含む)を取得 - deprecated
MATCH path=(:User{oid:"user2"})-[rs *]->(:Operation{oid:"service2.post"})
WHERE any(r IN rs WHERE type(r) = "PERMIT")
RETURN path

こちらの方がシンプルなように思うのですが、このクエリでは Binding relationships to a list in a variable length pattern is deprecated と警告されましたので、使わない方が無難かもしれません。