Groovy で Kafka を組み込み実行

Groovy で Apache Kafka を組み込み実行してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170410/

Kafka 組み込み実行

Kafka の実行には ZooKeeper が必要なため、ZooKeeper と Kafka の両方を組み込み実行します。

ZooKeeperServerMain を実行 (initializeAndRun) すると処理をブロックしてしまい、後続の処理を実行できないので、今回は別スレッドで実行するようにしました。

initializeAndRun メソッドは以下のように配列の要素数によって引数の解釈が異なるようです。

引数の数 引数の内容
1つ 設定ファイルのパス
2つ以上 ポート番号, データディレクトリ, tickTime, maxClientCnxns
kafka_embed.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0')
@Grab('org.apache.zookeeper:zookeeper:3.5.2-alpha')
import kafka.server.KafkaServerStartable
import org.apache.zookeeper.server.ZooKeeperServerMain

def zkPort = '2181'
def zkDir = 'zk-tmp'
def kafkaDir = 'kafka-logs'

def zk = new ZooKeeperServerMain()

Thread.start {
    // ZooKeeper の実行
    zk.initializeAndRun([zkPort, zkDir] as String[])
}

def kafkaProps = new Properties()
kafkaProps.setProperty('zookeeper.connect', "localhost:${zkPort}")
kafkaProps.setProperty('log.dir', kafkaDir)

def kafka = KafkaServerStartable.fromProps(kafkaProps)
// Kafka の実行
kafka.startup()

println 'startup ...'

System.in.read()

kafka.shutdown()
zk.shutdown()

Groovy のデフォルト設定ではメモリ不足で起動に失敗したため、JAVA_OPTS 環境変数で最大メモリサイズを変更して実行します。

実行
> set JAVA_OPTS=-Xmx512m
> groovy kafka_embed.groovy

startup ・・・

備考

ZooKeeper を組み込み実行するには、Apache Curatororg.apache.curator.test.TestingServer を使う方法もあります。

TestingServer の close 時に ZooKeeper のデータディレクトリを削除しないようにするには InstanceSpec を使います。 (コンストラクタの第5引数を false にすると削除しなくなる)

kafka_embed_curator.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0')
@Grapes([
    @Grab('org.apache.curator:curator-test:3.3.0'),
    @GrabExclude('io.netty#netty:3.7.0.Final')
])
import kafka.server.KafkaServerStartable
import org.apache.curator.test.TestingServer
import org.apache.curator.test.InstanceSpec

def zkPort = 2181
def zkDir = 'zk-tmp'
def kafkaDir = 'kafka-logs'

// close 時にデータディレクトリを残すように false を指定
def spec = new InstanceSpec(new File(zkDir), zkPort, -1, -1, false, -1)

def props = new Properties()
props.setProperty('zookeeper.connect', "localhost:${zkPort}")
props.setProperty('log.dir', kafkaDir)

// 第2引数を true にするとコンストラクタ内で start メソッドを実行する
new TestingServer(spec, false).withCloseable { zk ->
    zk.start()

    def kafka = KafkaServerStartable.fromProps(props)

    kafka.startup()

    println 'startup ...'

    System.in.read()

    kafka.shutdown()

    zk.stop()
}

Kafka クライアント

ついでに、Kafka の各種クライアント処理も Groovy で実装してみます。

a. KafkaProducer でメッセージ送信

まずは KafkaProducer を使った Kafka へのメッセージ送信処理です。

メッセージはトピックへ送信する事になり、トピックが未作成の場合は自動的に作成されます。(kafka.admin.TopicCommand 等でトピックを事前に作成しておく事も可能)

bootstrap.servers で接続先の Kafka を指定します。

kafka_client_producer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0')
@Grab('org.slf4j:slf4j-simple:1.7.24')
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

def topic = args[0]
def key = args[1]
def value = args[2]

def props = new Properties()

props.setProperty('bootstrap.servers', 'localhost:9092')
props.setProperty('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer')
props.setProperty('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer')

