Groovy で Kafka を組み込み実行
Groovy で Apache Kafka を組み込み実行してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170410/
Kafka 組み込み実行
Kafka の実行には ZooKeeper が必要なため、ZooKeeper と Kafka の両方を組み込み実行します。
ZooKeeperServerMain を実行 (initializeAndRun
) すると処理をブロックしてしまい、後続の処理を実行できないので、今回は別スレッドで実行するようにしました。
initializeAndRun メソッドは以下のように配列の要素数によって引数の解釈が異なるようです。
引数の数 | 引数の内容 |
---|---|
1つ | 設定ファイルのパス |
2つ以上 | ポート番号, データディレクトリ, tickTime, maxClientCnxns |
kafka_embed.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0') @Grab('org.apache.zookeeper:zookeeper:3.5.2-alpha') import kafka.server.KafkaServerStartable import org.apache.zookeeper.server.ZooKeeperServerMain def zkPort = '2181' def zkDir = 'zk-tmp' def kafkaDir = 'kafka-logs' def zk = new ZooKeeperServerMain() Thread.start { // ZooKeeper の実行 zk.initializeAndRun([zkPort, zkDir] as String[]) } def kafkaProps = new Properties() kafkaProps.setProperty('zookeeper.connect', "localhost:${zkPort}") kafkaProps.setProperty('log.dir', kafkaDir) def kafka = KafkaServerStartable.fromProps(kafkaProps) // Kafka の実行 kafka.startup() println 'startup ...' System.in.read() kafka.shutdown() zk.shutdown()
Groovy のデフォルト設定ではメモリ不足で起動に失敗したため、JAVA_OPTS
環境変数で最大メモリサイズを変更して実行します。
実行
> set JAVA_OPTS=-Xmx512m > groovy kafka_embed.groovy startup ・・・
備考
ZooKeeper を組み込み実行するには、Apache Curator の org.apache.curator.test.TestingServer
を使う方法もあります。
TestingServer の close 時に ZooKeeper のデータディレクトリを削除しないようにするには InstanceSpec
を使います。 (コンストラクタの第5引数を false にすると削除しなくなる)
kafka_embed_curator.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0') @Grapes([ @Grab('org.apache.curator:curator-test:3.3.0'), @GrabExclude('io.netty#netty:3.7.0.Final') ]) import kafka.server.KafkaServerStartable import org.apache.curator.test.TestingServer import org.apache.curator.test.InstanceSpec def zkPort = 2181 def zkDir = 'zk-tmp' def kafkaDir = 'kafka-logs' // close 時にデータディレクトリを残すように false を指定 def spec = new InstanceSpec(new File(zkDir), zkPort, -1, -1, false, -1) def props = new Properties() props.setProperty('zookeeper.connect', "localhost:${zkPort}") props.setProperty('log.dir', kafkaDir) // 第2引数を true にするとコンストラクタ内で start メソッドを実行する new TestingServer(spec, false).withCloseable { zk -> zk.start() def kafka = KafkaServerStartable.fromProps(props) kafka.startup() println 'startup ...' System.in.read() kafka.shutdown() zk.stop() }
Kafka クライアント
ついでに、Kafka の各種クライアント処理も Groovy で実装してみます。
a. KafkaProducer でメッセージ送信
まずは KafkaProducer を使った Kafka へのメッセージ送信処理です。
メッセージはトピックへ送信する事になり、トピックが未作成の場合は自動的に作成されます。(kafka.admin.TopicCommand
等でトピックを事前に作成しておく事も可能)
bootstrap.servers
で接続先の Kafka を指定します。
kafka_client_producer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0') @Grab('org.slf4j:slf4j-simple:1.7.24') import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord def topic = args[0] def key = args[1] def value = args[2] def props = new Properties() props.setProperty('bootstrap.servers', 'localhost:9092') props.setProperty('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer') props.setProperty('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer') new KafkaProducer(props).withCloseable { producer -> def res = producer.send(new ProducerRecord(topic, key, value)) println "***** result: ${res.get()}" }
KafkaProducer 実行例
> groovy kafka_client_producer.groovy sample1 a 123 ・・・ ***** result: sample1-0@0 ・・・ > groovy kafka_client_producer.groovy sample1 b 456 ・・・ ***** result: sample1-0@1 ・・・
b. KafkaConsumer でメッセージ受信
次に KafkaConsumer でメッセージを受信してみます。
トピックを subscribe
する事でメッセージを受信します。
デフォルトでは subscribe 後に送信されたメッセージを受信する事になります。
送信済みのメッセージも受信するには auto.offset.reset
へ earliest
を設定します。
Kafka では group.id
で指定したグループ ID 毎にメッセージの Offset (どのメッセージまでを受信したか) が管理されます。
複数のクライアントが同一グループ ID に属している場合は、その中の 1つがメッセージを受信する事になるようです。
kafka_client_consumer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0') @Grab('org.slf4j:slf4j-simple:1.7.24') import org.apache.kafka.clients.consumer.KafkaConsumer import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors def topic = args[0] def group = args[1] def props = new Properties() props.setProperty('bootstrap.servers', 'localhost:9092') props.setProperty('group.id', group) props.setProperty('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer') props.setProperty('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer') props.setProperty('auto.offset.reset', 'earliest') def stopLatch = new CountDownLatch(1) def es = Executors.newSingleThreadExecutor() es.submit { new KafkaConsumer(props).withCloseable { consumer -> consumer.subscribe([topic]) while(stopLatch.count > 0) { def records = consumer.poll(1000) records.each { println "***** result: ${it}" } } } } System.in.read() stopLatch.countDown() es.shutdown()
KafkaConsumer 実行例
> groovy kafka_client_consumer.groovy sample1 g1 ・・・ ***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 0, CreateTime = 1491758385860, checksum = 1240240547, serialized key size = 1, serialized value size = 3, key = a, value = 123) ***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 1, CreateTime = 1491758400116, checksum = 728766236, serialized key size = 1, serialized value size = 3, key = b, value = 456)
c. KafkaStreams でメッセージ受信
KafkaStreams を使ってメッセージを受信します。
グループ ID は application.id
で指定するようです。
kafka_client_stream.groovy
@Grab('org.apache.kafka:kafka-streams:0.10.2.0') @Grab('org.slf4j:slf4j-simple:1.7.24') import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.kstream.KStreamBuilder import org.apache.kafka.common.serialization.Serdes def topic = args[0] def group = args[1] def props = new Properties() props.put('application.id', group) props.put('bootstrap.servers', 'localhost:9092') props.put('key.serde', Serdes.String().class) props.put('value.serde', Serdes.String().class) def builder = new KStreamBuilder() builder.stream(topic).print() def streams = new KafkaStreams(builder, props) streams.start() System.in.read() streams.close()
動作確認のため、未使用のグループ ID を使って実行します。
KafkaStreams 実行例
> groovy kafka_client_stream.groovy sample1 g2 ・・・ [KSTREAM-SOURCE-0000000000]: a , 123 [KSTREAM-SOURCE-0000000000]: b , 456
d. KafkaConsumerGroupService で Offset 確認
KafkaConsumerGroupService を使ってグループ ID 毎の Offset を確認してみます。
kafka_group_offset.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0') import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService def group = args[0] def params = ['--bootstrap-server', 'localhost:9092', '--group', group] as String[] def opts = new ConsumerGroupCommandOptions(params) def svc = new KafkaConsumerGroupService(opts) def res = svc.describeGroup() res._2.foreach { it.foreach { st -> println "topic = ${st.topic.value}, offset = ${st.offset.value}, partition = ${st.partition.value}" } } svc.close()
KafkaConsumerGroupService 実行例
> groovy kafka_group_offset.groovy g1 topic = sample1, offset = 2, partition = 0