Go で Kafka の Consumer クライアント
下記ライブラリをそれぞれ使って Go 言語で Apache Kafka の Consumer クライアントを作成してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170506/
Sarama の場合
まずは sarama を使ってみます。
準備
GOPATH 環境変数を設定して sarama を go get しておきます。
sarama の取得例
> go get github.com/Shopify/sarama
サンプル作成
sarama でメッセージを受信するには以下のように実装すれば良さそうです。
- (1)
sarama.NewConsumer
でConsumer
を作成 - (2) Consumer の
ConsumePartition
で指定のトピック・パーティションからメッセージを受信するためのPartitionConsumer
を作成 - (3) PartitionConsumer の
Messages
で取得したチャネルからメッセージを受信
(2) で sarama.OffsetOldest
を指定する事で Kafka に残っている最も古いオフセットからメッセージを取得します。
for select を使って継続的に Kafka からメッセージを受信するようにしていますが、このままだと処理が終了しないので Ctrl + c で終了するように os.Signal
のチャネルを使っています。
src/consumer-sample/main.go
package main import ( "fmt" "log" "io" "os" "os/signal" "github.com/Shopify/sarama" ) func close(trg io.Closer) { if err := trg.Close(); err != nil { log.Fatalln(err) } } func makeStopChannel() chan os.Signal { // Ctrl + c のチャネル設定 ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt) return ch } func main() { // (1) consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { panic(err) } defer close(consumer) // (2) ptConsumer, err := consumer.ConsumePartition(os.Args[1], 0, sarama.OffsetOldest) if err != nil { panic(err) } defer close(ptConsumer) ch := makeStopChannel() for { select { // (3) case msg := <- ptConsumer.Messages(): fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", msg.Topic, msg.Offset, msg.Key, msg.Value) case <- ch: // 終了 return } } }
ビルドと実行
ビルド例
> cd src/consumer-sample > go build
動作確認のため、前回 に組み込み実行した Kafka へ接続してみます。
実行例
> consumer-sample sample1 topic: sample1, offset: 0, key: a, value: 123 topic: sample1, offset: 1, key: b, value: 456
Ctrl + c を入力して終了します。
グループへ属していないのでオフセットが保存されず、実行の度に上記のような出力となります。
備考 - Enter キーで終了する方法
makeStopChannel の処理内容を以下のように変えてみると、Ctrl + c ではなく Enter キーの入力で終了するようにできます。
src/consumer-sample2/main.go
package main import ( ・・・ "bufio" "github.com/Shopify/sarama" ) ・・・ func makeStopChannel() chan string { ch := make(chan string) reader := bufio.NewReader(os.Stdin) go func() { s, _ := reader.ReadString('\n') ch <- s }() return ch } func main() { ・・・ }
Sarama Cluster の場合
次に sarama-cluster を使ってみます。
sarama-cluster は sarama の拡張で、グループなどへの参加を sarama よりも簡単に実施できるようになっています。 (内部的には sarama の JoinGroupRequest 等を使ってグループを処理しています)
準備
GOPATH 環境変数を設定して sarama-cluster を go get しておきます。
sarama-cluster の取得例
> go get github.com/bsm/sarama-cluster
サンプル作成
sarama-cluster の場合は sarama よりもシンプルで、NewConsumer
で Consumer
を作り、Messages
で取得したチャネルからメッセージを受信できます。
また、MarkOffset
を使ってオフセットを保存します。
src/consumer-group-sample/main.go
package main import ( "fmt" "log" "io" "os" "os/signal" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" ) ・・・ func main() { config := cluster.NewConfig() config.Group.Return.Notifications = true config.Consumer.Return.Errors = true config.Consumer.Offsets.Initial = sarama.OffsetOldest topics := []string{os.Args[1]} group := os.Args[2] consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, group, topics, config) if err != nil { panic(err) } defer close(consumer) ch := makeStopChannel() for { select { case msg, more := <- consumer.Messages(): if more { fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", msg.Topic, msg.Offset, msg.Key, msg.Value) // オフセットの保存 consumer.MarkOffset(msg, "") } case err, more := <- consumer.Errors(): if more { log.Printf("Error: %+v\n", err.Error()) } case ntf, more := <- consumer.Notifications(): if more { log.Printf("Notification: %+v\n", ntf) } case <- ch: return } } }
ビルドと実行
ビルド例
> cd src/consumer-group-sample > go build
Kafka へ接続して動作確認してみます。
実行例
> consumer-group-sample sample1 g3 ・・・ Notification: &{Claimed:map[sample1:[0]] Released:map[] Current:map[sample1:[0]]} topic: sample1, offset: 0, key: a, value: 123 topic: sample1, offset: 1, key: b, value: 456
クラスター実行
同一グループに所属する Consumer クライアント(上記サンプル)を複数実行し、メッセージの分散受信を試してみます。
Kafka では 1つのパーティションは 1つの Consumer に割り当てられるようで(グループ毎)、パーティションが 1つしかない状態で同一グループに属する Consumer を増やしても分散の効果はありません。
そこで、まずはパーティション数を増やしておきます。
既存トピックのパーティション数を変更するには kafka-topics コマンド ※ が使えます。
--alter
オプションを使って --partitions
へ変更後のパーティション数を指定します。
※ kafka-topics コマンドは kafka.admin.TopicCommand クラスの main メソッドを呼び出しているだけです
パーティション数を 3つへ変更する例
> kafka-topics --alter --zookeeper 127.0.0.1:2181 --topic sample1 --partitions 3 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
同一グループ cg1 に所属する Consumer クライアントを 3つ実行して、メッセージをいくつか送信してみたところ、以下のような結果となりました。
A
> consumer-group-sample sample1 cg1 ・・・ Notification: &{Claimed:map[sample1:[0 1 2]] Released:map[] Current:map[sample1:[0 1 2]]} topic: sample1, offset: 0, key: a, value: 123 topic: sample1, offset: 1, key: b, value: 456 ・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[2]] Current:map[sample1:[0 1]]} ・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[1]] Current:map[sample1:[0]]} topic: sample1, offset: 2, key: f, value: 678
B
> consumer-group-sample sample1 cg1 ・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]} ・・・ Notification: &{Claimed:map[sample1:[1]] Released:map[sample1:[2]] Current:map[sample1:[1]]} topic: sample1, offset: 0, key: c, value: 789
C
> consumer-group-sample sample1 cg1 ・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]} topic: sample1, offset: 0, key: d, value: 012 topic: sample1, offset: 1, key: e, value: 345
Notification を見るとそれぞれのクライアントへ割り当てられているパーティションが変化している事を確認できます。
例えば、A は最初 0 1 2 の 3つのパーティションが割り当てられていましたが、B や C の追加に伴って減っていき最終的にパーティション 0 のみとなっています。