TinkerPop でグラフ操作 - Groovy
前回、Neo4j の Cypher を使って実施したグラフ操作を Apache TinkerPop を使って Groovy (@Grab
を使用)で実装してみました。
- Groovy 2.5.0 beta1
- Apache TinkerPop 3.2
Apache TinkerPop はグラフ処理のためのフレームワークで、Neo4j 等の様々なグラフ DB ※ に対して共通のインターフェースを提供します。
※ グラフ DB だけではなく、 Cassandra、HBase、DynamoDB 等をサポートする ライブラリも提供されています
ソースは http://github.com/fits/try_samples/tree/master/blog/20170718/
a. 設定ファイル
TinkerPop にはインメモリーの TinkerGraph が用意されていますが、前回と同様に Neo4j を使う事にします。
ただ、前回と違って Neo4j をサーバー起動せずに組み込み実行します。
TinkerPop には Graph
オブジェクトを汎用的に生成する手段として org.apache.tinkerpop.gremlin.structure.util.GraphFactory
が用意されています。
GraphFactory.open(<設定ファイル>)
を使えば、依存ライブラリと設定ファイルを差し替えて DB を切り替える事もできそうなので、今回はこの方法を使います。
Neo4j を組み込み利用する場合、gremlin.graph
に org.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph
を設定して gremlin.neo4j.directory
に DB ファイルを出力するディレクトリを指定します。 (Neo4j をサーバー起動した場合の data/databases/graph.db)
setting.properties
gremlin.graph=org.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph gremlin.neo4j.directory=neo4jdb
b. グラフデータ作成
前回 と同様のデータを作成する処理を実装してみます。
Groovy で実行する際の注意点として、neo4j-tinkerpop-api-impl 等は依存ライブラリとして Groovy 2.4.11 のライブラリを含んでおり、このバージョン以外の groovy コマンドで実行すると org.codehaus.groovy.control.MultipleCompilationErrorsException
が発生してしまいます。
そこで今回は、Groovy 2.5.0 beta1 で実行できるように @GrabExclude
を使って groovy-xml
等を除くようにしています。
グラフの操作は GraphFactory.open()
で取得した Graph
オブジェクトに対して実施します。
ノードの追加は addVertex
、エッジの追加は addEdge
メソッドで行う事ができ、property
メソッドで任意の属性を設定できます。
トランザクションは tx
メソッドで開始します。※
※ TinkerGraph のように tx メソッドをサポートしていないものもありますので (その場合に tx() を呼び出すとエラーになる) 実際は tx のサポート有無をチェックしてから 呼び出すようにした方が安全だと思います (例) if (g.features().graph().supportsTransactions()) { g.tx().withAutoCloseable { t -> ・・・ } }
add-data.groovy
@Grapes([ @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'), @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'), @Grab('org.slf4j:slf4j-nop:1.7.25'), @GrabExclude('org.codehaus.groovy:groovy-xml'), @GrabExclude('org.codehaus.groovy:groovy-swing'), @GrabExclude('org.codehaus.groovy:groovy-jsr223') ]) import org.apache.tinkerpop.gremlin.structure.util.GraphFactory def conf = args[0] def addNode = { g, type, id, name = id -> def res = g.addVertex(type) res.property('oid', id) res.property('name', name) res } def createData = { g -> def p = addNode(g, 'Principals', 'principals') def u1 = addNode(g, 'User', 'user1') def u2 = addNode(g, 'User', 'user2') def ad = addNode(g, 'User', 'admin') def g1 = addNode(g, 'Group', 'group1') [u1, u2, ad, g1].each { it.addEdge('PART_OF', p) } u2.addEdge('PART_OF', g1) def r = addNode(g, 'Resources', 'resources') def s1 = addNode(g, 'Service', 'service1') def s2 = addNode(g, 'Service', 'service2') def s2o1 = addNode(g, 'Operation', 'service2.get', 'get') def s2o2 = addNode(g, 'Operation', 'service2.post', 'post') [s2o1, s2o2].each { s2.addEdge('METHOD', it) } [s1, s2].each { r.addEdge('RESOURCE', it) } u1.addEdge('PERMIT', s1) g1.addEdge('PERMIT', s2o2) ad.addEdge('PERMIT', r) } GraphFactory.open(conf).withAutoCloseable { g -> g.tx().withAutoCloseable { tx -> createData(g) tx.commit() } }
withAutoCloseable
は AutoCloseable
のリソースをクローズするための Groovy の機能です(TinkerPop の API ではありません)
実行結果
> groovy add-data.groovy setting.properties
c. 経路の探索
前回 の経路探索の処理を TinkerPop の API で実装してみます。
traversal().V()
でノードを対象とした GraphTraversal<Vertex,Vertex>
を取得でき、has(<プロパティ名>, <プロパティ値>)
等を使ってノードの条件を指定できます。
(c-1) 複数エッジ(条件なし)
まず、エッジは気にせずに指定ノードから指定ノードまでのパス(経路)を取得する処理を実装してみます。
終点のノードまで複数のノードで繋がっている場合は repeat(・・・).until(<終点ノードの条件>)
で探せます。
エッジの条件を指定しない場合は repeat(__.out()).until(・・・)
で取得できます。(__
はクラス名です)
パスを取得するには path()
を使います。
Path
からは objects()
でパスに含まれるノード(やエッジ)を List<Object>
で取得できます。
なお、以下の処理でトランザクションは必要ありませんが、一応トランザクションを使っています。 (JanusGraph をマルチスレッドで使うケースではトランザクションが必要になったので)
find-data-simple.groovy
@Grapes([ @Grab('org.apache.tinkerpop:neo4j-gremlin:3.2.5'), @Grab('org.neo4j:neo4j-tinkerpop-api-impl:0.6-3.2.2'), @Grab('org.slf4j:slf4j-nop:1.7.25'), @GrabExclude('org.codehaus.groovy:groovy-xml'), @GrabExclude('org.codehaus.groovy:groovy-swing'), @GrabExclude('org.codehaus.groovy:groovy-jsr223') ]) import org.apache.tinkerpop.gremlin.structure.util.GraphFactory import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__ def conf = args[0] def start = args[1] def end = args[2] def toStr = { "${it.label()}[${it.id()}]{${it.properties().join(', ')}}" } GraphFactory.open(conf).withAutoCloseable { g -> g.tx().withAutoCloseable { def p = g.traversal().V() .has('oid', start) // 始点ノードの条件 .repeat(__.out()) .until(__.has('oid', end)) // 終点ノードの条件 .path() p.each { println it.objects().collect(toStr).join(' -> ') } } }
実行結果
user2 から service2.post へのパスを取得してみます。
> groovy find-data-simple.groovy setting.properties user2 service2.post User[2]{vp[oid->user2], vp[name->user2]} -> Group[4]{vp[oid->group1], vp[name->group1]} -> Operation[9]{vp[oid->service2.post], vp[name->post]}
__.out()
を使った事で、パスにはノードの情報だけが含まれエッジの情報を含んでいません。
(c-2) 複数エッジ(条件あり)
エッジの条件を指定するには repeat
でエッジも指定します。
A ノードと B ノードが C エッジで繋がっている (A)-[C]->(B)
のような状態で、A の __.outE()
が C エッジで、C エッジの inV()
が B ノードとなります。
そのため、repeat(__.outE().as('e').inV())
とすれば、複数の外向きエッジで繋がっているノードを検索する事ができます。
ここで as(<ステップラベル名>)
を使って、該当するエッジにラベル名を付けておき、where
での条件判定(PERMIT エッジを含むかどうか)に使います。
__.select('e')
の結果は GraphTraversal<Vertex, ArrayList<Edge>>
のようになるので __.select('e').hasLabel(・・・)
とはできません。 (ClassCastException になります)
unfold()
で GraphTraversal<Vertex, ArrayListGraphTraversal<Vertex, Edge>
にして hasLabel
を使えば、該当のラベルを持つエッジを含んでいるかどうかを条件判定できます。
find-data.groovy
・・・ GraphFactory.open(conf).withAutoCloseable { g -> g.tx().withAutoCloseable { def p = g.traversal().V() .has('oid', start) // 始点ノードの条件 .repeat(__.outE().as('e').inV()) .until(__.has('oid', end)) // 終点ノードの条件 .where(__.select('e').unfold().hasLabel('PERMIT')) // PERMIT エッジを含んでいるかどうかの判定 .path() p.each { println it.objects().collect(toStr).join(' -> ') } } }
実行結果
> groovy find-data.groovy setting.properties user2 service2.post User[2]{vp[oid->user2], vp[name->user2]} -> PART_OF[4]{} -> Group[4]{vp[oid->group1], vp[name->group1]} -> PERMIT[10]{} -> Operation[9]{vp[oid->service2.post], vp[name->post]}
__.outE()
を使った事でエッジの情報もパスに含まれるようになりました。
アクセス制御リストをグラフDBで構築 - Neo4j
柔軟性のあるアクセス制御を考えた際に、アクセス制御リスト(ACL)を有向グラフで表現すればどうだろうかと思い、グラフDBの Neo4j を使って試してみました。
概要
アクセス制御リストを有向グラフで表現し、アクセスの許可・拒否を以下のように判定する事にします。
- アクセスの主体からアクセス対象へのパス(経路)が存在すればそのアクセスを許可する
例えば、user1 が service1 へアクセスできる事(アクセス許可が与えられている)を以下のように表現する事にします。
(a)
ただ、これだと自由度が高すぎると思うのでルールを加えます。
- アクセスの主体となる全てのノードは Principals ノードへ所属させる
- アクセスの対象となる全てのノードは Resources ノードへ所属させる
- アクセスの主体同士と対象同士でエッジ(辺)の向きを調整
これに基づいてノードやエッジをいくつか追加してみた結果が以下です。
(b)
admin ユーザーは Resources に属する全てのリソース(service1, service2, service2 の post や get)へアクセスでき、group1 へ所属する user2 は service2.post へアクセスできる事を表現しています。
Neo4j で検証
それでは実際に Neo4j を使って検証してみます。
Neo4j ではエッジ(辺)に該当する部分をリレーションシップと呼ぶようですが、以降もエッジと呼ぶことにします。
(1) Neo4j サーバー起動
まずは Neo4j のサーバーを起動しておきます。 Neo4j の bin ディレクトリに neo4j コマンドがあるので、これを使って起動します。
今回は neo4j console
で Neo4j サーバーを起動しますが、neo4j start
で実行する方法もあります。
Neo4j サーバー起動
> neo4j console
サーバーが起動した後、http://localhost:7474/
へ Web ブラウザでアクセスすると管理画面が表示されるので、初期パスワードに neo4j
と入力して初回のログイン処理を行っておきます。
管理画面では Cypher と呼ばれるクエリ言語を使ってデータを操作できるようになっているので、これより Cypher でデータ操作していきます。
(2) グラフデータの作成
先ほどの (a) のグラフ(user1 と service1 ノードを PERMIT エッジで繋ぐ)は以下のような Cypher クエリで作成できます。
1. (a) のデータ作成 Cypher クエリ
CREATE (u:User{oid:"user1",name:"user1"})-[r:PERMIT]->(s:Service{oid:"service1",name:"service1"}) RETURN u,r,s
CREATE
でノードやエッジを作成でき、ノードの内容は (変数名:ラベル{プロパティ名:値, ・・・})
、エッジの内容は [変数名:関連名{プロパティ名:値, ・・・}]
のように指定できるようです。
A から B ノードへ向いたエッジを作成する際は (A)-[エッジ]->(B)
のようにします
RETURN
を使うと指定した変数の内容が返され、管理画面では RETURN の内容を SVG で表示してくれます。※
※ Cypher クエリ実行毎の左メニューの "Graph" を選択すると SVG のグラフ表示で、 "Code" を選択すると Response の JSON を確認できます
ノードやエッジには一意の id
が付与されますが、ノードを指定するのに不便なので、今回は oid
という独自プロパティを設定するようにしました。
管理画面では name
プロパティの値を表示するようなので name も設定しています。
次に、(a) の状態に対してノードやエッジを追加して (b) の状態にしていきます。
今度はいくつかのステップに分けてデータを追加します。
まずは principals, resources, admin を作成し、user2 と PART_OF エッジで繋いだ group1 を作成しました。
2. principals, resources, admin, user2, group1 の追加
CREATE (:Principals{name:"principals"}),(:Resources{name:"resources"}) CREATE (:User{oid:"admin",name:"admin"}) CREATE (:User{oid:"user2",name:"user2"})-[:PART_OF]->(:Group{oid:"group1",name:"group1"})
次は get と post というメソッドを持った service2 を追加します。 ここでは service2 を作った後で、METHOD エッジで繋がった get と post を作成するようにしてみました。
3. get と post を持つ service2 の追加
CREATE (s2:Service{oid:"service2",name:"service2"}) MERGE (s2)-[:METHOD]->(:Operation{oid:"service2.get",name:"get"}) MERGE (s2)-[:METHOD]->(:Operation{oid:"service2.post",name:"post"})
次に PERMIT エッジを作成していくつかのノードを繋ぎます。
4. group1 から service2.post への PERMIT を設定
MATCH (g1:Group{oid:"group1"}),(m1:Operation{oid:"service2.post"}) MERGE (g1)-[:PERMIT]->(m1)
5. admin から resources への PERMIT を設定
MATCH (au:User{oid:"admin"}),(r:Resources) MERGE (au)-[:PERMIT]->(r)
これまでに作成した全ての User と Group が principals へ所属するように、全ての Service が resources へ所属するようにエッジで繋ぎます。
6. principals と resources へのエッジをそれぞれ設定
MATCH (u:User),(g:Group),(s:Service),(p:Principals),(r:Resources) MERGE (u)-[:PART_OF]->(p) MERGE (g)-[:PART_OF]->(p) MERGE (r)-[:RESOURCE]->(s)
最後に全てのデータを表示します。
全表示
MATCH (a) RETURN a
(3) アクセス許可の判定(経路の探索)
このデータに対して、どのような Cypher クエリを使って経路探索(= アクセス許可の確認)すればよいのか検証していきます。
単独エッジ
まずは、単純に user1 から service1 へ PERMIT で繋がっているかどうかは、以下のクエリで確認できます。
user1 から service1 へ PERMIT で直接繋がっているものを取得
MATCH (u:User{oid:"user1"})-[:PERMIT]->(s:Service{oid:"service1"}) RETURN u,s
このクエリでは PERMIT エッジで直接繋がっているケースにしか対応できないので汎用的には使えません。
複数エッジ(条件なし)
次に、PERMIT で繋がっているかどうかは気にしないで、user2 から service2.post へのパス(経路)を取得してみます。
エッジの条件を [*]
とする事で、複数のエッジで繋がっているパスを取得できます。
また、MATCH で指定したパスを変数(以下の path
)へ設定して、これを RETURN するようにすればパスの内容を取得できます。
user2 から service2.post へのパスを取得
MATCH path=(:User{oid:"user2"})-[*]->(:Operation{oid:"service2.post"}) RETURN path
複数エッジ(条件あり)
上記のクエリに対して、PERMIT のエッジを少なくとも 1つ含むという条件を加えるには where
と any
が使えます。
relationships(変数)
を使って変数の中からエッジのみを抽出する事が可能です。 (nodes(変数)
を使えばノードだけを抽出できる)
type(変数)
を使えばラベル名を取得できますので、これを使えば PERMIT かどうかの判定が可能です。
user2 から service2.post へのパス(PERMIT を含む)を取得
MATCH path=(:User{oid:"user2"})-[*]->(:Operation{oid:"service2.post"}) WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT") RETURN path
このクエリなら汎用的に使えそうです。
ノードのラベル(今回の User, Service 等)を特定する必要が無ければ、以下のようにプロパティの条件だけを指定する事も可能です。
admin から service1 へのパス(PERMIT を含む)を取得
MATCH path=({oid:"admin"})-[*]->({oid:"service1"}) WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT") RETURN path
user1 と service1 もこの方法で取得できます。
user1 から service1 へのパス(PERMIT を含む)を取得
MATCH path=(:User{oid:"user1"})-[*]->({oid:"service1"}) WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT") RETURN path
最後に、パスが繋がっていない場合も確認しておきます。 当然ながら結果は空となりました。
user2 から service1 へのパス(PERMIT を含む)を取得
MATCH path=(u:User{oid:"user2"})-[*]->({oid:"service1"}) WHERE any(r IN relationships(path) WHERE type(r) = "PERMIT") RETURN path
備考
relationships を使う以外に [rs *]
のように変数を使う方法でも同じ結果が得られます。
user2 から service2.post へのパス(PERMIT を含む)を取得 - deprecated
MATCH path=(:User{oid:"user2"})-[rs *]->(:Operation{oid:"service2.post"}) WHERE any(r IN rs WHERE type(r) = "PERMIT") RETURN path
こちらの方がシンプルなように思うのですが、このクエリでは Binding relationships to a list in a variable length pattern is deprecated
と警告されましたので、使わない方が無難かもしれません。
Docker で F# アプリケーションを作成
Microsoft による .NET Core の公式 Docker イメージを使って、単純な F# のコンソールアプリケーションを作成してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170619/
(1) Docker コンテナの実行
まずは .NET Core の Docker コンテナを実行しておきます。
.NET Core コンテナの実行
$ docker run -it --rm microsoft/dotnet /bin/bash root@・・・:/#
現時点では 1.1.2-sdk が latest となっており、デフォルトで netcoreapp1.1
のアプリケーションを作成する事になります。
(2) F# コンソールプロジェクトの作成
.NET Core SDK では dotnet
コマンド (.NET Command Line Tools) を使ってアプリケーションを開発できます。
dotnet new
コマンドで様々なプロジェクトの雛型を作成でき、デフォルトでは以下のようなプロジェクトに対応しています。
プロジェクトの雛型一覧
root@・・・# dotnet new -all ・・・ Templates Short Name Language Tags ----------------------------------------------------------------------- Console Application console [C#], F# Common/Console Class library classlib [C#], F# Common/Library Unit Test Project mstest [C#], F# Test/MSTest xUnit Test Project xunit [C#], F# Test/xUnit ASP.NET Core Empty web [C#] Web/Empty ASP.NET Core Web App mvc [C#], F# Web/MVC ASP.NET Core Web API webapi [C#] Web/WebAPI Nuget Config nugetconfig Config Web Config webconfig Config Solution File sln Solution ・・・
今回は F# のコンソールアプリケーションを作成します。
適当なディレクトリを作成・移動し、dotnet new console
を実行します。
F# の場合は -lang
オプションで指定します。 (デフォルトは C#)
F# コンソールプロジェクトの作成
root@・・・# mkdir sample root@・・・# cd sample root@・・・# dotnet new console -lang F#
以下のファイルが生成されました。
Program.fs
open System [<EntryPoint>] let main argv = printfn "Hello World from F#!" 0 // return an integer exit code
sample.fsproj
<Project Sdk="FSharp.NET.Sdk;Microsoft.NET.Sdk"> <PropertyGroup> <OutputType>Exe</OutputType> <TargetFramework>netcoreapp1.1</TargetFramework> </PropertyGroup> <ItemGroup> <Compile Include="Program.fs" /> </ItemGroup> <ItemGroup> <PackageReference Include="FSharp.Core" Version="4.1.*" /> <PackageReference Include="FSharp.NET.Sdk" Version="1.0.*" PrivateAssets="All" /> </ItemGroup> </Project>
(3) ビルドと実行
とりあえず、このままビルドして実行してみます。
まずは dotnet restore
を実行してプロジェクトの依存関係を復元します。
これにより依存関係などが定義された obj/project.assets.json
ファイルが作られます。
リストア
root@・・・# dotnet restore ・・・ Installing FSharp.Compiler.Tools 4.1.17. Installing FSharp.NET.Sdk 1.0.5. Installing FSharp.Core 4.1.17. ・・・ Installed: 3 package(s) to /sample/sample.fsproj
ビルドは dotnet build
で行います。
なお、dotnet build しなくても dotnet run すればビルドも実施してくれるようです。
ビルド
root@・・・# dotnet build ・・・ Build succeeded. 0 Warning(s) 0 Error(s)
dotnet run
で実行します。
実行
root@・・・# dotnet run Hello World from F#!
(4) Chiron パッケージの追加
次に、Chiron という F# で JSON を処理するためのパッケージを使ってみます。
依存パッケージは dotnet add package <パッケージ名>
で追加できますが、(現時点で)デフォルトの Chiron 6.2.1 を追加しようとすると以下のようにエラーとなりました。
Chiron パッケージ(6.2.1)追加 - 失敗
root@・・・# dotnet add package chiron ・・・ log : Installing Chiron 6.2.1. error: Package Chiron 6.2.1 is not compatible with netcoreapp1.1 (.NETCoreApp,Version=v1.1). Package Chiron 6.2.1 supports: error: - net40 (.NETFramework,Version=v4.0) error: - portable-net45+win8+wp8+wpa81 (.NETPortable,Version=v0.0,Profile=Profile259) ・・・ error: One or more packages are incompatible with .NETCoreApp,Version=v1.1. error: Package 'chiron' is incompatible with 'all' frameworks in project '/sample/sample.fsproj'.
netcoreapp1.1
(.fsproj の TargetFramework で設定) を Chiron 6.2.1 がサポートしてない事が原因のようです。
nuget の Chiron のページ を見てみると .NETStandard 1.6 に対応したバージョン 7.0.0-alpha-170410 がありました。
netcoreapp1.1 は .NETStandard 1.6 をサポートしているので、このバージョンなら使えそうです。
add package では -v
オプションでバージョンを指定できるので、以下のようにバージョンを指定して Chiron を追加します。
Chiron パッケージ(7.0.0-alpha-170410)追加 - 成功
root@・・・# dotnet add package chiron -v 7.0.0-alpha-170410 ・・・ log : Installing FParsecCS 1.0.3-alpha-170404. log : Installing FParsec 1.0.3-alpha-170404. log : Installing Chiron 7.0.0-alpha-170410. info : Package 'chiron' is compatible with all the specified frameworks in project '/sample/sample.fsproj'. info : PackageReference for package 'chiron' version '7.0.0-alpha-170410' added to file '/sample/sample.fsproj'.
今度は成功し、.fsproj は以下のようになりました。
sample.fsproj (Chiron 追加後)
<Project Sdk="FSharp.NET.Sdk;Microsoft.NET.Sdk"> <PropertyGroup> <OutputType>Exe</OutputType> <TargetFramework>netcoreapp1.1</TargetFramework> </PropertyGroup> <ItemGroup> <Compile Include="Program.fs" /> </ItemGroup> <ItemGroup> <PackageReference Include="chiron" Version="7.0.0-alpha-170410" /> <PackageReference Include="FSharp.Core" Version="4.1.*" /> <PackageReference Include="FSharp.NET.Sdk" Version="1.0.*" PrivateAssets="All" /> </ItemGroup> </Project>
(5) Chiron サンプルアプリケーションの実装
動作確認のため Chiron を使った単純な処理を実装してみました。 (Chiron 7 ではモジュール構成等が変わっているようでした)
Program.fs
open System open Chiron.Inference [<EntryPoint>] let main argv = let d = Map ["a", 123] let res = Json.serialize d printfn "json = %s" res 0
(6) Chiron サンプルアプリケーションのビルドと実行
(3) と同様にリストアしてから実行します。
リストア
root@・・・# dotnet restore Restoring packages for /sample/sample.fsproj... ・・・
ビルド
root@・・・# dotnet build ・・・ Build succeeded. 0 Warning(s) 0 Error(s)
実行
root@・・・# dotnet run json = {"a":123}
Docker で Haskell アプリケーションを作成 - MongoDB 利用
MongoDB へ接続する Haskell アプリケーションを Docker で作成してみました。
以下の Docker イメージを使用します。
ビルドツールは stack を使って、MongoDB への接続には以下のライブラリを使います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170529/
(1) Docker コンテナの実行
Docker で MongoDB と Haskell のコンテナを実行します。
コンテナ間の連携には、deprecated となっている Link 機能(–link)は使わずにユーザー定義ネットワークを使う事にします。
(1.1) bridge ネットワークの作成
bridge ネットワークを新規作成します。
$ docker network create --driver bridge br1
(1.2) MongoDB コンテナの実行
作成したネットワークへ参加するように --net=<ネットワーク名>
を使って MongoDB を docker run します。
$ docker run -d --name mongo1 --net=br1 mongo
(1.3) Haskell コンテナの実行
Haskell も同様に docker run します。
ここでは Docker ホスト側の /vagrant/work をコンテナの /work へマウントしています。
$ docker run -it --name hs1 --net=br1 -v /vagrant/work:/work haskell /bin/bash root@・・・#
確認のため Haskell コンテナ内から MongoDB のコンテナへ ping してみます。
ping 例
root@・・・# ping mongo1 PING mongo1 (172.18.0.2): 56 data bytes 64 bytes from 172.18.0.2: icmp_seq=0 ttl=64 time=0.640 ms 64 bytes from 172.18.0.2: icmp_seq=1 ttl=64 time=0.257 ms ・・・
(2) Haskell アプリケーションの作成
Haskell のコンテナで MongoDB へ接続するアプリケーションを作成します。 (以降の処理は (1.3) で起動した Haskell コンテナ内で実施します)
(2.1) プロジェクトのひな型作成
stack new <プロジェクト名> [<テンプレート名>]
でプロジェクトのひな型を作ります。
プロジェクト作成
root@・・・# cd /work root@・・・# stack new sample1 Downloading template "new-template" to create project "sample1" in sample1/ ... ・・・
プロジェクト名のディレクトリが作られ、各種ファイルが生成されます。
(2.2) 依存ライブラリの取得
MongoDB driver for Haskell を使用するため、<プロジェクト名>.cabal
ファイルの library/build-depends へ mongoDB
を追記します。
今回は Control.Monad.Trans
も使うので mtl
も追記しています。
sample1/sample1.cabal
・・・ library ・・・ build-depends: base >= 4.7 && < 5 , mongoDB , mtl default-language: Haskell2010 ・・・
stack build
でビルドすると未取得の依存ライブラリをダウンロードします。
プロジェクトのビルド(依存ライブラリの取得)
root@・・・# cd sample1 root@・・・# stack build --allow-different-user ・・・ mtl-2.2.1: download mtl-2.2.1: configure mtl-2.2.1: build mtl-2.2.1: copy/register ・・・
ここで --allow-different-user
オプションを付けていますが、今回のケースではこのオプションを付ける必要がありました。(-v でマウントしたディレクトリを使用した事が原因だと思われる)
なお、–allow-different-user 無しで stack build
すると以下のようになりました。
root@・・・# stack build You are not the owner of '/work/sample1/'. Aborting to protect file permissions.Retry with '--allow-different-user' to disable this precaution.
(2.3) MongoDB 接続アプリケーションの実装
とりあえず someFunc
という関数名をそのまま使う事にします。(変更する場合は app/Main.hs も変更します)
OverloadedStrings
と ExtendedDefaultRules
の言語拡張が最低限必要となるようです。
MongoDB driver for Haskell では、MongoDB に対する処理を Action m a
(MonadIO m) で定義し access
関数で処理すればよさそうです。
ここで Action
は type Action = ReaderT MongoContext
と定義されています。
MongoDB のコレクションへ複数ドキュメントを追加するには insertMany
や insertMany_
関数が使えます。(ドキュメント追加の用途では insert
や insertAll
関数等もあります)
関数名の最後の _
の有無は、作成したドキュメントの _id
の値を返すかどうかの違いのようです。(_
の付いている方は返さない)
今回は _id
の値は不要なので insertMany_
の方を使いました。
MongoDB へ登録するドキュメントは [<項目名> =: <値>, ・・・]
という形式で定義できます。
find
関数の結果は Action m Cursor
なので、rest
関数(Cursor -> Action m [Document]
) をバインド(>>=
)して Action m [Document]
を取得しています。
接続先の MongoDB ホスト名は MONGO_HOST
環境変数から取得するようにしてみました。
sample1/src/Lib.hs
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ExtendedDefaultRules #-} module Lib ( someFunc ) where import System.Environment import Control.Monad.Trans (lift) import Database.MongoDB someFunc :: IO () someFunc = do mongo <- mongoHost -- MongoDB 接続 pipe <- connect (host mongo) access pipe master "sample" proc close pipe mongoHost :: IO String mongoHost = getEnv "MONGO_HOST" proc :: Action IO () proc = do -- items コレクションへドキュメント追加 insertMany_ "items" [ ["name" =: "item1", "value" =: 1], ["name" =: "item2", "value" =: 2] ] allItems >>= printDocs -- items コレクションの全ドキュメント取得 allItems :: Action IO [Document] allItems = rest =<< find ( select [] "items" ) -- ドキュメントの出力 printDocs :: [Document] -> Action IO () printDocs docs = lift $ mapM_ print docs
(2.4) ビルドと実行
それでは、ビルドして実行します。
ビルド
root@・・・# stack build --allow-different-user ・・・ /work/sample1/.stack-work/install/x86_64-linux/lts-8.15/8.0.2/bin Registering sample1-0.1.0.0...
.stack-work ディレクトリへ(ビルドの)成果物が生成されます。
実行するには stack exec <実行ファイル名>
で実行するか、実行ファイル(例 .stack-work/install/x86_64-linux/lts-8.15/8.0.2/bin/sample1-exe
)を直接実行します。
実行ファイル名はデフォルトで <プロジェクト名>-exe
となるようです。
今回は環境変数から MongoDB のホスト名を取得するようにしたので、MONGO_HOST
環境変数へ MongoDB のコンテナ名を設定してから実行します。
実行
root@・・・# export MONGO_HOST=mongo1 root@・・・# stack exec sample1-exe --allow-different-user [ _id: 592a7ffb2139612699000000, name: "item1", value: 1] [ _id: 592a7ffb2139612699000001, name: "item2", value: 2]
D3.js で HAR ファイルから散布図を作成
前回、HAR (HTTP ARchive) ファイルから Python で作成した散布図を D3.js を使って SVG として作ってみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20170515/
はじめに
Node.js で D3.js を使用するために d3
と jsdom
をインストールしておきます。
依存ライブラリのインストール
npm install --save d3 jsdom
package.json
{ ・・・ "dependencies": { "d3": "^4.8.0", "jsdom": "^10.1.0" } }
散布図(SVG)の作成
標準入力から HAR ファイルの内容(JSON)を受け取り、time と bodySize の該当位置へ円を描画する事で散布図を表現してみました。
円の色をサブタイプ毎に分けることで色分けしています。
ついでに、ツールチップの表示やマウスイベント処理を追加してみました。
Web ブラウザで SVG ファイルを開いた際にマウスイベントを処理できるように attr
を使ってマウスイベント処理(該当する円のサイズを変更)を設定しています。
index.js
const d3 = require('d3'); const jsdom = require('jsdom'); const { JSDOM } = jsdom; const w = 800; const h = 500; const margin = { left: 80, top: 20, bottom: 50, right: 200 }; const legendMargin = { top: 30 }; const fontSize = '12pt'; const circleRadius = 5; // X軸(time)の値の範囲 const xDomain = [0, 5000]; // Y軸(bodySize)の値の範囲 const yDomain = [0, 180000]; const colorMap = { 'javascript': 'blue', 'x-javascript': 'blue', 'json': 'green', 'gif': 'tomato', 'jpeg': 'red', 'png': 'pink', 'html': 'lime', 'css': 'turquoise' }; const dom = new JSDOM(); const document = dom.window.document; const toSubType = mtype => mtype.split(';')[0].split('/').pop(); const toColor = stype => colorMap[stype] ? colorMap[stype] : 'black'; process.stdin.resume(); let json = ''; process.stdin.on('data', chunk => json += chunk); process.stdin.on('end', () => { const data = JSON.parse(json); const df = data.log.entries.map( d => { return { 'url': d.request.url, 'subType': toSubType(d.response.content.mimeType), 'bodySize': d.response.bodySize, 'time': d.time }; }); const svg = d3.select(document.body) .append('svg') .attr('xmlns', 'http://www.w3.org/2000/svg') .attr('width', w + margin.left + margin.right) .attr('height', h + margin.top + margin.bottom) .append('g') .attr('transform', `translate(${margin.left}, ${margin.top})`); const x = d3.scaleLinear().range([0, w]).domain(xDomain); const y = d3.scaleLinear().range([h, 0]).domain(yDomain); const xAxis = d3.axisBottom(x); const yAxis = d3.axisLeft(y); // X軸 svg.append('g') .attr('transform', `translate(0, ${h})`) .call(xAxis); // Y軸 svg.append('g') .call(yAxis); // X軸ラベル svg.append('text') .attr('x', w / 2) .attr('y', h + 40) .style('font-size', fontSize) .text('Time (ms)'); // Y軸ラベル svg.append('text') .attr('x', -h / 2) .attr('y', -(margin.left) / 1.5) .style('font-size', fontSize) .attr('transform', 'rotate(-90)') .text('Body Size'); // 円 const point = svg.selectAll('circle') .data(df) .enter() .append('circle'); // 円の設定 point.attr('class', d => d.subType) .attr('cx', d => x(d.time)) .attr('cy', d => y(d.bodySize)) .attr('r', circleRadius) .attr('fill', d => toColor(d.subType)) .append('title') // ツールチップの設定 .text(d => d.url); // 凡例 const legend = svg.selectAll('.legend') .data(d3.entries(colorMap)) .enter() .append('g') .attr('class', 'legend') .attr('transform', (d, i) => { const left = w + margin.left; const top = margin.top + i * legendMargin.top; return `translate(${left}, ${top})`; }); legend.append('circle') .attr('r', circleRadius) .attr('fill', d => d.value); legend.append('text') .attr('x', circleRadius * 2) .attr('y', 4) .style('font-size', fontSize) // マウスイベント処理(該当する円のサイズを変更) .attr('onmouseover', d => `document.querySelectorAll('circle.${d.key}').forEach(d => d.setAttribute('r', ${circleRadius} * 2))`) .attr('onmouseout', d => `document.querySelectorAll('circle.${d.key}').forEach(d => d.setAttribute('r', ${circleRadius}))`) // 凡例のラベル .text(d => d.key); // SVG の出力 console.log(document.body.innerHTML); });
実行例
node index.js < a.har > a.svg
実行結果例
a.svg
b.svg
Python で HAR ファイルから散布図を作成
Web サイトの構成を可視化するため、Python で HAR (HTTP ARchive) ファイルをパースし散布図を試しに作ってみました。
今回作成した散布図の内容は以下の通りです。
- 横軸は time (処理時間 (ミリ秒))
- 縦軸は bodySize (レスポンスボディのサイズ)
- MIME タイプのサブタイプ別に色分け
今回は Anaconda に予め含まれているライブラリを使用する事にします。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170514/
はじめに
HAR ファイルは Chrome や Firefox の開発ツールで取得できますが、簡易なものは PhantomJS の examples/netsniff.js
で取得できるので今回はこれを使いました。
HAR の取得
phantomjs netsniff.js <URL>
netsniff.js では console.log
で HAR の内容を出力するようになっています。
HAR ファイルの取得例
phantomjs netsniff.js https://www.・・・ > a.har
HAR ファイルは以下のような JSON 形式のファイルとなっています。
HAR ファイル例
{ "log": { ・・・ "entries": [ { ・・・ "time": 4474, "request": { ・・・ }, "response": { ・・・ "bodySize": 473, "content": { "size": 473, "mimeType": "text/html;charset=UTF-8" } }, "cache": {}, "timings": { ・・・ "wait": 3813, "receive": 661, "ssl": -1 }, ・・・ }, ・・・ ] } }
netsniff.js では、timings 内の wait と receive 以外の値は固定値(0 か -1)が設定されるようになっています。
1. seaborn の pairplot 使用
まずは、seaborn の pairplot
を使って散布図を描画してみます。 (pairplot はデフォルトが散布図になっているようです)
pairplot は基本的にデータ内の数値変数の全ての組み合わせをグラフ化するものですが、x_vars
と y_vars
を指定する事で特定の組み合わせに限定したグラフだけを描画する事も可能です。
色分けする項目は hue
で指定できるようになっています。
複数のグラフをグリッド表示する事を想定しているため、個々のグラフのデフォルトサイズが小さく設定されていますが、size
で変更する事が可能です。
har_scatter_plot1.py
import sys import json import codecs import pandas as pd import seaborn as sns import matplotlib.pyplot as plt # HAR ファイルの読み込みと Python オブジェクト化 data = json.load(codecs.open(sys.argv[1], 'r', 'utf-8')) to_subtype = lambda mtype: mtype.split(';')[0].split('/')[-1] # 特定の項目を抜き出す処理 convert = lambda d: { 'subType': to_subtype(d['response']['content']['mimeType']), 'bodySize': d['response']['bodySize'], 'time': d['time'] } # DataFrame 化 df = pd.DataFrame(list(map(convert, data['log']['entries']))) # 散布図の描画 sns.pairplot(df, hue = 'subType', x_vars = 'time', y_vars = 'bodySize', size = 10) imgfile = "%s_1.png" % sys.argv[1] # グラフの保存 plt.savefig(imgfile)
実行例
python har_scatter_plot1.py a.har
実行結果例
a.har_1.png
b.har_1.png
2. サブタイプ毎に plot
pairplot の結果ではサブタイプ毎の色が固定されておらず、複数の Web ページを比較する用途には適していません。
そこで、サブタイプ毎の散布図を重ねて描画してみました。
har_scatter_plot2.py
import sys import json import codecs import pandas as pd import matplotlib.pyplot as plt ・・・ # サブタイプと色のマッピング color_map = { 'javascript': 'blue', 'x-javascript': 'blue', 'json': 'green', 'gif': 'tomato', 'jpeg': 'red', 'png': 'pink', 'html': 'lime', 'css': 'turquoise' } df = pd.DataFrame(list(map(convert, data['log']['entries']))) ax = plt.figure().add_subplot(1, 1, 1) for (k, c) in color_map.items(): # 指定サブタイプのデータを抽出 sdf = df[df['subType'] == k] if not sdf.empty: # c の色で散布図(scatter)を描画 sdf.plot('time', 'bodySize', 'scatter', ax, c = c, label = k) imgfile = "%s_2.png" % sys.argv[1] plt.savefig(imgfile)
実行例
python har_scatter_plot2.py a.har
実行結果例
a.har_2.png
b.har_2.png
Go で Kafka の Consumer クライアント
下記ライブラリをそれぞれ使って Go 言語で Apache Kafka の Consumer クライアントを作成してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170506/
Sarama の場合
まずは sarama を使ってみます。
準備
GOPATH 環境変数を設定して sarama を go get しておきます。
sarama の取得例
> go get github.com/Shopify/sarama
サンプル作成
sarama でメッセージを受信するには以下のように実装すれば良さそうです。
- (1)
sarama.NewConsumer
でConsumer
を作成 - (2) Consumer の
ConsumePartition
で指定のトピック・パーティションからメッセージを受信するためのPartitionConsumer
を作成 - (3) PartitionConsumer の
Messages
で取得したチャネルからメッセージを受信
(2) で sarama.OffsetOldest
を指定する事で Kafka に残っている最も古いオフセットからメッセージを取得します。
for select を使って継続的に Kafka からメッセージを受信するようにしていますが、このままだと処理が終了しないので Ctrl + c で終了するように os.Signal
のチャネルを使っています。
src/consumer-sample/main.go
package main import ( "fmt" "log" "io" "os" "os/signal" "github.com/Shopify/sarama" ) func close(trg io.Closer) { if err := trg.Close(); err != nil { log.Fatalln(err) } } func makeStopChannel() chan os.Signal { // Ctrl + c のチャネル設定 ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt) return ch } func main() { // (1) consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { panic(err) } defer close(consumer) // (2) ptConsumer, err := consumer.ConsumePartition(os.Args[1], 0, sarama.OffsetOldest) if err != nil { panic(err) } defer close(ptConsumer) ch := makeStopChannel() for { select { // (3) case msg := <- ptConsumer.Messages(): fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", msg.Topic, msg.Offset, msg.Key, msg.Value) case <- ch: // 終了 return } } }
ビルドと実行
ビルド例
> cd src/consumer-sample > go build
動作確認のため、前回 に組み込み実行した Kafka へ接続してみます。
実行例
> consumer-sample sample1 topic: sample1, offset: 0, key: a, value: 123 topic: sample1, offset: 1, key: b, value: 456
Ctrl + c を入力して終了します。
グループへ属していないのでオフセットが保存されず、実行の度に上記のような出力となります。
備考 - Enter キーで終了する方法
makeStopChannel の処理内容を以下のように変えてみると、Ctrl + c ではなく Enter キーの入力で終了するようにできます。
src/consumer-sample2/main.go
package main import ( ・・・ "bufio" "github.com/Shopify/sarama" ) ・・・ func makeStopChannel() chan string { ch := make(chan string) reader := bufio.NewReader(os.Stdin) go func() { s, _ := reader.ReadString('\n') ch <- s }() return ch } func main() { ・・・ }
Sarama Cluster の場合
次に sarama-cluster を使ってみます。
sarama-cluster は sarama の拡張で、グループなどへの参加を sarama よりも簡単に実施できるようになっています。 (内部的には sarama の JoinGroupRequest 等を使ってグループを処理しています)
準備
GOPATH 環境変数を設定して sarama-cluster を go get しておきます。
sarama-cluster の取得例
> go get github.com/bsm/sarama-cluster
サンプル作成
sarama-cluster の場合は sarama よりもシンプルで、NewConsumer
で Consumer
を作り、Messages
で取得したチャネルからメッセージを受信できます。
また、MarkOffset
を使ってオフセットを保存します。
src/consumer-group-sample/main.go
package main import ( "fmt" "log" "io" "os" "os/signal" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" ) ・・・ func main() { config := cluster.NewConfig() config.Group.Return.Notifications = true config.Consumer.Return.Errors = true config.Consumer.Offsets.Initial = sarama.OffsetOldest topics := []string{os.Args[1]} group := os.Args[2] consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, group, topics, config) if err != nil { panic(err) } defer close(consumer) ch := makeStopChannel() for { select { case msg, more := <- consumer.Messages(): if more { fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", msg.Topic, msg.Offset, msg.Key, msg.Value) // オフセットの保存 consumer.MarkOffset(msg, "") } case err, more := <- consumer.Errors(): if more { log.Printf("Error: %+v\n", err.Error()) } case ntf, more := <- consumer.Notifications(): if more { log.Printf("Notification: %+v\n", ntf) } case <- ch: return } } }
ビルドと実行
ビルド例
> cd src/consumer-group-sample > go build
Kafka へ接続して動作確認してみます。
実行例
> consumer-group-sample sample1 g3 ・・・ Notification: &{Claimed:map[sample1:[0]] Released:map[] Current:map[sample1:[0]]} topic: sample1, offset: 0, key: a, value: 123 topic: sample1, offset: 1, key: b, value: 456
クラスター実行
同一グループに所属する Consumer クライアント(上記サンプル)を複数実行し、メッセージの分散受信を試してみます。
Kafka では 1つのパーティションは 1つの Consumer に割り当てられるようで(グループ毎)、パーティションが 1つしかない状態で同一グループに属する Consumer を増やしても分散の効果はありません。
そこで、まずはパーティション数を増やしておきます。
既存トピックのパーティション数を変更するには kafka-topics コマンド ※ が使えます。
--alter
オプションを使って --partitions
へ変更後のパーティション数を指定します。
※ kafka-topics コマンドは kafka.admin.TopicCommand クラスの main メソッドを呼び出しているだけです
パーティション数を 3つへ変更する例
> kafka-topics --alter --zookeeper 127.0.0.1:2181 --topic sample1 --partitions 3 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
同一グループ cg1 に所属する Consumer クライアントを 3つ実行して、メッセージをいくつか送信してみたところ、以下のような結果となりました。
A
> consumer-group-sample sample1 cg1 ・・・ Notification: &{Claimed:map[sample1:[0 1 2]] Released:map[] Current:map[sample1:[0 1 2]]} topic: sample1, offset: 0, key: a, value: 123 topic: sample1, offset: 1, key: b, value: 456 ・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[2]] Current:map[sample1:[0 1]]} ・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[1]] Current:map[sample1:[0]]} topic: sample1, offset: 2, key: f, value: 678
B
> consumer-group-sample sample1 cg1 ・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]} ・・・ Notification: &{Claimed:map[sample1:[1]] Released:map[sample1:[2]] Current:map[sample1:[1]]} topic: sample1, offset: 0, key: c, value: 789
C
> consumer-group-sample sample1 cg1 ・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]} topic: sample1, offset: 0, key: d, value: 012 topic: sample1, offset: 1, key: e, value: 345
Notification を見るとそれぞれのクライアントへ割り当てられているパーティションが変化している事を確認できます。
例えば、A は最初 0 1 2 の 3つのパーティションが割り当てられていましたが、B や C の追加に伴って減っていき最終的にパーティション 0 のみとなっています。