new KafkaProducer(props).withCloseable { producer ->

    def res = producer.send(new ProducerRecord(topic, key, value))

    println "***** result: ${res.get()}"
}
KafkaProducer 実行例
> groovy kafka_client_producer.groovy sample1 a 123

・・・
***** result: sample1-0@0
・・・

> groovy kafka_client_producer.groovy sample1 b 456

・・・
***** result: sample1-0@1
・・・

b. KafkaConsumer でメッセージ受信

次に KafkaConsumer でメッセージを受信してみます。

トピックを subscribe する事でメッセージを受信します。

デフォルトでは subscribe 後に送信されたメッセージを受信する事になります。 送信済みのメッセージも受信するには auto.offset.resetearliest を設定します。

Kafka では group.id で指定したグループ ID 毎にメッセージの Offset (どのメッセージまでを受信したか) が管理されます。

複数のクライアントが同一グループ ID に属している場合は、その中の 1つがメッセージを受信する事になるようです。

kafka_client_consumer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0')
@Grab('org.slf4j:slf4j-simple:1.7.24')
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors

def topic = args[0]
def group = args[1]

def props = new Properties()

props.setProperty('bootstrap.servers', 'localhost:9092')
props.setProperty('group.id', group)
props.setProperty('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer')
props.setProperty('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer')
props.setProperty('auto.offset.reset', 'earliest')

def stopLatch = new CountDownLatch(1)

def es = Executors.newSingleThreadExecutor()

es.submit {
    new KafkaConsumer(props).withCloseable { consumer ->

        consumer.subscribe([topic])

        while(stopLatch.count > 0) {
            def records = consumer.poll(1000)

            records.each {
                println "***** result: ${it}"
            }
        }
    }
}

System.in.read()

stopLatch.countDown()

es.shutdown()
KafkaConsumer 実行例
> groovy kafka_client_consumer.groovy sample1 g1

・・・
***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 0, CreateTime = 1491758385860, checksum = 1240240547, serialized key size = 1, serialized value size = 3, key = a, value = 123)
***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 1, CreateTime = 1491758400116, checksum = 728766236, serialized key size = 1, serialized value size = 3, key = b, value = 456)

c. KafkaStreams でメッセージ受信

KafkaStreams を使ってメッセージを受信します。

グループ ID は application.id で指定するようです。

kafka_client_stream.groovy
@Grab('org.apache.kafka:kafka-streams:0.10.2.0')
@Grab('org.slf4j:slf4j-simple:1.7.24')
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.common.serialization.Serdes

def topic = args[0]
def group = args[1]

def props = new Properties()

props.put('application.id', group)
props.put('bootstrap.servers', 'localhost:9092')
props.put('key.serde', Serdes.String().class)
props.put('value.serde', Serdes.String().class)

def builder = new KStreamBuilder()
builder.stream(topic).print()

def streams = new KafkaStreams(builder, props)

streams.start()

System.in.read()

streams.close()

動作確認のため、未使用のグループ ID を使って実行します。

KafkaStreams 実行例
> groovy kafka_client_stream.groovy sample1 g2

・・・
[KSTREAM-SOURCE-0000000000]: a , 123
[KSTREAM-SOURCE-0000000000]: b , 456

d. KafkaConsumerGroupService で Offset 確認

KafkaConsumerGroupService を使ってグループ ID 毎の Offset を確認してみます。

kafka_group_offset.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0')
import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService

def group = args[0]

def params = ['--bootstrap-server', 'localhost:9092', '--group', group] as String[]

def opts = new ConsumerGroupCommandOptions(params)

def svc = new KafkaConsumerGroupService(opts)

def res = svc.describeGroup()

res._2.foreach {
    it.foreach { st ->
        println "topic = ${st.topic.value}, offset = ${st.offset.value}, partition = ${st.partition.value}"
    }
}

svc.close()
KafkaConsumerGroupService 実行例
> groovy kafka_group_offset.groovy g1

topic = sample1, offset = 2, partition = 0