TinkerPop でグラフ操作 - Groovy

前回、Neo4j の Cypher を使って実施したグラフ操作を Apache TinkerPop を使って Groovy (@Grab を使用)で実装してみました。

Apache TinkerPop はグラフ処理のためのフレームワークで、Neo4j 等の様々なグラフ DB ※ に対して共通のインターフェースを提供します。

 ※ グラフ DB だけではなく、
    Cassandra、HBase、DynamoDB 等をサポートする
    ライブラリも提供されています

ソースは http://github.com/fits/try_samples/tree/master/blog/20170718/

a. 設定ファイル

TinkerPop にはインメモリーの TinkerGraph が用意されていますが、前回と同様に Neo4j を使う事にします。

ただ、前回と違って Neo4j をサーバー起動せずに組み込み実行します。

TinkerPop には Graph オブジェクトを汎用的に生成する手段として org.apache.tinkerpop.gremlin.structure.util.GraphFactory が用意されています。

GraphFactory.open(<設定ファイル>) を使えば、依存ライブラリと設定ファイルを差し替えて DB を切り替える事もできそうなので、今回はこの方法を使います。

Neo4j を組み込み利用する場合、gremlin.graphorg.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph を設定して gremlin.neo4j.directory に DB ファイルを出力するディレクトリを指定します。 (Neo4j をサーバー起動した場合の data/databases/graph.db)

setting.properties
gremlin.graph=org.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph
gremlin.neo4j.directory=neo4jdb

b. グラフデータ作成

前回 と同様のデータを作成する処理を実装してみます。

Groovy で実行する際の注意点として、neo4j-tinkerpop-api-impl 等は依存ライブラリとして Groovy 2.4.11 のライブラリを含んでおり、このバージョン以外の groovy コマンドで実行すると org.codehaus.groovy.control.MultipleCompilationErrorsException が発生してしまいます。

そこで今回は、Groovy 2.5.0 beta1 で実行できるように @GrabExclude を使って groovy-xml 等を除くようにしています。

グラフの操作は GraphFactory.open() で取得した Graph オブジェクトに対して実施します。

ノードの追加は addVertex、エッジの追加は addEdge メソッドで行う事ができ、property メソッドで任意の属性を設定できます。

トランザクションtx メソッドで開始します。※

 ※ TinkerGraph のように tx メソッドをサポートしていないものもありますので
    (その場合に tx() を呼び出すとエラーになる)
    実際は tx のサポート有無をチェックしてから
    呼び出すようにした方が安全だと思います

    (例)
        if (g.features().graph().supportsTransactions()) {
            g.tx().withAutoCloseable { t ->
                ・・・
            }
        }
