Docker で Haskell アプリケーションを作成 - MongoDB 利用
MongoDB へ接続する Haskell アプリケーションを Docker で作成してみました。
以下の Docker イメージを使用します。
ビルドツールは stack を使って、MongoDB への接続には以下のライブラリを使います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170529/
(1) Docker コンテナの実行
Docker で MongoDB と Haskell のコンテナを実行します。
コンテナ間の連携には、deprecated となっている Link 機能(–link)は使わずにユーザー定義ネットワークを使う事にします。
(1.1) bridge ネットワークの作成
bridge ネットワークを新規作成します。
$ docker network create --driver bridge br1
(1.2) MongoDB コンテナの実行
作成したネットワークへ参加するように --net=<ネットワーク名>
を使って MongoDB を docker run します。
$ docker run -d --name mongo1 --net=br1 mongo
(1.3) Haskell コンテナの実行
Haskell も同様に docker run します。
ここでは Docker ホスト側の /vagrant/work をコンテナの /work へマウントしています。
$ docker run -it --name hs1 --net=br1 -v /vagrant/work:/work haskell /bin/bash root@・・・#
確認のため Haskell コンテナ内から MongoDB のコンテナへ ping してみます。
ping 例
root@・・・# ping mongo1 PING mongo1 (172.18.0.2): 56 data bytes 64 bytes from 172.18.0.2: icmp_seq=0 ttl=64 time=0.640 ms 64 bytes from 172.18.0.2: icmp_seq=1 ttl=64 time=0.257 ms ・・・
(2) Haskell アプリケーションの作成
Haskell のコンテナで MongoDB へ接続するアプリケーションを作成します。 (以降の処理は (1.3) で起動した Haskell コンテナ内で実施します)
(2.1) プロジェクトのひな型作成
stack new <プロジェクト名> [<テンプレート名>]
でプロジェクトのひな型を作ります。
プロジェクト作成
root@・・・# cd /work root@・・・# stack new sample1 Downloading template "new-template" to create project "sample1" in sample1/ ... ・・・
プロジェクト名のディレクトリが作られ、各種ファイルが生成されます。
(2.2) 依存ライブラリの取得
MongoDB driver for Haskell を使用するため、<プロジェクト名>.cabal
ファイルの library/build-depends へ mongoDB
を追記します。
今回は Control.Monad.Trans
も使うので mtl
も追記しています。
sample1/sample1.cabal
・・・ library ・・・ build-depends: base >= 4.7 && < 5 , mongoDB , mtl default-language: Haskell2010 ・・・
stack build
でビルドすると未取得の依存ライブラリをダウンロードします。
プロジェクトのビルド(依存ライブラリの取得)
root@・・・# cd sample1 root@・・・# stack build --allow-different-user ・・・ mtl-2.2.1: download mtl-2.2.1: configure mtl-2.2.1: build mtl-2.2.1: copy/register ・・・
ここで --allow-different-user
オプションを付けていますが、今回のケースではこのオプションを付ける必要がありました。(-v でマウントしたディレクトリを使用した事が原因だと思われる)
なお、–allow-different-user 無しで stack build
すると以下のようになりました。
root@・・・# stack build You are not the owner of '/work/sample1/'. Aborting to protect file permissions.Retry with '--allow-different-user' to disable this precaution.
(2.3) MongoDB 接続アプリケーションの実装
とりあえず someFunc
という関数名をそのまま使う事にします。(変更する場合は app/Main.hs も変更します)
OverloadedStrings
と ExtendedDefaultRules
の言語拡張が最低限必要となるようです。
MongoDB driver for Haskell では、MongoDB に対する処理を Action m a
(MonadIO m) で定義し access
関数で処理すればよさそうです。
ここで Action
は type Action = ReaderT MongoContext
と定義されています。
MongoDB のコレクションへ複数ドキュメントを追加するには insertMany
や insertMany_
関数が使えます。(ドキュメント追加の用途では insert
や insertAll
関数等もあります)
関数名の最後の _
の有無は、作成したドキュメントの _id
の値を返すかどうかの違いのようです。(_
の付いている方は返さない)
今回は _id
の値は不要なので insertMany_
の方を使いました。
MongoDB へ登録するドキュメントは [<項目名> =: <値>, ・・・]
という形式で定義できます。
find
関数の結果は Action m Cursor
なので、rest
関数(Cursor -> Action m [Document]
) をバインド(>>=
)して Action m [Document]
を取得しています。
接続先の MongoDB ホスト名は MONGO_HOST
環境変数から取得するようにしてみました。
sample1/src/Lib.hs
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ExtendedDefaultRules #-} module Lib ( someFunc ) where import System.Environment import Control.Monad.Trans (lift) import Database.MongoDB someFunc :: IO () someFunc = do mongo <- mongoHost -- MongoDB 接続 pipe <- connect (host mongo) access pipe master "sample" proc close pipe mongoHost :: IO String mongoHost = getEnv "MONGO_HOST" proc :: Action IO () proc = do -- items コレクションへドキュメント追加 insertMany_ "items" [ ["name" =: "item1", "value" =: 1], ["name" =: "item2", "value" =: 2] ] allItems >>= printDocs -- items コレクションの全ドキュメント取得 allItems :: Action IO [Document] allItems = rest =<< find ( select [] "items" ) -- ドキュメントの出力 printDocs :: [Document] -> Action IO () printDocs docs = lift $ mapM_ print docs
(2.4) ビルドと実行
それでは、ビルドして実行します。
ビルド
root@・・・# stack build --allow-different-user ・・・ /work/sample1/.stack-work/install/x86_64-linux/lts-8.15/8.0.2/bin Registering sample1-0.1.0.0...
.stack-work ディレクトリへ(ビルドの)成果物が生成されます。
実行するには stack exec <実行ファイル名>
で実行するか、実行ファイル(例 .stack-work/install/x86_64-linux/lts-8.15/8.0.2/bin/sample1-exe
)を直接実行します。
実行ファイル名はデフォルトで <プロジェクト名>-exe
となるようです。
今回は環境変数から MongoDB のホスト名を取得するようにしたので、MONGO_HOST
環境変数へ MongoDB のコンテナ名を設定してから実行します。
実行
root@・・・# export MONGO_HOST=mongo1 root@・・・# stack exec sample1-exe --allow-different-user [ _id: 592a7ffb2139612699000000, name: "item1", value: 1] [ _id: 592a7ffb2139612699000001, name: "item2", value: 2]
D3.js で HAR ファイルから散布図を作成
前回、HAR (HTTP ARchive) ファイルから Python で作成した散布図を D3.js を使って SVG として作ってみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20170515/
はじめに
Node.js で D3.js を使用するために d3
と jsdom
をインストールしておきます。
依存ライブラリのインストール
npm install --save d3 jsdom
package.json
{ ・・・ "dependencies": { "d3": "^4.8.0", "jsdom": "^10.1.0" } }
散布図(SVG)の作成
標準入力から HAR ファイルの内容(JSON)を受け取り、time と bodySize の該当位置へ円を描画する事で散布図を表現してみました。
円の色をサブタイプ毎に分けることで色分けしています。
ついでに、ツールチップの表示やマウスイベント処理を追加してみました。
Web ブラウザで SVG ファイルを開いた際にマウスイベントを処理できるように attr
を使ってマウスイベント処理(該当する円のサイズを変更)を設定しています。
index.js
const d3 = require('d3'); const jsdom = require('jsdom'); const { JSDOM } = jsdom; const w = 800; const h = 500; const margin = { left: 80, top: 20, bottom: 50, right: 200 }; const legendMargin = { top: 30 }; const fontSize = '12pt'; const circleRadius = 5; // X軸(time)の値の範囲 const xDomain = [0, 5000]; // Y軸(bodySize)の値の範囲 const yDomain = [0, 180000]; const colorMap = { 'javascript': 'blue', 'x-javascript': 'blue', 'json': 'green', 'gif': 'tomato', 'jpeg': 'red', 'png': 'pink', 'html': 'lime', 'css': 'turquoise' }; const dom = new JSDOM(); const document = dom.window.document; const toSubType = mtype => mtype.split(';')[0].split('/').pop(); const toColor = stype => colorMap[stype] ? colorMap[stype] : 'black'; process.stdin.resume(); let json = ''; process.stdin.on('data', chunk => json += chunk); process.stdin.on('end', () => { const data = JSON.parse(json); const df = data.log.entries.map( d => { return { 'url': d.request.url, 'subType': toSubType(d.response.content.mimeType), 'bodySize': d.response.bodySize, 'time': d.time }; }); const svg = d3.select(document.body) .append('svg') .attr('xmlns', 'http://www.w3.org/2000/svg') .attr('width', w + margin.left + margin.right) .attr('height', h + margin.top + margin.bottom) .append('g') .attr('transform', `translate(${margin.left}, ${margin.top})`); const x = d3.scaleLinear().range([0, w]).domain(xDomain); const y = d3.scaleLinear().range([h, 0]).domain(yDomain); const xAxis = d3.axisBottom(x); const yAxis = d3.axisLeft(y); // X軸 svg.append('g') .attr('transform', `translate(0, ${h})`) .call(xAxis); // Y軸 svg.append('g') .call(yAxis); // X軸ラベル svg.append('text') .attr('x', w / 2) .attr('y', h + 40) .style('font-size', fontSize) .text('Time (ms)'); // Y軸ラベル svg.append('text') .attr('x', -h / 2) .attr('y', -(margin.left) / 1.5) .style('font-size', fontSize) .attr('transform', 'rotate(-90)') .text('Body Size'); // 円 const point = svg.selectAll('circle') .data(df) .enter() .append('circle'); // 円の設定 point.attr('class', d => d.subType) .attr('cx', d => x(d.time)) .attr('cy', d => y(d.bodySize)) .attr('r', circleRadius) .attr('fill', d => toColor(d.subType)) .append('title') // ツールチップの設定 .text(d => d.url); // 凡例 const legend = svg.selectAll('.legend') .data(d3.entries(colorMap)) .enter() .append('g') .attr('class', 'legend') .attr('transform', (d, i) => { const left = w + margin.left; const top = margin.top + i * legendMargin.top; return `translate(${left}, ${top})`; }); legend.append('circle') .attr('r', circleRadius) .attr('fill', d => d.value); legend.append('text') .attr('x', circleRadius * 2) .attr('y', 4) .style('font-size', fontSize) // マウスイベント処理(該当する円のサイズを変更) .attr('onmouseover', d => `document.querySelectorAll('circle.${d.key}').forEach(d => d.setAttribute('r', ${circleRadius} * 2))`) .attr('onmouseout', d => `document.querySelectorAll('circle.${d.key}').forEach(d => d.setAttribute('r', ${circleRadius}))`) // 凡例のラベル .text(d => d.key); // SVG の出力 console.log(document.body.innerHTML); });
実行例
node index.js < a.har > a.svg
実行結果例
a.svg
b.svg
Python で HAR ファイルから散布図を作成
Web サイトの構成を可視化するため、Python で HAR (HTTP ARchive) ファイルをパースし散布図を試しに作ってみました。
今回作成した散布図の内容は以下の通りです。
- 横軸は time (処理時間 (ミリ秒))
- 縦軸は bodySize (レスポンスボディのサイズ)
- MIME タイプのサブタイプ別に色分け
今回は Anaconda に予め含まれているライブラリを使用する事にします。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170514/
はじめに
HAR ファイルは Chrome や Firefox の開発ツールで取得できますが、簡易なものは PhantomJS の examples/netsniff.js
で取得できるので今回はこれを使いました。
HAR の取得
phantomjs netsniff.js <URL>
netsniff.js では console.log
で HAR の内容を出力するようになっています。
HAR ファイルの取得例
phantomjs netsniff.js https://www.・・・ > a.har
HAR ファイルは以下のような JSON 形式のファイルとなっています。
HAR ファイル例
{ "log": { ・・・ "entries": [ { ・・・ "time": 4474, "request": { ・・・ }, "response": { ・・・ "bodySize": 473, "content": { "size": 473, "mimeType": "text/html;charset=UTF-8" } }, "cache": {}, "timings": { ・・・ "wait": 3813, "receive": 661, "ssl": -1 }, ・・・ }, ・・・ ] } }
netsniff.js では、timings 内の wait と receive 以外の値は固定値(0 か -1)が設定されるようになっています。
1. seaborn の pairplot 使用
まずは、seaborn の pairplot
を使って散布図を描画してみます。 (pairplot はデフォルトが散布図になっているようです)
pairplot は基本的にデータ内の数値変数の全ての組み合わせをグラフ化するものですが、x_vars
と y_vars
を指定する事で特定の組み合わせに限定したグラフだけを描画する事も可能です。
色分けする項目は hue
で指定できるようになっています。
複数のグラフをグリッド表示する事を想定しているため、個々のグラフのデフォルトサイズが小さく設定されていますが、size
で変更する事が可能です。
har_scatter_plot1.py
import sys import json import codecs import pandas as pd import seaborn as sns import matplotlib.pyplot as plt # HAR ファイルの読み込みと Python オブジェクト化 data = json.load(codecs.open(sys.argv[1], 'r', 'utf-8')) to_subtype = lambda mtype: mtype.split(';')[0].split('/')[-1] # 特定の項目を抜き出す処理 convert = lambda d: { 'subType': to_subtype(d['response']['content']['mimeType']), 'bodySize': d['response']['bodySize'], 'time': d['time'] } # DataFrame 化 df = pd.DataFrame(list(map(convert, data['log']['entries']))) # 散布図の描画 sns.pairplot(df, hue = 'subType', x_vars = 'time', y_vars = 'bodySize', size = 10) imgfile = "%s_1.png" % sys.argv[1] # グラフの保存 plt.savefig(imgfile)
実行例
python har_scatter_plot1.py a.har
実行結果例
a.har_1.png
b.har_1.png
2. サブタイプ毎に plot
pairplot の結果ではサブタイプ毎の色が固定されておらず、複数の Web ページを比較する用途には適していません。
そこで、サブタイプ毎の散布図を重ねて描画してみました。
har_scatter_plot2.py
import sys import json import codecs import pandas as pd import matplotlib.pyplot as plt ・・・ # サブタイプと色のマッピング color_map = { 'javascript': 'blue', 'x-javascript': 'blue', 'json': 'green', 'gif': 'tomato', 'jpeg': 'red', 'png': 'pink', 'html': 'lime', 'css': 'turquoise' } df = pd.DataFrame(list(map(convert, data['log']['entries']))) ax = plt.figure().add_subplot(1, 1, 1) for (k, c) in color_map.items(): # 指定サブタイプのデータを抽出 sdf = df[df['subType'] == k] if not sdf.empty: # c の色で散布図(scatter)を描画 sdf.plot('time', 'bodySize', 'scatter', ax, c = c, label = k) imgfile = "%s_2.png" % sys.argv[1] plt.savefig(imgfile)
実行例
python har_scatter_plot2.py a.har
実行結果例
a.har_2.png
b.har_2.png
Go で Kafka の Consumer クライアント
下記ライブラリをそれぞれ使って Go 言語で Apache Kafka の Consumer クライアントを作成してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170506/
Sarama の場合
まずは sarama を使ってみます。
準備
GOPATH 環境変数を設定して sarama を go get しておきます。
sarama の取得例
> go get github.com/Shopify/sarama
サンプル作成
sarama でメッセージを受信するには以下のように実装すれば良さそうです。
- (1)
sarama.NewConsumer
でConsumer
を作成 - (2) Consumer の
ConsumePartition
で指定のトピック・パーティションからメッセージを受信するためのPartitionConsumer
を作成 - (3) PartitionConsumer の
Messages
で取得したチャネルからメッセージを受信
(2) で sarama.OffsetOldest
を指定する事で Kafka に残っている最も古いオフセットからメッセージを取得します。
for select を使って継続的に Kafka からメッセージを受信するようにしていますが、このままだと処理が終了しないので Ctrl + c で終了するように os.Signal
のチャネルを使っています。
src/consumer-sample/main.go
package main import ( "fmt" "log" "io" "os" "os/signal" "github.com/Shopify/sarama" ) func close(trg io.Closer) { if err := trg.Close(); err != nil { log.Fatalln(err) } } func makeStopChannel() chan os.Signal { // Ctrl + c のチャネル設定 ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt) return ch } func main() { // (1) consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { panic(err) } defer close(consumer) // (2) ptConsumer, err := consumer.ConsumePartition(os.Args[1], 0, sarama.OffsetOldest) if err != nil { panic(err) } defer close(ptConsumer) ch := makeStopChannel() for { select { // (3) case msg := <- ptConsumer.Messages(): fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", msg.Topic, msg.Offset, msg.Key, msg.Value) case <- ch: // 終了 return } } }
ビルドと実行
ビルド例
> cd src/consumer-sample > go build
動作確認のため、前回 に組み込み実行した Kafka へ接続してみます。
実行例
> consumer-sample sample1 topic: sample1, offset: 0, key: a, value: 123 topic: sample1, offset: 1, key: b, value: 456
Ctrl + c を入力して終了します。
グループへ属していないのでオフセットが保存されず、実行の度に上記のような出力となります。
備考 - Enter キーで終了する方法
makeStopChannel の処理内容を以下のように変えてみると、Ctrl + c ではなく Enter キーの入力で終了するようにできます。
src/consumer-sample2/main.go
package main import ( ・・・ "bufio" "github.com/Shopify/sarama" ) ・・・ func makeStopChannel() chan string { ch := make(chan string) reader := bufio.NewReader(os.Stdin) go func() { s, _ := reader.ReadString('\n') ch <- s }() return ch } func main() { ・・・ }
Sarama Cluster の場合
次に sarama-cluster を使ってみます。
sarama-cluster は sarama の拡張で、グループなどへの参加を sarama よりも簡単に実施できるようになっています。 (内部的には sarama の JoinGroupRequest 等を使ってグループを処理しています)
準備
GOPATH 環境変数を設定して sarama-cluster を go get しておきます。
sarama-cluster の取得例
> go get github.com/bsm/sarama-cluster
サンプル作成
sarama-cluster の場合は sarama よりもシンプルで、NewConsumer
で Consumer
を作り、Messages
で取得したチャネルからメッセージを受信できます。
また、MarkOffset
を使ってオフセットを保存します。
src/consumer-group-sample/main.go
package main import ( "fmt" "log" "io" "os" "os/signal" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" ) ・・・ func main() { config := cluster.NewConfig() config.Group.Return.Notifications = true config.Consumer.Return.Errors = true config.Consumer.Offsets.Initial = sarama.OffsetOldest topics := []string{os.Args[1]} group := os.Args[2] consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, group, topics, config) if err != nil { panic(err) } defer close(consumer) ch := makeStopChannel() for { select { case msg, more := <- consumer.Messages(): if more { fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", msg.Topic, msg.Offset, msg.Key, msg.Value) // オフセットの保存 consumer.MarkOffset(msg, "") } case err, more := <- consumer.Errors(): if more { log.Printf("Error: %+v\n", err.Error()) } case ntf, more := <- consumer.Notifications(): if more { log.Printf("Notification: %+v\n", ntf) } case <- ch: return } } }
ビルドと実行
ビルド例
> cd src/consumer-group-sample > go build
Kafka へ接続して動作確認してみます。
実行例
> consumer-group-sample sample1 g3 ・・・ Notification: &{Claimed:map[sample1:[0]] Released:map[] Current:map[sample1:[0]]} topic: sample1, offset: 0, key: a, value: 123 topic: sample1, offset: 1, key: b, value: 456
クラスター実行
同一グループに所属する Consumer クライアント(上記サンプル)を複数実行し、メッセージの分散受信を試してみます。
Kafka では 1つのパーティションは 1つの Consumer に割り当てられるようで(グループ毎)、パーティションが 1つしかない状態で同一グループに属する Consumer を増やしても分散の効果はありません。
そこで、まずはパーティション数を増やしておきます。
既存トピックのパーティション数を変更するには kafka-topics コマンド ※ が使えます。
--alter
オプションを使って --partitions
へ変更後のパーティション数を指定します。
※ kafka-topics コマンドは kafka.admin.TopicCommand クラスの main メソッドを呼び出しているだけです
パーティション数を 3つへ変更する例
> kafka-topics --alter --zookeeper 127.0.0.1:2181 --topic sample1 --partitions 3 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
同一グループ cg1 に所属する Consumer クライアントを 3つ実行して、メッセージをいくつか送信してみたところ、以下のような結果となりました。
A
> consumer-group-sample sample1 cg1 ・・・ Notification: &{Claimed:map[sample1:[0 1 2]] Released:map[] Current:map[sample1:[0 1 2]]} topic: sample1, offset: 0, key: a, value: 123 topic: sample1, offset: 1, key: b, value: 456 ・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[2]] Current:map[sample1:[0 1]]} ・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[1]] Current:map[sample1:[0]]} topic: sample1, offset: 2, key: f, value: 678
B
> consumer-group-sample sample1 cg1 ・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]} ・・・ Notification: &{Claimed:map[sample1:[1]] Released:map[sample1:[2]] Current:map[sample1:[1]]} topic: sample1, offset: 0, key: c, value: 789
C
> consumer-group-sample sample1 cg1 ・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]} topic: sample1, offset: 0, key: d, value: 012 topic: sample1, offset: 1, key: e, value: 345
Notification を見るとそれぞれのクライアントへ割り当てられているパーティションが変化している事を確認できます。
例えば、A は最初 0 1 2 の 3つのパーティションが割り当てられていましたが、B や C の追加に伴って減っていき最終的にパーティション 0 のみとなっています。
Groovy で Kafka を組み込み実行
Groovy で Apache Kafka を組み込み実行してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170410/
Kafka 組み込み実行
Kafka の実行には ZooKeeper が必要なため、ZooKeeper と Kafka の両方を組み込み実行します。
ZooKeeperServerMain を実行 (initializeAndRun
) すると処理をブロックしてしまい、後続の処理を実行できないので、今回は別スレッドで実行するようにしました。
initializeAndRun メソッドは以下のように配列の要素数によって引数の解釈が異なるようです。
引数の数 | 引数の内容 |
---|---|
1つ | 設定ファイルのパス |
2つ以上 | ポート番号, データディレクトリ, tickTime, maxClientCnxns |
kafka_embed.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0') @Grab('org.apache.zookeeper:zookeeper:3.5.2-alpha') import kafka.server.KafkaServerStartable import org.apache.zookeeper.server.ZooKeeperServerMain def zkPort = '2181' def zkDir = 'zk-tmp' def kafkaDir = 'kafka-logs' def zk = new ZooKeeperServerMain() Thread.start { // ZooKeeper の実行 zk.initializeAndRun([zkPort, zkDir] as String[]) } def kafkaProps = new Properties() kafkaProps.setProperty('zookeeper.connect', "localhost:${zkPort}") kafkaProps.setProperty('log.dir', kafkaDir) def kafka = KafkaServerStartable.fromProps(kafkaProps) // Kafka の実行 kafka.startup() println 'startup ...' System.in.read() kafka.shutdown() zk.shutdown()
Groovy のデフォルト設定ではメモリ不足で起動に失敗したため、JAVA_OPTS
環境変数で最大メモリサイズを変更して実行します。
実行
> set JAVA_OPTS=-Xmx512m > groovy kafka_embed.groovy startup ・・・
備考
ZooKeeper を組み込み実行するには、Apache Curator の org.apache.curator.test.TestingServer
を使う方法もあります。
TestingServer の close 時に ZooKeeper のデータディレクトリを削除しないようにするには InstanceSpec
を使います。 (コンストラクタの第5引数を false にすると削除しなくなる)
kafka_embed_curator.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0') @Grapes([ @Grab('org.apache.curator:curator-test:3.3.0'), @GrabExclude('io.netty#netty:3.7.0.Final') ]) import kafka.server.KafkaServerStartable import org.apache.curator.test.TestingServer import org.apache.curator.test.InstanceSpec def zkPort = 2181 def zkDir = 'zk-tmp' def kafkaDir = 'kafka-logs' // close 時にデータディレクトリを残すように false を指定 def spec = new InstanceSpec(new File(zkDir), zkPort, -1, -1, false, -1) def props = new Properties() props.setProperty('zookeeper.connect', "localhost:${zkPort}") props.setProperty('log.dir', kafkaDir) // 第2引数を true にするとコンストラクタ内で start メソッドを実行する new TestingServer(spec, false).withCloseable { zk -> zk.start() def kafka = KafkaServerStartable.fromProps(props) kafka.startup() println 'startup ...' System.in.read() kafka.shutdown() zk.stop() }
Kafka クライアント
ついでに、Kafka の各種クライアント処理も Groovy で実装してみます。
a. KafkaProducer でメッセージ送信
まずは KafkaProducer を使った Kafka へのメッセージ送信処理です。
メッセージはトピックへ送信する事になり、トピックが未作成の場合は自動的に作成されます。(kafka.admin.TopicCommand
等でトピックを事前に作成しておく事も可能)
bootstrap.servers
で接続先の Kafka を指定します。
kafka_client_producer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0') @Grab('org.slf4j:slf4j-simple:1.7.24') import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord def topic = args[0] def key = args[1] def value = args[2] def props = new Properties() props.setProperty('bootstrap.servers', 'localhost:9092') props.setProperty('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer') props.setProperty('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer') new KafkaProducer(props).withCloseable { producer -> def res = producer.send(new ProducerRecord(topic, key, value)) println "***** result: ${res.get()}" }
KafkaProducer 実行例
> groovy kafka_client_producer.groovy sample1 a 123 ・・・ ***** result: sample1-0@0 ・・・ > groovy kafka_client_producer.groovy sample1 b 456 ・・・ ***** result: sample1-0@1 ・・・
b. KafkaConsumer でメッセージ受信
次に KafkaConsumer でメッセージを受信してみます。
トピックを subscribe
する事でメッセージを受信します。
デフォルトでは subscribe 後に送信されたメッセージを受信する事になります。
送信済みのメッセージも受信するには auto.offset.reset
へ earliest
を設定します。
Kafka では group.id
で指定したグループ ID 毎にメッセージの Offset (どのメッセージまでを受信したか) が管理されます。
複数のクライアントが同一グループ ID に属している場合は、その中の 1つがメッセージを受信する事になるようです。
kafka_client_consumer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0') @Grab('org.slf4j:slf4j-simple:1.7.24') import org.apache.kafka.clients.consumer.KafkaConsumer import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors def topic = args[0] def group = args[1] def props = new Properties() props.setProperty('bootstrap.servers', 'localhost:9092') props.setProperty('group.id', group) props.setProperty('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer') props.setProperty('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer') props.setProperty('auto.offset.reset', 'earliest') def stopLatch = new CountDownLatch(1) def es = Executors.newSingleThreadExecutor() es.submit { new KafkaConsumer(props).withCloseable { consumer -> consumer.subscribe([topic]) while(stopLatch.count > 0) { def records = consumer.poll(1000) records.each { println "***** result: ${it}" } } } } System.in.read() stopLatch.countDown() es.shutdown()
KafkaConsumer 実行例
> groovy kafka_client_consumer.groovy sample1 g1 ・・・ ***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 0, CreateTime = 1491758385860, checksum = 1240240547, serialized key size = 1, serialized value size = 3, key = a, value = 123) ***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 1, CreateTime = 1491758400116, checksum = 728766236, serialized key size = 1, serialized value size = 3, key = b, value = 456)
c. KafkaStreams でメッセージ受信
KafkaStreams を使ってメッセージを受信します。
グループ ID は application.id
で指定するようです。
kafka_client_stream.groovy
@Grab('org.apache.kafka:kafka-streams:0.10.2.0') @Grab('org.slf4j:slf4j-simple:1.7.24') import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.kstream.KStreamBuilder import org.apache.kafka.common.serialization.Serdes def topic = args[0] def group = args[1] def props = new Properties() props.put('application.id', group) props.put('bootstrap.servers', 'localhost:9092') props.put('key.serde', Serdes.String().class) props.put('value.serde', Serdes.String().class) def builder = new KStreamBuilder() builder.stream(topic).print() def streams = new KafkaStreams(builder, props) streams.start() System.in.read() streams.close()
動作確認のため、未使用のグループ ID を使って実行します。
KafkaStreams 実行例
> groovy kafka_client_stream.groovy sample1 g2 ・・・ [KSTREAM-SOURCE-0000000000]: a , 123 [KSTREAM-SOURCE-0000000000]: b , 456
d. KafkaConsumerGroupService で Offset 確認
KafkaConsumerGroupService を使ってグループ ID 毎の Offset を確認してみます。
kafka_group_offset.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0') import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService def group = args[0] def params = ['--bootstrap-server', 'localhost:9092', '--group', group] as String[] def opts = new ConsumerGroupCommandOptions(params) def svc = new KafkaConsumerGroupService(opts) def res = svc.describeGroup() res._2.foreach { it.foreach { st -> println "topic = ${st.topic.value}, offset = ${st.offset.value}, partition = ${st.partition.value}" } } svc.close()
KafkaConsumerGroupService 実行例
> groovy kafka_group_offset.groovy g1 topic = sample1, offset = 2, partition = 0
Java で Apache Beam を使用
前回 と同等の処理を Apache Beam を使って実装してみます。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170327/
サンプルアプリケーション
Beam では Pipeline
の apply
メソッドで処理を繋げるようですので、今回は以下のように実装してみました。
- (1)
Count.perElement
メソッドを使って要素毎にカウントしたKV<String, Long>
を取得 - (2)
ToString.kvs
メソッドを使ってKV
の Key と Value の値を連結して文字列化 - (3)
DoFn
で@ProcessElement
を付与したメソッドを実装し (2) で取得した文字列を標準出力
apply
メソッドの引数に使用する PTransform
は org.apache.beam.sdk.transforms
パッケージ下に主要なものが用意されているようです。
標準出力を行うための基本作法が分からなかったので、今回は DoFn
を使っています。 (他に MapElements.via(SimpleFunction)
を使う方法等も考えられます)
DoFn
ではアノテーションを使って処理メソッドを指定するようになっており、入力要素を 1件ずつ処理するための @ProcessElement
アノテーションの他にもいくつか用意されているようです。(例えば @Setup
や @StartBundle
等)
また、アノテーションを付与したメソッドは引数の型等をチェックするようになっています。 (org.apache.beam.sdk.transforms.reflect.DoFnSignatures
等のソース参照)
src/main/java/MoneyCount.java
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ToString; public class MoneyCount { public static void main(String... args) throws Exception { PipelineOptions opt = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(opt); p.apply(TextIO.Read.from(args[0])) .apply(Count.perElement()) // (1) .apply(ToString.kvs()) // (2) .apply(ParDo.of(new DoFn<String, String>() { // (3) @ProcessElement public void process(ProcessContext ctx) { System.out.println(ctx.element()); } })); p.run().waitUntilFinish(); } }
実行
以下のビルド定義ファイルを使って実行します。
今回は DirectRunner
(beam-runners-direct-java
) で実行していますが、Apache Spark や Flink 等で実行する方法も用意されています。
build.gradle
apply plugin: 'application' mainClassName = 'SampleApp' repositories { jcenter() } dependencies { compile 'org.apache.beam:beam-sdks-java-core:0.6.0' runtime 'org.apache.beam:beam-runners-direct-java:0.6.0' runtime 'org.slf4j:slf4j-nop:1.7.25' } run { if (project.hasProperty('args')) { args project.args } }
実行結果は以下の通りです。 なお、出力順は実行の度に変化します。
実行結果
> gradle run -q -Pargs=input_sample.txt 10000,2 5,3 2000,1 1000,3 10,2 500,1 100,2 1,2 50,1
input_sample.txt の内容は以下の通りです。
input_sample.txt
100 1 5 50 500 1000 10000 1000 1 10 5 5 10 100 1000 10000 2000
Java 8 で Apache Flink を使用
前回 と同様の処理を Java8 のラムダ式を使って実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170313/
サンプル
前回 の処理をラムダ式を使って Java で実装すると以下のようになりました。
MoneyCount.java
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; public class MoneyCount { public static void main(String... args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); env.readTextFile(args[0]) .map( w -> new Tuple2<>(w, 1) ) .groupBy(0) .sum(1) .print(); } }
実行
Flink 1.2.0 では上記のように map 等の引数へラムダ式を使った場合、通常の JDK でコンパイルすると実行時にエラーが発生してしまいます。
(a) JDK でコンパイルして実行
- javac 1.8.0_121
以下の Gradle ビルド定義を使って実行してみます。
build.gradle
apply plugin: 'application' mainClassName = 'MoneyCount' repositories { jcenter() } dependencies { compile 'org.apache.flink:flink-java:1.2.0' runtime 'org.apache.flink:flink-clients_2.11:1.2.0' } run { if (project.hasProperty('args')) { args project.args } }
実行すると以下のようにエラーとなりました。
Tuple2 の型引数が失われている点が問題となっており、Eclipse JDT compiler でコンパイルしなければならないようです。
実行結果 ※
> gradle run -q -Pargs=input_sample.txt Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(MoneyCount.java:10)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface. at org.apache.flink.api.java.DataSet.getType(DataSet.java:174) at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:692) at MoneyCount.main(MoneyCount.java:11) Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. It seems that your compiler has not stored them into the .class file. Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. See the documentation for more information about how to compile jobs containing lambda expressions. at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1528) ・・・
※ 出力結果には改行を適当に加えています
(b) Eclipse JDT Compiler でコンパイル(-genericsignature オプション付)して実行
- ecj 4.6.1
次に Eclipse JDT Compiler でコンパイルし実行してみます。
Eclipse JDT Compiler は java -jar ecj.jar ・・・
で実行できるので、Gradle で実施する場合は compileJava
の forkOptions
を使って設定します。
今回のケースでは、Eclipse JDT Compiler を -genericsignature
オプション付きで実行する点が重要です。(付けない場合は JDK と同じ結果になります)
build-ecj.gradle
apply plugin: 'application' mainClassName = 'MoneyCount' configurations { ecj } repositories { jcenter() } dependencies { compile 'org.apache.flink:flink-java:1.2.0' runtime 'org.apache.flink:flink-clients_2.11:1.2.0' ecj 'org.eclipse.jdt.core.compiler:ecj:4.6.1' // 以下でも可 //ecj 'org.eclipse.scout.sdk.deps:ecj:4.6.2' } // Eclipse JDT Compiler の設定 compileJava { options.fork = true // -genericsignature オプションの指定 options.compilerArgs << '-genericsignature' // java -jar ecj.jar を実行するための設定 options.forkOptions.with { executable = 'java' jvmArgs = ['-jar', configurations.ecj.asPath] } } run { if (project.hasProperty('args')) { args project.args } }
実行結果は以下の通り、正常に実行できるようになりました。
実行結果
> gradle run -q -b build-ecj.gradle -Pargs=input_sample.txt Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1055289406] 03/13/2017 20:40:46 Job execution switched to status RUNNING. ・・・ 03/13/2017 20:40:47 Job execution switched to status FINISHED. (10000,2) (10,2) (100,2) (50,1) (500,1) (1,2) (1000,3) (2000,1) (5,3)
input_sample.txt の内容は以下の通りです。
input_sample.txt
100 1 5 50 500 1000 10000 1000 1 10 5 5 10 100 1000 10000 2000
コンパイル結果の比較
最後に、コンパイル結果(.class ファイル) を CFR で処理(以下のコマンドを適用)して違いを確認しておきます。 (--decodelambdas false
オプションでラムダ式の部分をメソッドとして残すようにしています)
java -jar cfr_0_120.jar MoneyCount.class --decodelambdas false
まずは JDK(javac 1.8.0_121)のコンパイル結果を確認してみます。
(a) JDK でコンパイルした場合(CFR の処理結果)
・・・ public class MoneyCount { public static /* varargs */ void main(String ... args) throws Exception { ・・・ } private static /* synthetic */ Tuple2 lambda$main$95f17bfa$1(String w) throws Exception { return new Tuple2((Object)w, (Object)1); } }
lambda$main$95f17bfa$1 の戻り値の型が Tuple2
となっており、型引数が失われています。(これが実行時のエラー原因)
次に Eclipse JDT Compiler のコンパイル結果を確認してみます。
(b) Eclipse JDT Compiler でコンパイル(-genericsignature オプション付)した場合(CFR の処理結果)
・・・ public class MoneyCount { public static /* varargs */ void main(String ... args) throws Exception { ・・・ } private static /* synthetic */ Tuple2<String, Integer> lambda$0(String w) throws Exception { return new Tuple2((Object)w, (Object)1); } }
lambda$0 の戻り値の型が Tuple2<String, Integer>
となっており、型引数が失われずに残っています。(これが実行に成功した理由)