Go で Kafka の Consumer クライアント

下記ライブラリをそれぞれ使って Go 言語で Apache Kafka の Consumer クライアントを作成してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170506/

Sarama の場合

まずは sarama を使ってみます。

準備

GOPATH 環境変数を設定して sarama を go get しておきます。

sarama の取得例
> go get github.com/Shopify/sarama

サンプル作成

sarama でメッセージを受信するには以下のように実装すれば良さそうです。

  • (1) sarama.NewConsumerConsumer を作成
  • (2) Consumer の ConsumePartition で指定のトピック・パーティションからメッセージを受信するための PartitionConsumer を作成
  • (3) PartitionConsumer の Messages で取得したチャネルからメッセージを受信

(2) で sarama.OffsetOldest を指定する事で Kafka に残っている最も古いオフセットからメッセージを取得します。

for select を使って継続的に Kafka からメッセージを受信するようにしていますが、このままだと処理が終了しないので Ctrl + c で終了するように os.Signal のチャネルを使っています。

src/consumer-sample/main.go
package main

import (
    "fmt"
    "log"
    "io"
    "os"
    "os/signal"
    "github.com/Shopify/sarama"
)

func close(trg io.Closer) {
    if err := trg.Close(); err != nil {
        log.Fatalln(err)
    }
}

func makeStopChannel() chan os.Signal {
    // Ctrl + c のチャネル設定
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)

    return ch
}

func main() {
    // (1)
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)

    if err != nil {
        panic(err)
    }

    defer close(consumer)
    // (2)
    ptConsumer, err := consumer.ConsumePartition(os.Args[1], 0, sarama.OffsetOldest)

    if err != nil {
        panic(err)
    }

    defer close(ptConsumer)

    ch := makeStopChannel()

    for {
        select {
            // (3)
            case msg := <- ptConsumer.Messages():
                fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", 
                    msg.Topic, msg.Offset, msg.Key, msg.Value)
            case <- ch:
                // 終了
                return
        }
    }
}

ビルドと実行

ビルド例
> cd src/consumer-sample
> go build

動作確認のため、前回 に組み込み実行した Kafka へ接続してみます。

実行例
> consumer-sample sample1

topic: sample1, offset: 0, key: a, value: 123
topic: sample1, offset: 1, key: b, value: 456

Ctrl + c を入力して終了します。

グループへ属していないのでオフセットが保存されず、実行の度に上記のような出力となります。

備考 - Enter キーで終了する方法

makeStopChannel の処理内容を以下のように変えてみると、Ctrl + c ではなく Enter キーの入力で終了するようにできます。

src/consumer-sample2/main.go
package main

import (
    ・・・
    "bufio"
    "github.com/Shopify/sarama"
)

・・・

func makeStopChannel() chan string {
    ch := make(chan string)
    reader := bufio.NewReader(os.Stdin)

    go func() {
        s, _ := reader.ReadString('\n')
        ch <- s
    }()

    return ch
}

func main() {
    ・・・
}

Sarama Cluster の場合

次に sarama-cluster を使ってみます。

sarama-cluster は sarama の拡張で、グループなどへの参加を sarama よりも簡単に実施できるようになっています。 (内部的には sarama の JoinGroupRequest 等を使ってグループを処理しています)

準備

GOPATH 環境変数を設定して sarama-cluster を go get しておきます。

sarama-cluster の取得例
> go get github.com/bsm/sarama-cluster

サンプル作成

sarama-cluster の場合は sarama よりもシンプルで、NewConsumerConsumer を作り、Messages で取得したチャネルからメッセージを受信できます。

また、MarkOffset を使ってオフセットを保存します。

src/consumer-group-sample/main.go
package main

import (
    "fmt"
    "log"
    "io"
    "os"
    "os/signal"
    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

・・・

func main() {
    config := cluster.NewConfig()
    config.Group.Return.Notifications = true
    config.Consumer.Return.Errors = true
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    topics := []string{os.Args[1]}
    group := os.Args[2]

    consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, group, topics, config)

    if err != nil {
        panic(err)
    }

    defer close(consumer)

    ch := makeStopChannel()

    for {
        select {
            case msg, more := <- consumer.Messages():
                if more {
                    fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", 
                            msg.Topic, msg.Offset, msg.Key, msg.Value)

                    // オフセットの保存
                    consumer.MarkOffset(msg, "")
                }
            case err, more := <- consumer.Errors():
                if more {
                    log.Printf("Error: %+v\n", err.Error())
                }
            case ntf, more := <- consumer.Notifications():
                if more {
                    log.Printf("Notification: %+v\n", ntf)
                }
            case <- ch:
                return
        }
    }
}

ビルドと実行

ビルド例
> cd src/consumer-group-sample
> go build

Kafka へ接続して動作確認してみます。

実行例
> consumer-group-sample sample1 g3

・・・ Notification: &{Claimed:map[sample1:[0]] Released:map[] Current:map[sample1:[0]]}
topic: sample1, offset: 0, key: a, value: 123
topic: sample1, offset: 1, key: b, value: 456

クラスター実行

同一グループに所属する Consumer クライアント(上記サンプル)を複数実行し、メッセージの分散受信を試してみます。

Kafka では 1つのパーティションは 1つの Consumer に割り当てられるようで(グループ毎)、パーティションが 1つしかない状態で同一グループに属する Consumer を増やしても分散の効果はありません。

そこで、まずはパーティション数を増やしておきます。

既存トピックのパーティション数を変更するには kafka-topics コマンド ※ が使えます。 --alter オプションを使って --partitions へ変更後のパーティション数を指定します。

 ※ kafka-topics コマンドは kafka.admin.TopicCommand クラスの
    main メソッドを呼び出しているだけです
パーティション数を 3つへ変更する例
> kafka-topics --alter --zookeeper 127.0.0.1:2181 --topic sample1 --partitions 3

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

同一グループ cg1 に所属する Consumer クライアントを 3つ実行して、メッセージをいくつか送信してみたところ、以下のような結果となりました。

A
> consumer-group-sample sample1 cg1

・・・ Notification: &{Claimed:map[sample1:[0 1 2]] Released:map[] Current:map[sample1:[0 1 2]]}
topic: sample1, offset: 0, key: a, value: 123
topic: sample1, offset: 1, key: b, value: 456
・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[2]] Current:map[sample1:[0 1]]}
・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[1]] Current:map[sample1:[0]]}
topic: sample1, offset: 2, key: f, value: 678
B
> consumer-group-sample sample1 cg1

・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]}
・・・ Notification: &{Claimed:map[sample1:[1]] Released:map[sample1:[2]] Current:map[sample1:[1]]}
topic: sample1, offset: 0, key: c, value: 789
C
> consumer-group-sample sample1 cg1

・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]}
topic: sample1, offset: 0, key: d, value: 012
topic: sample1, offset: 1, key: e, value: 345

Notification を見るとそれぞれのクライアントへ割り当てられているパーティションが変化している事を確認できます。

例えば、A は最初 0 1 2 の 3つのパーティションが割り当てられていましたが、B や C の追加に伴って減っていき最終的にパーティション 0 のみとなっています。

B はパーティション 2 に割り当てられていましたが、C の追加によってパーティション 1 に変更されています。