add-data.groovy
@Grapes([
    @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'),
    @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'),
    @Grab('org.slf4j:slf4j-nop:1.7.25'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory

def conf = args[0]

def addNode = { g, type, id, name = id ->
    def res = g.addVertex(type)

    res.property('oid', id)
    res.property('name', name)

    res
}

def createData = { g -> 
    def p = addNode(g, 'Principals', 'principals')

    def u1 = addNode(g, 'User', 'user1')
    def u2 = addNode(g, 'User', 'user2')
    def ad = addNode(g, 'User', 'admin')

    def g1 = addNode(g, 'Group', 'group1')

    [u1, u2, ad, g1].each {
        it.addEdge('PART_OF', p)
    }

    u2.addEdge('PART_OF', g1)

    def r = addNode(g, 'Resources', 'resources')

    def s1 = addNode(g, 'Service', 'service1')

    def s2 = addNode(g, 'Service', 'service2')
    def s2o1 = addNode(g, 'Operation', 'service2.get', 'get')
    def s2o2 = addNode(g, 'Operation', 'service2.post', 'post')

    [s2o1, s2o2].each {
        s2.addEdge('METHOD', it)
    }

    [s1, s2].each {
        r.addEdge('RESOURCE', it)
    }

    u1.addEdge('PERMIT', s1)
    g1.addEdge('PERMIT', s2o2)
    ad.addEdge('PERMIT', r)
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable { tx ->
        createData(g)

        tx.commit()
    }
}

withAutoCloseableAutoCloseable のリソースをクローズするための Groovy の機能です(TinkerPop の API ではありません)

実行結果
> groovy add-data.groovy setting.properties

c. 経路の探索

前回 の経路探索の処理を TinkerPop の API で実装してみます。

traversal().V() でノードを対象とした GraphTraversal<Vertex,Vertex> を取得でき、has(<プロパティ名>, <プロパティ値>) 等を使ってノードの条件を指定できます。

(c-1) 複数エッジ(条件なし)

まず、エッジは気にせずに指定ノードから指定ノードまでのパス(経路)を取得する処理を実装してみます。

終点のノードまで複数のノードで繋がっている場合は repeat(・・・).until(<終点ノードの条件>) で探せます。

エッジの条件を指定しない場合は repeat(__.out()).until(・・・) で取得できます。(__ はクラス名です)

パスを取得するには path() を使います。 Path からは objects() でパスに含まれるノード(やエッジ)を List<Object> で取得できます。

なお、以下の処理でトランザクションは必要ありませんが、一応トランザクションを使っています。 (JanusGraph をマルチスレッドで使うケースではトランザクションが必要になったので)

find-data-simple.groovy
@Grapes([
    @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'),
    @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'),
    @Grab('org.slf4j:slf4j-nop:1.7.25'),
    @GrabExclude('org.codehaus.groovy:groovy-xml'),
    @GrabExclude('org.codehaus.groovy:groovy-swing'),
    @GrabExclude('org.codehaus.groovy:groovy-jsr223')
])
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__

def conf = args[0]
def start = args[1]
def end = args[2]

def toStr = {
    "${it.label()}[${it.id()}]{${it.properties().join(', ')}}"
}

GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable {
        def p = g.traversal().V()
            .has('oid', start) // 始点ノードの条件
            .repeat(__.out())
            .until(__.has('oid', end)) // 終点ノードの条件
            .path()

        p.each {
            println it.objects().collect(toStr).join(' -> ')
        }
    }
}
実行結果

user2 から service2.post へのパスを取得してみます。

> groovy find-data-simple.groovy setting.properties user2 service2.post

User[2]{vp[oid->user2], vp[name->user2]} -> Group[4]{vp[oid->group1], vp[name->group1]} -> Operation[9]{vp[oid->service2.post], vp[name->post]}

__.out() を使った事で、パスにはノードの情報だけが含まれエッジの情報を含んでいません。

(c-2) 複数エッジ(条件あり)

エッジの条件を指定するには repeat でエッジも指定します。

A ノードと B ノードが C エッジで繋がっている (A)-[C]->(B) のような状態で、A の __.outE() が C エッジで、C エッジの inV() が B ノードとなります。

そのため、repeat(__.outE().as('e').inV()) とすれば、複数の外向きエッジで繋がっているノードを検索する事ができます。

ここで as(<ステップラベル名>) を使って、該当するエッジにラベル名を付けておき、where での条件判定(PERMIT エッジを含むかどうか)に使います。

__.select('e') の結果は GraphTraversal<Vertex, ArrayList<Edge>> のようになるので __.select('e').hasLabel(・・・) とはできません。 (ClassCastException になります)

unfold() で GraphTraversal<Vertex, ArrayList> を GraphTraversal<Vertex, Edge> にして hasLabel を使えば、該当のラベルを持つエッジを含んでいるかどうかを条件判定できます。

find-data.groovy
・・・
GraphFactory.open(conf).withAutoCloseable { g ->
    g.tx().withAutoCloseable {
        def p = g.traversal().V()
            .has('oid', start) // 始点ノードの条件
            .repeat(__.outE().as('e').inV())
            .until(__.has('oid', end)) // 終点ノードの条件
            .where(__.select('e').unfold().hasLabel('PERMIT')) // PERMIT エッジを含んでいるかどうかの判定
            .path()

        p.each {
            println it.objects().collect(toStr).join(' -> ')
        }
    }
}
実行結果
> groovy find-data.groovy setting.properties user2 service2.post

User[2]{vp[oid->user2], vp[name->user2]} -> PART_OF[4]{} -> Group[4]{vp[oid->group1], vp[name->group1]} -> PERMIT[10]{} -> Operation[9]{vp[oid->service2.post], vp[name->post]}

__.outE() を使った事でエッジの情報もパスに含まれるようになりました。

アクセス制御リストをグラフDBで構築 - Neo4j

柔軟性のあるアクセス制御を考えた際に、アクセス制御リスト(ACL)を有向グラフで表現すればどうだろうかと思い、グラフDBの Neo4j を使って試してみました。

概要

アクセス制御リストを有向グラフで表現し、アクセスの許可・拒否を以下のように判定する事にします。

  • アクセスの主体からアクセス対象へのパス(経路)が存在すればそのアクセスを許可する

例えば、user1 が service1 へアクセスできる事(アクセス許可が与えられている)を以下のように表現する事にします。

(a)

f:id:fits:20170703224725p:plain

ただ、これだと自由度が高すぎると思うのでルールを加えます。

  • アクセスの主体となる全てのノードは Principals ノードへ所属させる
  • アクセスの対象となる全てのノードは Resources ノードへ所属させる
  • アクセスの主体同士と対象同士でエッジ(辺)の向きを調整

これに基づいてノードやエッジをいくつか追加してみた結果が以下です。

(b)

f:id:fits:20170703224809p:plain

admin ユーザーは Resources に属する全てのリソース(service1, service2, service2 の post や get)へアクセスでき、group1 へ所属する user2 は service2.post へアクセスできる事を表現しています。

Neo4j で検証

それでは実際に Neo4j を使って検証してみます。

Neo4j ではエッジ(辺)に該当する部分をリレーションシップと呼ぶようですが、以降もエッジと呼ぶことにします。

(1) Neo4j サーバー起動

まずは Neo4j のサーバーを起動しておきます。 Neo4j の bin ディレクトリに neo4j コマンドがあるので、これを使って起動します。

今回は neo4j console で Neo4j サーバーを起動しますが、neo4j start で実行する方法もあります。

Neo4j サーバー起動
> neo4j console

サーバーが起動した後、http://localhost:7474/ へ Web ブラウザでアクセスすると管理画面が表示されるので、初期パスワードに neo4j と入力して初回のログイン処理を行っておきます。

管理画面では Cypher と呼ばれるクエリ言語を使ってデータを操作できるようになっているので、これより Cypher でデータ操作していきます。

(2) グラフデータの作成

先ほどの (a) のグラフ(user1 と service1 ノードを PERMIT エッジで繋ぐ)は以下のような Cypher クエリで作成できます。

1. (a) のデータ作成 Cypher クエリ
CREATE (u:User{oid:"user1",name:"user1"})-[r:PERMIT]->(s:Service{oid:"service1",name:"service1"})
RETURN u,r,s

f:id:fits:20170703224725p:plain

CREATE でノードやエッジを作成でき、ノードの内容は (変数名:ラベル{プロパティ名:値, ・・・})、エッジの内容は [変数名:関連名{プロパティ名:値, ・・・}] のように指定できるようです。

A から B ノードへ向いたエッジを作成する際は (A)-[エッジ]->(B) のようにします

RETURN を使うと指定した変数の内容が返され、管理画面では RETURN の内容を SVG で表示してくれます。※

 ※ Cypher クエリ実行毎の左メニューの "Graph" を選択すると SVG のグラフ表示で、
    "Code" を選択すると Response の JSON を確認できます

ノードやエッジには一意の id が付与されますが、ノードを指定するのに不便なので、今回は oid という独自プロパティを設定するようにしました。

管理画面では name プロパティの値を表示するようなので name も設定しています。

次に、(a) の状態に対してノードやエッジを追加して (b) の状態にしていきます。

今度はいくつかのステップに分けてデータを追加します。

まずは principals, resources, admin を作成し、user2 と PART_OF エッジで繋いだ group1 を作成しました。

2. principals, resources, admin, user2, group1 の追加
CREATE (:Principals{name:"principals"}),(:Resources{name:"resources"})
CREATE (:User{oid:"admin",name:"admin"})
CREATE (:User{oid:"user2",name:"user2"})-[:PART_OF]->(:Group{oid:"group1",name:"group1"})

次は get と post というメソッドを持った service2 を追加します。 ここでは service2 を作った後で、METHOD エッジで繋がった get と post を作成するようにしてみました。

3. get と post を持つ service2 の追加
CREATE (s2:Service{oid:"service2",name:"service2"})
MERGE (s2)-[:METHOD]->(:Operation{oid:"service2.get",name:"get"})
MERGE (s2)-[:METHOD]->(:Operation{oid:"service2.post",name:"post"})

次に PERMIT エッジを作成していくつかのノードを繋ぎます。

4. group1 から service2.post への PERMIT を設定
MATCH (g1:Group{oid:"group1"}),(m1:Operation{oid:"service2.post"})
MERGE (g1)-[:PERMIT]->(m1)
5. admin から resources への PERMIT を設定
MATCH (au:User{oid:"admin"}),(r:Resources)
MERGE (au)-[:PERMIT]->(r)

これまでに作成した全ての User と Group が principals へ所属するように、全ての Service が resources へ所属するようにエッジで繋ぎます。

6. principals と resources へのエッジをそれぞれ設定
MATCH (u:User),(g:Group),(s:Service),(p:Principals),(r:Resources)
MERGE (u)-[:PART_OF]->(p)
MERGE (g)-[:PART_OF]->(p)
MERGE (r)-[:RESOURCE]->(s)

最後に全てのデータを表示します。

全表示

MATCH (a) RETURN a

f:id:fits:20170703224809p:plain

(3) アクセス許可の判定(経路の探索)

このデータに対して、どのような Cypher クエリを使って経路探索(= アクセス許可の確認)すればよいのか検証していきます。

単独エッジ

まずは、単純に user1 から service1 へ PERMIT で繋がっているかどうかは、以下のクエリで確認できます。

user1 から service1 へ PERMIT で直接繋がっているものを取得
MATCH (u:User{oid:"user1"})-[:PERMIT]->(s:Service{oid:"service1"})
RETURN u,s

f:id:fits:20170703224957p:plain

このクエリでは PERMIT エッジで直接繋がっているケースにしか対応できないので汎用的には使えません。

複数エッジ(条件なし)

次に、PERMIT で繋がっているかどうかは気にしないで、user2 から service2.post へのパス(経路)を取得してみます。

エッジの条件を [*] とする事で、複数のエッジで繋がっているパスを取得できます。

また、MATCH で指定したパスを変数(以下の path)へ設定して、これを RETURN するようにすればパスの内容を取得できます。

user2 から service2.post へのパスを取得
MATCH path=(:User{oid:"user2"})-[*]->(:Operation{oid:"service2.post"})
RETURN path

f:id:fits:20170703225057p:plain

複数エッジ(条件あり)

上記のクエリに対して、PERMIT のエッジを少なくとも 1つ含むという条件を加えるには whereany が使えます。

relationships(変数) を使って変数の中からエッジのみを抽出する事が可能です。 (nodes(変数) を使えばノードだけを抽出できる)

type(変数) を使えばラベル名を取得できますので、これを使えば PERMIT かどうかの判定が可能です。

user2 から service2.post へのパス(PERMIT を含む)を取得
MATCH path=(:User{oid:"user2"})-[*]->(:Operation{oid:"service2.post"})
WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT")
RETURN path

f:id:fits:20170703225057p:plain

このクエリなら汎用的に使えそうです。

ノードのラベル(今回の User, Service 等)を特定する必要が無ければ、以下のようにプロパティの条件だけを指定する事も可能です。

admin から service1 へのパス(PERMIT を含む)を取得
MATCH path=({oid:"admin"})-[*]->({oid:"service1"})
WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT")
RETURN path

f:id:fits:20170703225200p:plain

user1 と service1 もこの方法で取得できます。

user1 から service1 へのパス(PERMIT を含む)を取得
MATCH path=(:User{oid:"user1"})-[*]->({oid:"service1"})
WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT")
RETURN path

f:id:fits:20170703225231p:plain

最後に、パスが繋がっていない場合も確認しておきます。 当然ながら結果は空となりました。

user2 から service1 へのパス(PERMIT を含む)を取得
MATCH path=(u:User{oid:"user2"})-[*]->({oid:"service1"})
WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT")
RETURN path

f:id:fits:20170703225256p:plain

備考

relationships を使う以外に [rs *] のように変数を使う方法でも同じ結果が得られます。

user2 から service2.post へのパス(PERMIT を含む)を取得 - deprecated
MATCH path=(:User{oid:"user2"})-[rs *]->(:Operation{oid:"service2.post"})
WHERE any(r IN rs WHERE type(r) = "PERMIT")
RETURN path

こちらの方がシンプルなように思うのですが、このクエリでは Binding relationships to a list in a variable length pattern is deprecated と警告されましたので、使わない方が無難かもしれません。

Docker で F# アプリケーションを作成

Microsoft による .NET Core の公式 Docker イメージを使って、単純な F# のコンソールアプリケーションを作成してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170619/

(1) Docker コンテナの実行

まずは .NET Core の Docker コンテナを実行しておきます。

.NET Core コンテナの実行
$ docker run -it --rm microsoft/dotnet /bin/bash
root@・・・:/# 

現時点では 1.1.2-sdk が latest となっており、デフォルトで netcoreapp1.1 のアプリケーションを作成する事になります。

(2) F# コンソールプロジェクトの作成

.NET Core SDK では dotnet コマンド (.NET Command Line Tools) を使ってアプリケーションを開発できます。

dotnet new コマンドで様々なプロジェクトの雛型を作成でき、デフォルトでは以下のようなプロジェクトに対応しています。

プロジェクトの雛型一覧
root@・・・# dotnet new -all

・・・
Templates                 Short Name       Language      Tags
-----------------------------------------------------------------------
Console Application       console          [C#], F#      Common/Console
Class library             classlib         [C#], F#      Common/Library
Unit Test Project         mstest           [C#], F#      Test/MSTest
xUnit Test Project        xunit            [C#], F#      Test/xUnit
ASP.NET Core Empty        web              [C#]          Web/Empty
ASP.NET Core Web App      mvc              [C#], F#      Web/MVC
ASP.NET Core Web API      webapi           [C#]          Web/WebAPI
Nuget Config              nugetconfig                    Config
Web Config                webconfig                      Config
Solution File             sln                            Solution
・・・

今回は F# のコンソールアプリケーションを作成します。

適当なディレクトリを作成・移動し、dotnet new console を実行します。 F# の場合は -lang オプションで指定します。 (デフォルトは C#

F# コンソールプロジェクトの作成
root@・・・# mkdir sample
root@・・・# cd sample
root@・・・# dotnet new console -lang F#

以下のファイルが生成されました。

Program.fs
open System

[<EntryPoint>]
let main argv =
    printfn "Hello World from F#!"
    0 // return an integer exit code
sample.fsproj
<Project Sdk="FSharp.NET.Sdk;Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp1.1</TargetFramework>
  </PropertyGroup>
  <ItemGroup>
    <Compile Include="Program.fs" />
  </ItemGroup>
  <ItemGroup>
    <PackageReference Include="FSharp.Core" Version="4.1.*" />
    <PackageReference Include="FSharp.NET.Sdk" Version="1.0.*" PrivateAssets="All" />
  </ItemGroup>
</Project>

(3) ビルドと実行

とりあえず、このままビルドして実行してみます。

まずは dotnet restore を実行してプロジェクトの依存関係を復元します。

これにより依存関係などが定義された obj/project.assets.json ファイルが作られます。

リストア
root@・・・# dotnet restore

・・・
  Installing FSharp.Compiler.Tools 4.1.17.
  Installing FSharp.NET.Sdk 1.0.5.
  Installing FSharp.Core 4.1.17.
  ・・・
  Installed:
      3 package(s) to /sample/sample.fsproj

ビルドは dotnet build で行います。 なお、dotnet build しなくても dotnet run すればビルドも実施してくれるようです。

ビルド
root@・・・# dotnet build

・・・
Build succeeded.
    0 Warning(s)
    0 Error(s)

dotnet run で実行します。

実行
root@・・・# dotnet run

Hello World from F#!

(4) Chiron パッケージの追加

次に、Chiron という F# で JSON を処理するためのパッケージを使ってみます。

依存パッケージは dotnet add package <パッケージ名> で追加できますが、(現時点で)デフォルトの Chiron 6.2.1 を追加しようとすると以下のようにエラーとなりました。

Chiron パッケージ(6.2.1)追加 - 失敗
root@・・・# dotnet add package chiron

・・・
log  : Installing Chiron 6.2.1.
error: Package Chiron 6.2.1 is not compatible with netcoreapp1.1 (.NETCoreApp,Version=v1.1). Package Chiron 6.2.1 supports:
error:   - net40 (.NETFramework,Version=v4.0)
error:   - portable-net45+win8+wp8+wpa81 (.NETPortable,Version=v0.0,Profile=Profile259)
・・・
error: One or more packages are incompatible with .NETCoreApp,Version=v1.1.
error: Package 'chiron' is incompatible with 'all' frameworks in project '/sample/sample.fsproj'.

netcoreapp1.1 (.fsproj の TargetFramework で設定) を Chiron 6.2.1 がサポートしてない事が原因のようです。

nuget の Chiron のページ を見てみると .NETStandard 1.6 に対応したバージョン 7.0.0-alpha-170410 がありました。

netcoreapp1.1 は .NETStandard 1.6 をサポートしているので、このバージョンなら使えそうです。

add package では -v オプションでバージョンを指定できるので、以下のようにバージョンを指定して Chiron を追加します。

Chiron パッケージ(7.0.0-alpha-170410)追加 - 成功
root@・・・# dotnet add package chiron -v 7.0.0-alpha-170410

・・・
log  : Installing FParsecCS 1.0.3-alpha-170404.
log  : Installing FParsec 1.0.3-alpha-170404.
log  : Installing Chiron 7.0.0-alpha-170410.
info : Package 'chiron' is compatible with all the specified frameworks in project '/sample/sample.fsproj'.
info : PackageReference for package 'chiron' version '7.0.0-alpha-170410' added to file '/sample/sample.fsproj'.

今度は成功し、.fsproj は以下のようになりました。

sample.fsproj (Chiron 追加後)
<Project Sdk="FSharp.NET.Sdk;Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp1.1</TargetFramework>
  </PropertyGroup>
  <ItemGroup>
    <Compile Include="Program.fs" />
  </ItemGroup>
  <ItemGroup>
    <PackageReference Include="chiron" Version="7.0.0-alpha-170410" />
    <PackageReference Include="FSharp.Core" Version="4.1.*" />
    <PackageReference Include="FSharp.NET.Sdk" Version="1.0.*" PrivateAssets="All" />
  </ItemGroup>
</Project>

(5) Chiron サンプルアプリケーションの実装

動作確認のため Chiron を使った単純な処理を実装してみました。 (Chiron 7 ではモジュール構成等が変わっているようでした)

Program.fs
open System
open Chiron.Inference

[<EntryPoint>]
let main argv =
    let d = Map ["a", 123]
    let res = Json.serialize d

    printfn "json = %s" res
    0

(6) Chiron サンプルアプリケーションのビルドと実行

(3) と同様にリストアしてから実行します。

リストア
root@・・・# dotnet restore

  Restoring packages for /sample/sample.fsproj...
  ・・・
ビルド
root@・・・# dotnet build

・・・
Build succeeded.
    0 Warning(s)
    0 Error(s)
実行
root@・・・# dotnet run

json = {"a":123}

Docker で Haskell アプリケーションを作成 - MongoDB 利用

MongoDB へ接続する Haskell アプリケーションを Docker で作成してみました。

以下の Docker イメージを使用します。

ビルドツールは stack を使って、MongoDB への接続には以下のライブラリを使います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170529/

(1) Docker コンテナの実行

Docker で MongoDB と Haskell のコンテナを実行します。

コンテナ間の連携には、deprecated となっている Link 機能(–link)は使わずにユーザー定義ネットワークを使う事にします。

(1.1) bridge ネットワークの作成

bridge ネットワークを新規作成します。

$ docker network create --driver bridge br1

(1.2) MongoDB コンテナの実行

作成したネットワークへ参加するように --net=<ネットワーク名> を使って MongoDB を docker run します。

$ docker run -d --name mongo1 --net=br1 mongo

(1.3) Haskell コンテナの実行

Haskell も同様に docker run します。

ここでは Docker ホスト側の /vagrant/work をコンテナの /work へマウントしています。

$ docker run -it --name hs1 --net=br1 -v /vagrant/work:/work haskell /bin/bash
root@・・・# 

確認のため Haskell コンテナ内から MongoDB のコンテナへ ping してみます。

ping
root@・・・# ping mongo1
PING mongo1 (172.18.0.2): 56 data bytes
64 bytes from 172.18.0.2: icmp_seq=0 ttl=64 time=0.640 ms
64 bytes from 172.18.0.2: icmp_seq=1 ttl=64 time=0.257 ms
・・・

(2) Haskell アプリケーションの作成

Haskell のコンテナで MongoDB へ接続するアプリケーションを作成します。 (以降の処理は (1.3) で起動した Haskell コンテナ内で実施します)

(2.1) プロジェクトのひな型作成

stack new <プロジェクト名> [<テンプレート名>] でプロジェクトのひな型を作ります。

プロジェクト作成
root@・・・# cd /work

root@・・・# stack new sample1
Downloading template "new-template" to create project "sample1" in sample1/ ...
・・・

プロジェクト名のディレクトリが作られ、各種ファイルが生成されます。

(2.2) 依存ライブラリの取得

MongoDB driver for Haskell を使用するため、<プロジェクト名>.cabal ファイルの library/build-depends へ mongoDB を追記します。

今回は Control.Monad.Trans も使うので mtl も追記しています。

sample1/sample1.cabal
・・・
library
  ・・・
  build-depends:       base >= 4.7 && < 5
                     , mongoDB
                     , mtl
  default-language:    Haskell2010
・・・

stack build でビルドすると未取得の依存ライブラリをダウンロードします。

プロジェクトのビルド(依存ライブラリの取得)
root@・・・# cd sample1

root@・・・# stack build --allow-different-user
・・・
mtl-2.2.1: download
mtl-2.2.1: configure
mtl-2.2.1: build
mtl-2.2.1: copy/register
・・・

ここで --allow-different-user オプションを付けていますが、今回のケースではこのオプションを付ける必要がありました。(-v でマウントしたディレクトリを使用した事が原因だと思われる)

なお、–allow-different-user 無しで stack build すると以下のようになりました。

root@・・・# stack build
You are not the owner of '/work/sample1/'. Aborting to protect file permissions.Retry with '--allow-different-user' to disable this precaution.

(2.3) MongoDB 接続アプリケーションの実装

とりあえず someFunc という関数名をそのまま使う事にします。(変更する場合は app/Main.hs も変更します)

OverloadedStringsExtendedDefaultRules の言語拡張が最低限必要となるようです。

MongoDB driver for Haskell では、MongoDB に対する処理を Action m a (MonadIO m) で定義し access 関数で処理すればよさそうです。

ここで Actiontype Action = ReaderT MongoContext と定義されています。

MongoDB のコレクションへ複数ドキュメントを追加するには insertManyinsertMany_ 関数が使えます。(ドキュメント追加の用途では insertinsertAll 関数等もあります)

関数名の最後の _ の有無は、作成したドキュメントの _id の値を返すかどうかの違いのようです。(_ の付いている方は返さない)

今回は _id の値は不要なので insertMany_ の方を使いました。

MongoDB へ登録するドキュメントは [<項目名> =: <値>, ・・・] という形式で定義できます。

find 関数の結果は Action m Cursor なので、rest 関数(Cursor -> Action m [Document]) をバインド(>>=)して Action m [Document] を取得しています。

接続先の MongoDB ホスト名は MONGO_HOST 環境変数から取得するようにしてみました。

sample1/src/Lib.hs
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ExtendedDefaultRules #-}

module Lib
    ( someFunc
    ) where

import System.Environment
import Control.Monad.Trans (lift)
import Database.MongoDB 

someFunc :: IO ()
someFunc = do
    mongo <- mongoHost
    -- MongoDB 接続
    pipe <- connect (host mongo)

    access pipe master "sample" proc

    close pipe

mongoHost :: IO String
mongoHost = getEnv "MONGO_HOST"

proc :: Action IO ()
proc = do
    -- items コレクションへドキュメント追加
    insertMany_ "items" [
        ["name" =: "item1", "value" =: 1],
        ["name" =: "item2", "value" =: 2] ]

    allItems >>= printDocs

-- items コレクションの全ドキュメント取得
allItems :: Action IO [Document]
allItems = rest =<< find ( select [] "items" )

-- ドキュメントの出力
printDocs :: [Document] -> Action IO ()
printDocs docs = lift $ mapM_ print docs

(2.4) ビルドと実行

それでは、ビルドして実行します。

ビルド
root@・・・# stack build --allow-different-user
・・・
/work/sample1/.stack-work/install/x86_64-linux/lts-8.15/8.0.2/bin
Registering sample1-0.1.0.0...

.stack-work ディレクトリへ(ビルドの)成果物が生成されます。

実行するには stack exec <実行ファイル名> で実行するか、実行ファイル(例 .stack-work/install/x86_64-linux/lts-8.15/8.0.2/bin/sample1-exe)を直接実行します。

実行ファイル名はデフォルトで <プロジェクト名>-exe となるようです。

今回は環境変数から MongoDB のホスト名を取得するようにしたので、MONGO_HOST 環境変数へ MongoDB のコンテナ名を設定してから実行します。

実行
root@・・・# export MONGO_HOST=mongo1

root@・・・# stack exec sample1-exe --allow-different-user
[ _id: 592a7ffb2139612699000000, name: "item1", value: 1]
[ _id: 592a7ffb2139612699000001, name: "item2", value: 2]

D3.js で HAR ファイルから散布図を作成

前回、HAR (HTTP ARchive) ファイルから Python で作成した散布図を D3.js を使って SVG として作ってみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20170515/

はじめに

Node.js で D3.js を使用するために d3jsdom をインストールしておきます。

依存ライブラリのインストー
npm install --save d3 jsdom
package.json
{
  ・・・
  "dependencies": {
    "d3": "^4.8.0",
    "jsdom": "^10.1.0"
  }
}

散布図(SVG)の作成

標準入力から HAR ファイルの内容(JSON)を受け取り、time と bodySize の該当位置へ円を描画する事で散布図を表現してみました。

円の色をサブタイプ毎に分けることで色分けしています。

ついでに、ツールチップの表示やマウスイベント処理を追加してみました。

Web ブラウザで SVG ファイルを開いた際にマウスイベントを処理できるように attr を使ってマウスイベント処理(該当する円のサイズを変更)を設定しています。

index.js
const d3 = require('d3');

const jsdom = require('jsdom');
const { JSDOM } = jsdom;

const w = 800;
const h = 500;
const margin = { left: 80, top: 20, bottom: 50, right: 200 };
const legendMargin = { top: 30 };

const fontSize = '12pt';
const circleRadius = 5;

// X軸(time)の値の範囲
const xDomain = [0, 5000];
// Y軸(bodySize)の値の範囲
const yDomain = [0, 180000];

const colorMap = {
    'javascript': 'blue',
    'x-javascript': 'blue',
    'json': 'green',
    'gif': 'tomato',
    'jpeg': 'red',
    'png': 'pink',
    'html': 'lime',
    'css': 'turquoise'
};

const dom = new JSDOM();
const document = dom.window.document;

const toSubType = mtype => mtype.split(';')[0].split('/').pop();
const toColor = stype => colorMap[stype] ? colorMap[stype] : 'black';

process.stdin.resume();

let json = '';

process.stdin.on('data', chunk => json += chunk);

process.stdin.on('end', () => {
    const data = JSON.parse(json);

    const df = data.log.entries.map( d => {
        return {
            'url': d.request.url,
            'subType': toSubType(d.response.content.mimeType),
            'bodySize': d.response.bodySize,
            'time': d.time
        };
    });

    const svg = d3.select(document.body)
        .append('svg')
            .attr('xmlns', 'http://www.w3.org/2000/svg')
            .attr('width', w + margin.left + margin.right)
            .attr('height', h + margin.top + margin.bottom)
            .append('g')
                .attr('transform', `translate(${margin.left}, ${margin.top})`);

    const x = d3.scaleLinear().range([0, w]).domain(xDomain);
    const y = d3.scaleLinear().range([h, 0]).domain(yDomain);

    const xAxis = d3.axisBottom(x);
    const yAxis = d3.axisLeft(y);

    // X軸
    svg.append('g')
        .attr('transform', `translate(0, ${h})`)
        .call(xAxis);
    // Y軸
    svg.append('g')
        .call(yAxis);

    // X軸ラベル
    svg.append('text')
        .attr('x', w / 2)
        .attr('y', h + 40)
        .style('font-size', fontSize)
        .text('Time (ms)');
    // Y軸ラベル
    svg.append('text')
        .attr('x', -h / 2)
        .attr('y', -(margin.left) / 1.5)
        .style('font-size', fontSize)
        .attr('transform', 'rotate(-90)')
        .text('Body Size');

    // 円
    const point = svg.selectAll('circle')
        .data(df)
        .enter()
            .append('circle');
    // 円の設定
    point.attr('class', d => d.subType)
        .attr('cx', d => x(d.time))
        .attr('cy', d => y(d.bodySize))
        .attr('r', circleRadius)
        .attr('fill', d => toColor(d.subType))
        .append('title') // ツールチップの設定
            .text(d => d.url);

    // 凡例
    const legend = svg.selectAll('.legend')
        .data(d3.entries(colorMap))
        .enter()
            .append('g')
                .attr('class', 'legend')
                .attr('transform', (d, i) => {
                    const left = w + margin.left;
                    const top = margin.top + i * legendMargin.top;
                    return `translate(${left}, ${top})`;
                });

    legend.append('circle')
        .attr('r', circleRadius)
        .attr('fill', d => d.value);

    legend.append('text')
        .attr('x', circleRadius * 2)
        .attr('y', 4)
        .style('font-size', fontSize)
        // マウスイベント処理(該当する円のサイズを変更)
        .attr('onmouseover', d => 
            `document.querySelectorAll('circle.${d.key}').forEach(d => d.setAttribute('r', ${circleRadius} * 2))`)
        .attr('onmouseout', d => 
            `document.querySelectorAll('circle.${d.key}').forEach(d => d.setAttribute('r', ${circleRadius}))`)
        // 凡例のラベル
        .text(d => d.key);

    // SVG の出力
    console.log(document.body.innerHTML);
});
実行例
node index.js < a.har > a.svg

実行結果例

a.svg

f:id:fits:20170515220405p:plain

b.svg

f:id:fits:20170515220428p:plain

Python で HAR ファイルから散布図を作成

Web サイトの構成を可視化するため、Python で HAR (HTTP ARchive) ファイルをパースし散布図を試しに作ってみました。

今回作成した散布図の内容は以下の通りです。

  • 横軸は time (処理時間 (ミリ秒))
  • 縦軸は bodySize (レスポンスボディのサイズ)
  • MIME タイプのサブタイプ別に色分け

今回は Anaconda に予め含まれているライブラリを使用する事にします。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170514/

はじめに

HAR ファイルは ChromeFirefox の開発ツールで取得できますが、簡易なものは PhantomJS の examples/netsniff.js で取得できるので今回はこれを使いました。

HAR の取得
phantomjs netsniff.js <URL>

netsniff.js では console.log で HAR の内容を出力するようになっています。

HAR ファイルの取得例
phantomjs netsniff.js https://www.・・・ > a.har

HAR ファイルは以下のような JSON 形式のファイルとなっています。

HAR ファイル例
{
  "log": {
    ・・・
    "entries": [
      {
        ・・・
        "time": 4474,
        "request": {
          ・・・
        },
        "response": {
          ・・・
          "bodySize": 473,
          "content": {
            "size": 473,
            "mimeType": "text/html;charset=UTF-8"
          }
        },
        "cache": {},
        "timings": {
          ・・・
          "wait": 3813,
          "receive": 661,
          "ssl": -1
        },
        ・・・
      },
      ・・・
    ]
  }
}

netsniff.js では、timings 内の wait と receive 以外の値は固定値(0 か -1)が設定されるようになっています。

1. seaborn の pairplot 使用

まずは、seaborn の pairplot を使って散布図を描画してみます。 (pairplot はデフォルトが散布図になっているようです)

pairplot は基本的にデータ内の数値変数の全ての組み合わせをグラフ化するものですが、x_varsy_vars を指定する事で特定の組み合わせに限定したグラフだけを描画する事も可能です。

色分けする項目は hue で指定できるようになっています。

複数のグラフをグリッド表示する事を想定しているため、個々のグラフのデフォルトサイズが小さく設定されていますが、size で変更する事が可能です。

har_scatter_plot1.py
import sys
import json
import codecs
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# HAR ファイルの読み込みと Python オブジェクト化
data = json.load(codecs.open(sys.argv[1], 'r', 'utf-8'))

to_subtype = lambda mtype: mtype.split(';')[0].split('/')[-1]
# 特定の項目を抜き出す処理
convert = lambda d: {
    'subType': to_subtype(d['response']['content']['mimeType']),
    'bodySize': d['response']['bodySize'],
    'time': d['time']
}
# DataFrame 化
df = pd.DataFrame(list(map(convert, data['log']['entries'])))

# 散布図の描画
sns.pairplot(df, hue = 'subType', x_vars = 'time', y_vars = 'bodySize', size = 10)

imgfile = "%s_1.png" % sys.argv[1]

# グラフの保存
plt.savefig(imgfile)
実行例
python har_scatter_plot1.py a.har

実行結果例

a.har_1.png

f:id:fits:20170514210328p:plain

b.har_1.png

f:id:fits:20170514210341p:plain

2. サブタイプ毎に plot

pairplot の結果ではサブタイプ毎の色が固定されておらず、複数の Web ページを比較する用途には適していません。

そこで、サブタイプ毎の散布図を重ねて描画してみました。

har_scatter_plot2.py
import sys
import json
import codecs
import pandas as pd
import matplotlib.pyplot as plt

・・・
# サブタイプと色のマッピング
color_map = {
    'javascript': 'blue',
    'x-javascript': 'blue',
    'json': 'green',
    'gif': 'tomato',
    'jpeg': 'red',
    'png': 'pink',
    'html': 'lime',
    'css': 'turquoise'
}

df = pd.DataFrame(list(map(convert, data['log']['entries'])))

ax = plt.figure().add_subplot(1, 1, 1)

for (k, c) in color_map.items():
    # 指定サブタイプのデータを抽出
    sdf = df[df['subType'] == k]

    if not sdf.empty:
        # c の色で散布図(scatter)を描画
        sdf.plot('time', 'bodySize', 'scatter', ax, c = c, label = k)

imgfile = "%s_2.png" % sys.argv[1]

plt.savefig(imgfile)
実行例
python har_scatter_plot2.py a.har

実行結果例

a.har_2.png

f:id:fits:20170514210415p:plain

b.har_2.png

f:id:fits:20170514210424p:plain

Go で Kafka の Consumer クライアント

下記ライブラリをそれぞれ使って Go 言語で Apache Kafka の Consumer クライアントを作成してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170506/

Sarama の場合

まずは sarama を使ってみます。

準備

GOPATH 環境変数を設定して sarama を go get しておきます。

sarama の取得例
> go get github.com/Shopify/sarama

サンプル作成

sarama でメッセージを受信するには以下のように実装すれば良さそうです。

  • (1) sarama.NewConsumerConsumer を作成
  • (2) Consumer の ConsumePartition で指定のトピック・パーティションからメッセージを受信するための PartitionConsumer を作成
  • (3) PartitionConsumer の Messages で取得したチャネルからメッセージを受信

(2) で sarama.OffsetOldest を指定する事で Kafka に残っている最も古いオフセットからメッセージを取得します。

for select を使って継続的に Kafka からメッセージを受信するようにしていますが、このままだと処理が終了しないので Ctrl + c で終了するように os.Signal のチャネルを使っています。

src/consumer-sample/main.go
package main

import (
    "fmt"
    "log"
    "io"
    "os"
    "os/signal"
    "github.com/Shopify/sarama"
)

func close(trg io.Closer) {
    if err := trg.Close(); err != nil {
        log.Fatalln(err)
    }
}

func makeStopChannel() chan os.Signal {
    // Ctrl + c のチャネル設定
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)

    return ch
}

func main() {
    // (1)
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)

    if err != nil {
        panic(err)
    }

    defer close(consumer)
    // (2)
    ptConsumer, err := consumer.ConsumePartition(os.Args[1], 0, sarama.OffsetOldest)

    if err != nil {
        panic(err)
    }

    defer close(ptConsumer)

    ch := makeStopChannel()

    for {
        select {
            // (3)
            case msg := <- ptConsumer.Messages():
                fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", 
                    msg.Topic, msg.Offset, msg.Key, msg.Value)
            case <- ch:
                // 終了
                return
        }
    }
}

ビルドと実行

ビルド例
> cd src/consumer-sample
> go build

動作確認のため、前回 に組み込み実行した Kafka へ接続してみます。

実行例
> consumer-sample sample1

topic: sample1, offset: 0, key: a, value: 123
topic: sample1, offset: 1, key: b, value: 456

Ctrl + c を入力して終了します。

グループへ属していないのでオフセットが保存されず、実行の度に上記のような出力となります。

備考 - Enter キーで終了する方法

makeStopChannel の処理内容を以下のように変えてみると、Ctrl + c ではなく Enter キーの入力で終了するようにできます。

src/consumer-sample2/main.go
package main

import (
    ・・・
    "bufio"
    "github.com/Shopify/sarama"
)

・・・

func makeStopChannel() chan string {
    ch := make(chan string)
    reader := bufio.NewReader(os.Stdin)

    go func() {
        s, _ := reader.ReadString('\n')
        ch <- s
    }()

    return ch
}

func main() {
    ・・・
}

Sarama Cluster の場合

次に sarama-cluster を使ってみます。

sarama-cluster は sarama の拡張で、グループなどへの参加を sarama よりも簡単に実施できるようになっています。 (内部的には sarama の JoinGroupRequest 等を使ってグループを処理しています)

準備

GOPATH 環境変数を設定して sarama-cluster を go get しておきます。

sarama-cluster の取得例
> go get github.com/bsm/sarama-cluster

サンプル作成

sarama-cluster の場合は sarama よりもシンプルで、NewConsumerConsumer を作り、Messages で取得したチャネルからメッセージを受信できます。

また、MarkOffset を使ってオフセットを保存します。

src/consumer-group-sample/main.go
package main

import (
    "fmt"
    "log"
    "io"
    "os"
    "os/signal"
    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

・・・

func main() {
    config := cluster.NewConfig()
    config.Group.Return.Notifications = true
    config.Consumer.Return.Errors = true
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    topics := []string{os.Args[1]}
    group := os.Args[2]

    consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, group, topics, config)

    if err != nil {
        panic(err)
    }

    defer close(consumer)

    ch := makeStopChannel()

    for {
        select {
            case msg, more := <- consumer.Messages():
                if more {
                    fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", 
                            msg.Topic, msg.Offset, msg.Key, msg.Value)

                    // オフセットの保存
                    consumer.MarkOffset(msg, "")
                }
            case err, more := <- consumer.Errors():
                if more {
                    log.Printf("Error: %+v\n", err.Error())
                }
            case ntf, more := <- consumer.Notifications():
                if more {
                    log.Printf("Notification: %+v\n", ntf)
                }
            case <- ch:
                return
        }
    }
}

ビルドと実行

ビルド例
> cd src/consumer-group-sample
> go build

Kafka へ接続して動作確認してみます。

実行例
> consumer-group-sample sample1 g3

・・・ Notification: &{Claimed:map[sample1:[0]] Released:map[] Current:map[sample1:[0]]}
topic: sample1, offset: 0, key: a, value: 123
topic: sample1, offset: 1, key: b, value: 456

クラスター実行

同一グループに所属する Consumer クライアント(上記サンプル)を複数実行し、メッセージの分散受信を試してみます。

Kafka では 1つのパーティションは 1つの Consumer に割り当てられるようで(グループ毎)、パーティションが 1つしかない状態で同一グループに属する Consumer を増やしても分散の効果はありません。

そこで、まずはパーティション数を増やしておきます。

既存トピックのパーティション数を変更するには kafka-topics コマンド ※ が使えます。 --alter オプションを使って --partitions へ変更後のパーティション数を指定します。

 ※ kafka-topics コマンドは kafka.admin.TopicCommand クラスの
    main メソッドを呼び出しているだけです
パーティション数を 3つへ変更する例
> kafka-topics --alter --zookeeper 127.0.0.1:2181 --topic sample1 --partitions 3

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

同一グループ cg1 に所属する Consumer クライアントを 3つ実行して、メッセージをいくつか送信してみたところ、以下のような結果となりました。

A
> consumer-group-sample sample1 cg1

・・・ Notification: &{Claimed:map[sample1:[0 1 2]] Released:map[] Current:map[sample1:[0 1 2]]}
topic: sample1, offset: 0, key: a, value: 123
topic: sample1, offset: 1, key: b, value: 456
・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[2]] Current:map[sample1:[0 1]]}
・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[1]] Current:map[sample1:[0]]}
topic: sample1, offset: 2, key: f, value: 678
B
> consumer-group-sample sample1 cg1

・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]}
・・・ Notification: &{Claimed:map[sample1:[1]] Released:map[sample1:[2]] Current:map[sample1:[1]]}
topic: sample1, offset: 0, key: c, value: 789
C
> consumer-group-sample sample1 cg1

・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]}
topic: sample1, offset: 0, key: d, value: 012
topic: sample1, offset: 1, key: e, value: 345

Notification を見るとそれぞれのクライアントへ割り当てられているパーティションが変化している事を確認できます。

例えば、A は最初 0 1 2 の 3つのパーティションが割り当てられていましたが、B や C の追加に伴って減っていき最終的にパーティション 0 のみとなっています。

B はパーティション 2 に割り当てられていましたが、C の追加によってパーティション 1 に変更されています。