Go で Kafka の Consumer クライアント

下記ライブラリをそれぞれ使って Go 言語で Apache Kafka の Consumer クライアントを作成してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170506/

Sarama の場合

まずは sarama を使ってみます。

準備

GOPATH 環境変数を設定して sarama を go get しておきます。

sarama の取得例
> go get github.com/Shopify/sarama

サンプル作成

sarama でメッセージを受信するには以下のように実装すれば良さそうです。

  • (1) sarama.NewConsumerConsumer を作成
  • (2) Consumer の ConsumePartition で指定のトピック・パーティションからメッセージを受信するための PartitionConsumer を作成
  • (3) PartitionConsumer の Messages で取得したチャネルからメッセージを受信

(2) で sarama.OffsetOldest を指定する事で Kafka に残っている最も古いオフセットからメッセージを取得します。

for select を使って継続的に Kafka からメッセージを受信するようにしていますが、このままだと処理が終了しないので Ctrl + c で終了するように os.Signal のチャネルを使っています。

src/consumer-sample/main.go
package main

import (
    "fmt"
    "log"
    "io"
    "os"
    "os/signal"
    "github.com/Shopify/sarama"
)

func close(trg io.Closer) {
    if err := trg.Close(); err != nil {
        log.Fatalln(err)
    }
}

func makeStopChannel() chan os.Signal {
    // Ctrl + c のチャネル設定
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)

    return ch
}

func main() {
    // (1)
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)

    if err != nil {
        panic(err)
    }

    defer close(consumer)
    // (2)
    ptConsumer, err := consumer.ConsumePartition(os.Args[1], 0, sarama.OffsetOldest)

    if err != nil {
        panic(err)
    }

    defer close(ptConsumer)

    ch := makeStopChannel()

    for {
        select {
            // (3)
            case msg := <- ptConsumer.Messages():
                fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", 
                    msg.Topic, msg.Offset, msg.Key, msg.Value)
            case <- ch:
                // 終了
                return
        }
    }
}

ビルドと実行

ビルド例
> cd src/consumer-sample
> go build

動作確認のため、前回 に組み込み実行した Kafka へ接続してみます。

実行例
> consumer-sample sample1

topic: sample1, offset: 0, key: a, value: 123
topic: sample1, offset: 1, key: b, value: 456

Ctrl + c を入力して終了します。

グループへ属していないのでオフセットが保存されず、実行の度に上記のような出力となります。

備考 - Enter キーで終了する方法

makeStopChannel の処理内容を以下のように変えてみると、Ctrl + c ではなく Enter キーの入力で終了するようにできます。

src/consumer-sample2/main.go
package main

import (
    ・・・
    "bufio"
    "github.com/Shopify/sarama"
)

・・・

func makeStopChannel() chan string {
    ch := make(chan string)
    reader := bufio.NewReader(os.Stdin)

    go func() {
        s, _ := reader.ReadString('\n')
        ch <- s
    }()

    return ch
}

func main() {
    ・・・
}

Sarama Cluster の場合

次に sarama-cluster を使ってみます。

sarama-cluster は sarama の拡張で、グループなどへの参加を sarama よりも簡単に実施できるようになっています。 (内部的には sarama の JoinGroupRequest 等を使ってグループを処理しています)

準備

GOPATH 環境変数を設定して sarama-cluster を go get しておきます。

sarama-cluster の取得例
> go get github.com/bsm/sarama-cluster

サンプル作成

sarama-cluster の場合は sarama よりもシンプルで、NewConsumerConsumer を作り、Messages で取得したチャネルからメッセージを受信できます。

また、MarkOffset を使ってオフセットを保存します。

src/consumer-group-sample/main.go
package main

import (
    "fmt"
    "log"
    "io"
    "os"
    "os/signal"
    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

・・・

func main() {
    config := cluster.NewConfig()
    config.Group.Return.Notifications = true
    config.Consumer.Return.Errors = true
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    topics := []string{os.Args[1]}
    group := os.Args[2]

    consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, group, topics, config)

    if err != nil {
        panic(err)
    }

    defer close(consumer)

    ch := makeStopChannel()

    for {
        select {
            case msg, more := <- consumer.Messages():
                if more {
                    fmt.Printf("topic: %s, offset: %d, key: %s, value: %s\n", 
                            msg.Topic, msg.Offset, msg.Key, msg.Value)

                    // オフセットの保存
                    consumer.MarkOffset(msg, "")
                }
            case err, more := <- consumer.Errors():
                if more {
                    log.Printf("Error: %+v\n", err.Error())
                }
            case ntf, more := <- consumer.Notifications():
                if more {
                    log.Printf("Notification: %+v\n", ntf)
                }
            case <- ch:
                return
        }
    }
}

ビルドと実行

ビルド例
> cd src/consumer-group-sample
> go build

Kafka へ接続して動作確認してみます。

実行例
> consumer-group-sample sample1 g3

・・・ Notification: &{Claimed:map[sample1:[0]] Released:map[] Current:map[sample1:[0]]}
topic: sample1, offset: 0, key: a, value: 123
topic: sample1, offset: 1, key: b, value: 456

クラスター実行

同一グループに所属する Consumer クライアント(上記サンプル)を複数実行し、メッセージの分散受信を試してみます。

Kafka では 1つのパーティションは 1つの Consumer に割り当てられるようで(グループ毎)、パーティションが 1つしかない状態で同一グループに属する Consumer を増やしても分散の効果はありません。

そこで、まずはパーティション数を増やしておきます。

既存トピックのパーティション数を変更するには kafka-topics コマンド ※ が使えます。 --alter オプションを使って --partitions へ変更後のパーティション数を指定します。

 ※ kafka-topics コマンドは kafka.admin.TopicCommand クラスの
    main メソッドを呼び出しているだけです
パーティション数を 3つへ変更する例
> kafka-topics --alter --zookeeper 127.0.0.1:2181 --topic sample1 --partitions 3

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

同一グループ cg1 に所属する Consumer クライアントを 3つ実行して、メッセージをいくつか送信してみたところ、以下のような結果となりました。

A
> consumer-group-sample sample1 cg1

・・・ Notification: &{Claimed:map[sample1:[0 1 2]] Released:map[] Current:map[sample1:[0 1 2]]}
topic: sample1, offset: 0, key: a, value: 123
topic: sample1, offset: 1, key: b, value: 456
・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[2]] Current:map[sample1:[0 1]]}
・・・ Notification: &{Claimed:map[sample1:[]] Released:map[sample1:[1]] Current:map[sample1:[0]]}
topic: sample1, offset: 2, key: f, value: 678
B
> consumer-group-sample sample1 cg1

・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]}
・・・ Notification: &{Claimed:map[sample1:[1]] Released:map[sample1:[2]] Current:map[sample1:[1]]}
topic: sample1, offset: 0, key: c, value: 789
C
> consumer-group-sample sample1 cg1

・・・ Notification: &{Claimed:map[sample1:[2]] Released:map[] Current:map[sample1:[2]]}
topic: sample1, offset: 0, key: d, value: 012
topic: sample1, offset: 1, key: e, value: 345

Notification を見るとそれぞれのクライアントへ割り当てられているパーティションが変化している事を確認できます。

例えば、A は最初 0 1 2 の 3つのパーティションが割り当てられていましたが、B や C の追加に伴って減っていき最終的にパーティション 0 のみとなっています。

B はパーティション 2 に割り当てられていましたが、C の追加によってパーティション 1 に変更されています。

Groovy で Kafka を組み込み実行

Groovy で Apache Kafka を組み込み実行してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170410/

Kafka 組み込み実行

Kafka の実行には ZooKeeper が必要なため、ZooKeeper と Kafka の両方を組み込み実行します。

ZooKeeperServerMain を実行 (initializeAndRun) すると処理をブロックしてしまい、後続の処理を実行できないので、今回は別スレッドで実行するようにしました。

initializeAndRun メソッドは以下のように配列の要素数によって引数の解釈が異なるようです。

引数の数 引数の内容
1つ 設定ファイルのパス
2つ以上 ポート番号, データディレクトリ, tickTime, maxClientCnxns
kafka_embed.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0')
@Grab('org.apache.zookeeper:zookeeper:3.5.2-alpha')
import kafka.server.KafkaServerStartable
import org.apache.zookeeper.server.ZooKeeperServerMain

def zkPort = '2181'
def zkDir = 'zk-tmp'
def kafkaDir = 'kafka-logs'

def zk = new ZooKeeperServerMain()

Thread.start {
    // ZooKeeper の実行
    zk.initializeAndRun([zkPort, zkDir] as String[])
}

def kafkaProps = new Properties()
kafkaProps.setProperty('zookeeper.connect', "localhost:${zkPort}")
kafkaProps.setProperty('log.dir', kafkaDir)

def kafka = KafkaServerStartable.fromProps(kafkaProps)
// Kafka の実行
kafka.startup()

println 'startup ...'

System.in.read()

kafka.shutdown()
zk.shutdown()

Groovy のデフォルト設定ではメモリ不足で起動に失敗したため、JAVA_OPTS 環境変数で最大メモリサイズを変更して実行します。

実行
> set JAVA_OPTS=-Xmx512m
> groovy kafka_embed.groovy

startup ・・・

備考

ZooKeeper を組み込み実行するには、Apache Curatororg.apache.curator.test.TestingServer を使う方法もあります。

TestingServer の close 時に ZooKeeper のデータディレクトリを削除しないようにするには InstanceSpec を使います。 (コンストラクタの第5引数を false にすると削除しなくなる)

kafka_embed_curator.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0')
@Grapes([
    @Grab('org.apache.curator:curator-test:3.3.0'),
    @GrabExclude('io.netty#netty:3.7.0.Final')
])
import kafka.server.KafkaServerStartable
import org.apache.curator.test.TestingServer
import org.apache.curator.test.InstanceSpec

def zkPort = 2181
def zkDir = 'zk-tmp'
def kafkaDir = 'kafka-logs'

// close 時にデータディレクトリを残すように false を指定
def spec = new InstanceSpec(new File(zkDir), zkPort, -1, -1, false, -1)

def props = new Properties()
props.setProperty('zookeeper.connect', "localhost:${zkPort}")
props.setProperty('log.dir', kafkaDir)

// 第2引数を true にするとコンストラクタ内で start メソッドを実行する
new TestingServer(spec, false).withCloseable { zk ->
    zk.start()

    def kafka = KafkaServerStartable.fromProps(props)

    kafka.startup()

    println 'startup ...'

    System.in.read()

    kafka.shutdown()

    zk.stop()
}

Kafka クライアント

ついでに、Kafka の各種クライアント処理も Groovy で実装してみます。

a. KafkaProducer でメッセージ送信

まずは KafkaProducer を使った Kafka へのメッセージ送信処理です。

メッセージはトピックへ送信する事になり、トピックが未作成の場合は自動的に作成されます。(kafka.admin.TopicCommand 等でトピックを事前に作成しておく事も可能)

bootstrap.servers で接続先の Kafka を指定します。

kafka_client_producer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0')
@Grab('org.slf4j:slf4j-simple:1.7.24')
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

def topic = args[0]
def key = args[1]
def value = args[2]

def props = new Properties()

props.setProperty('bootstrap.servers', 'localhost:9092')
props.setProperty('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer')
props.setProperty('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer')

new KafkaProducer(props).withCloseable { producer ->

    def res = producer.send(new ProducerRecord(topic, key, value))

    println "***** result: ${res.get()}"
}
KafkaProducer 実行例
> groovy kafka_client_producer.groovy sample1 a 123

・・・
***** result: sample1-0@0
・・・

> groovy kafka_client_producer.groovy sample1 b 456

・・・
***** result: sample1-0@1
・・・

b. KafkaConsumer でメッセージ受信

次に KafkaConsumer でメッセージを受信してみます。

トピックを subscribe する事でメッセージを受信します。

デフォルトでは subscribe 後に送信されたメッセージを受信する事になります。 送信済みのメッセージも受信するには auto.offset.resetearliest を設定します。

Kafka では group.id で指定したグループ ID 毎にメッセージの Offset (どのメッセージまでを受信したか) が管理されます。

複数のクライアントが同一グループ ID に属している場合は、その中の 1つがメッセージを受信する事になるようです。

kafka_client_consumer.groovy
@Grab('org.apache.kafka:kafka-clients:0.10.2.0')
@Grab('org.slf4j:slf4j-simple:1.7.24')
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors

def topic = args[0]
def group = args[1]

def props = new Properties()

props.setProperty('bootstrap.servers', 'localhost:9092')
props.setProperty('group.id', group)
props.setProperty('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer')
props.setProperty('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer')
props.setProperty('auto.offset.reset', 'earliest')

def stopLatch = new CountDownLatch(1)

def es = Executors.newSingleThreadExecutor()

es.submit {
    new KafkaConsumer(props).withCloseable { consumer ->

        consumer.subscribe([topic])

        while(stopLatch.count > 0) {
            def records = consumer.poll(1000)

            records.each {
                println "***** result: ${it}"
            }
        }
    }
}

System.in.read()

stopLatch.countDown()

es.shutdown()
KafkaConsumer 実行例
> groovy kafka_client_consumer.groovy sample1 g1

・・・
***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 0, CreateTime = 1491758385860, checksum = 1240240547, serialized key size = 1, serialized value size = 3, key = a, value = 123)
***** result: ConsumerRecord(topic = sample1, partition = 0, offset = 1, CreateTime = 1491758400116, checksum = 728766236, serialized key size = 1, serialized value size = 3, key = b, value = 456)

c. KafkaStreams でメッセージ受信

KafkaStreams を使ってメッセージを受信します。

グループ ID は application.id で指定するようです。

kafka_client_stream.groovy
@Grab('org.apache.kafka:kafka-streams:0.10.2.0')
@Grab('org.slf4j:slf4j-simple:1.7.24')
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.common.serialization.Serdes

def topic = args[0]
def group = args[1]

def props = new Properties()

props.put('application.id', group)
props.put('bootstrap.servers', 'localhost:9092')
props.put('key.serde', Serdes.String().class)
props.put('value.serde', Serdes.String().class)

def builder = new KStreamBuilder()
builder.stream(topic).print()

def streams = new KafkaStreams(builder, props)

streams.start()

System.in.read()

streams.close()

動作確認のため、未使用のグループ ID を使って実行します。

KafkaStreams 実行例
> groovy kafka_client_stream.groovy sample1 g2

・・・
[KSTREAM-SOURCE-0000000000]: a , 123
[KSTREAM-SOURCE-0000000000]: b , 456

d. KafkaConsumerGroupService で Offset 確認

KafkaConsumerGroupService を使ってグループ ID 毎の Offset を確認してみます。

kafka_group_offset.groovy
@Grab('org.apache.kafka:kafka_2.12:0.10.2.0')
import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService

def group = args[0]

def params = ['--bootstrap-server', 'localhost:9092', '--group', group] as String[]

def opts = new ConsumerGroupCommandOptions(params)

def svc = new KafkaConsumerGroupService(opts)

def res = svc.describeGroup()

res._2.foreach {
    it.foreach { st ->
        println "topic = ${st.topic.value}, offset = ${st.offset.value}, partition = ${st.partition.value}"
    }
}

svc.close()
KafkaConsumerGroupService 実行例
> groovy kafka_group_offset.groovy g1

topic = sample1, offset = 2, partition = 0

Java で Apache Beam を使用

前回 と同等の処理を Apache Beam を使って実装してみます。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170327/

サンプルアプリケーション

Beam では Pipelineapply メソッドで処理を繋げるようですので、今回は以下のように実装してみました。

  • (1) Count.perElement メソッドを使って要素毎にカウントした KV<String, Long> を取得
  • (2) ToString.kvs メソッドを使って KV の Key と Value の値を連結して文字列化
  • (3) DoFn@ProcessElement を付与したメソッドを実装し (2) で取得した文字列を標準出力

apply メソッドの引数に使用する PTransformorg.apache.beam.sdk.transforms パッケージ下に主要なものが用意されているようです。

標準出力を行うための基本作法が分からなかったので、今回は DoFn を使っています。 (他に MapElements.via(SimpleFunction) を使う方法等も考えられます)

DoFn ではアノテーションを使って処理メソッドを指定するようになっており、入力要素を 1件ずつ処理するための @ProcessElement アノテーションの他にもいくつか用意されているようです。(例えば @Setup@StartBundle 等)

また、アノテーションを付与したメソッドは引数の型等をチェックするようになっています。 (org.apache.beam.sdk.transforms.reflect.DoFnSignatures 等のソース参照)

src/main/java/MoneyCount.java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ToString;

public class MoneyCount {
    public static void main(String... args) throws Exception {
        PipelineOptions opt = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(opt);

        p.apply(TextIO.Read.from(args[0]))
                .apply(Count.perElement()) // (1)
                .apply(ToString.kvs())     // (2)
                .apply(ParDo.of(new DoFn<String, String>() { // (3)
                    @ProcessElement
                    public void process(ProcessContext ctx) {
                        System.out.println(ctx.element());
                    }
                }));

        p.run().waitUntilFinish();
    }
}

実行

以下のビルド定義ファイルを使って実行します。

今回は DirectRunnerbeam-runners-direct-java) で実行していますが、Apache Spark や Flink 等で実行する方法も用意されています。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compile 'org.apache.beam:beam-sdks-java-core:0.6.0'

    runtime 'org.apache.beam:beam-runners-direct-java:0.6.0'
    runtime 'org.slf4j:slf4j-nop:1.7.25'
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は以下の通りです。 なお、出力順は実行の度に変化します。

実行結果
> gradle run -q -Pargs=input_sample.txt

10000,2
5,3
2000,1
1000,3
10,2
500,1
100,2
1,2
50,1

input_sample.txt の内容は以下の通りです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000

Java 8 で Apache Flink を使用

前回 と同様の処理を Java8 のラムダ式を使って実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170313/

サンプル

前回 の処理をラムダ式を使って Java で実装すると以下のようになりました。

MoneyCount.java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class MoneyCount {
    public static void main(String... args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        env.readTextFile(args[0])
            .map( w -> new Tuple2<>(w, 1) )
            .groupBy(0)
            .sum(1)
            .print();
    }
}

実行

Flink 1.2.0 では上記のように map 等の引数へラムダ式を使った場合、通常の JDKコンパイルすると実行時にエラーが発生してしまいます。

(a) JDKコンパイルして実行

  • javac 1.8.0_121

以下の Gradle ビルド定義を使って実行してみます。

build.gradle
apply plugin: 'application'

mainClassName = 'MoneyCount'

repositories {
    jcenter()
}

dependencies {
    compile 'org.apache.flink:flink-java:1.2.0'
    runtime 'org.apache.flink:flink-clients_2.11:1.2.0'
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行すると以下のようにエラーとなりました。

Tuple2 の型引数が失われている点が問題となっており、Eclipse JDT compiler でコンパイルしなければならないようです。

実行結果 ※
> gradle run -q -Pargs=input_sample.txt

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
  The return type of function 'main(MoneyCount.java:10)' could not be determined automatically, due to type erasure. 
  You can give type information hints by using the returns(...) method on the result of the transformation call, 
  or by letting your function implement the 'ResultTypeQueryable' interface.
        at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
        at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:692)
        at MoneyCount.main(MoneyCount.java:11)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing.
It seems that your compiler has not stored them into the .class file.
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely.
See the documentation for more information about how to compile jobs containing lambda expressions.
        at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1528)
        ・・・

※ 出力結果には改行を適当に加えています

(b) Eclipse JDT Compiler でコンパイル(-genericsignature オプション付)して実行

  • ecj 4.6.1

次に Eclipse JDT Compiler でコンパイルし実行してみます。

Eclipse JDT Compiler は java -jar ecj.jar ・・・ で実行できるので、Gradle で実施する場合は compileJavaforkOptions を使って設定します。

今回のケースでは、Eclipse JDT Compiler を -genericsignature オプション付きで実行する点が重要です。(付けない場合は JDK と同じ結果になります)

build-ecj.gradle
apply plugin: 'application'

mainClassName = 'MoneyCount'

configurations {
    ecj
}

repositories {
    jcenter()
}

dependencies {
    compile 'org.apache.flink:flink-java:1.2.0'
    runtime 'org.apache.flink:flink-clients_2.11:1.2.0'

    ecj 'org.eclipse.jdt.core.compiler:ecj:4.6.1'
    // 以下でも可
    //ecj 'org.eclipse.scout.sdk.deps:ecj:4.6.2'
}

// Eclipse JDT Compiler の設定
compileJava {
    options.fork = true
    // -genericsignature オプションの指定
    options.compilerArgs << '-genericsignature'

    // java -jar ecj.jar を実行するための設定
    options.forkOptions.with {
        executable = 'java'
        jvmArgs = ['-jar', configurations.ecj.asPath]
    }
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は以下の通り、正常に実行できるようになりました。

実行結果
> gradle run -q -b build-ecj.gradle -Pargs=input_sample.txt

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1055289406]
03/13/2017 20:40:46     Job execution switched to status RUNNING.
・・・
03/13/2017 20:40:47     Job execution switched to status FINISHED.
(10000,2)
(10,2)
(100,2)
(50,1)
(500,1)
(1,2)
(1000,3)
(2000,1)
(5,3)

input_sample.txt の内容は以下の通りです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000

コンパイル結果の比較

最後に、コンパイル結果(.class ファイル) を CFR で処理(以下のコマンドを適用)して違いを確認しておきます。 (--decodelambdas false オプションでラムダ式の部分をメソッドとして残すようにしています)

java -jar cfr_0_120.jar MoneyCount.class --decodelambdas false

まずは JDK(javac 1.8.0_121)のコンパイル結果を確認してみます。

(a) JDKコンパイルした場合(CFR の処理結果)
・・・
public class MoneyCount {
    public static /* varargs */ void main(String ... args) throws Exception {
        ・・・
    }

    private static /* synthetic */ Tuple2 lambda$main$95f17bfa$1(String w) throws Exception {
        return new Tuple2((Object)w, (Object)1);
    }
}

lambda$main$95f17bfa$1 の戻り値の型が Tuple2 となっており、型引数が失われています。(これが実行時のエラー原因)

次に Eclipse JDT Compiler のコンパイル結果を確認してみます。

(b) Eclipse JDT Compiler でコンパイル(-genericsignature オプション付)した場合(CFR の処理結果)
・・・
public class MoneyCount {
    public static /* varargs */ void main(String ... args) throws Exception {
        ・・・
    }

    private static /* synthetic */ Tuple2<String, Integer> lambda$0(String w) throws Exception {
        return new Tuple2((Object)w, (Object)1);
    }
}

lambda$0 の戻り値の型が Tuple2<String, Integer> となっており、型引数が失われずに残っています。(これが実行に成功した理由)

なお、-genericsignature オプションを付けずにコンパイルすると JDK と同様に型引数が失われます。

Groovy で Apache Flink を使用

Groovy で Apache Spark を使用」と同様の処理を Apache Flink で試してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170311/

サンプルスクリプト

今回はローカルで実行するだけなので ExecutionEnvironment.createLocalEnvironment() で取得した LocalEnvironment を使用します。

map メソッドの引数へ Groovy のクロージャを使ったところ、org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. ・・・ となってしまい駄目でしたので、MapFunction の実装クラスを定義しました。

その場合、MapFunction の型引数をきちんと指定する必要があります。(そうしないと InvalidTypesException が発生)

なお、flink-clients_2.10 を使用する場合、scala-library の @Grab 定義は不要でした。(flink-clients_2.11 の場合のみ scala-library が必要)

money_count.groovy
@Grapes([
    @Grab('org.apache.flink:flink-java:1.2.0'),
    @GrabExclude('io.netty#netty;3.7.0.Final')
])
@Grab('org.apache.flink:flink-clients_2.11:1.2.0')
@Grab('org.scala-lang:scala-library:2.11.8')
@Grab('org.jboss.netty:netty:3.2.10.Final')
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.Tuple2
import groovy.transform.CompileStatic

// @CompileStatic は必須ではない(無くても動作する)
@CompileStatic
class ToTuple implements MapFunction<String, Tuple2<String, Integer>> {
    Tuple2 map(String v) {
        new Tuple2(v, 1)
    }
}

def env = ExecutionEnvironment.createLocalEnvironment()

env.readTextFile(args[0]).map(new ToTuple()).groupBy(0).sum(1).print()

groupBy メソッドではグルーピング対象とする項目を、sum メソッドでは合計する項目を数値で指定します。

実行

Groovy のデフォルト設定では java.lang.IllegalArgumentException: Size of total memory must be positive. が発生しましたので、JAVA_OPTS 環境変数で最大メモリサイズ (-Xmx) を変更して実行します。

実行結果
> set JAVA_OPTS=-Xmx512m
> groovy money_count.groovy input_sample.txt

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1033779003]
03/08/2017 00:56:11     Job execution switched to status RUNNING.
・・・
(10000,2)
(10,2)
(100,2)
(50,1)
(500,1)
(1,2)
(1000,3)
(2000,1)
(5,3)

input_sample.txt の内容は以下の通りです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000

reveno でイベントソーシング

sourcerer でイベントソーシング」 等と同様の処理を reveno で実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170306/

はじめに

使用する Gradle ビルド定義ファイルは以下の通りです。

build.gradle
apply plugin: 'application'

mainClassName = 'SampleApp'

repositories {
    jcenter()
}

dependencies {
    compileOnly "org.projectlombok:lombok:1.16.12"
    compile 'org.reveno:reveno-core:1.23'
}

lombok は必須ではありません。

(a) transaction 版

reveno では transaction メソッドを使う方法と transactionAction メソッドを使う方法が用意されているようなので、まずは transaction メソッドを使ってみます。

イベントクラスの作成

各種イベント用のクラスを作成します。

reveno はこれまでに試したフレームワークとは異なり、イベントからエンティティの状態を復元したりはしないのでイベントクラスは必須ではありません。(EventBus へ publishEvent しないのであれば不要)

そのため、reveno の場合はイベントソーシングではなくコマンドソーシングと呼べるのかもしれません。

在庫作成イベント src/main/java/sample/events/InventoryItemCreated.java
package sample.events;

import lombok.Value;

@Value
public class InventoryItemCreated {
    private long id;
}
在庫名の変更イベント src/main/java/sample/events/InventoryItemRenamed.java
package sample.events;

import lombok.Value;

@Value
public class InventoryItemRenamed {
    private long id;
    private String newName;
}
在庫数の変更イベント src/main/java/sample/events/ItemsCheckedInToInventory.java
package sample.events;

import lombok.Value;

@Value
public class ItemsCheckedInToInventory {
    private long id;
    private int count;
}

モデルクラスの作成

エンティティクラスとビュークラスを作成します。

エンティティクラスは状態を保存するため、ビュークラスはクエリーの結果としてエンティティクラスから変換して返す事になります。

エンティティクラス src/main/java/sample/model/InventoryItem.java
package sample.model;

import lombok.Value;

@Value
public class InventoryItem {
    private String name;
    private int count;
}
ビュークラス src/main/java/sample/model/InventoryItemView.java
package sample.model;

import lombok.Value;

@Value
public class InventoryItemView {
    private long id;
    private String name;
    private int count;
}

実行クラスの作成

今回はトランザクションやイベントのハンドリング処理等をこのクラスへ実装する事にしました。

transaction メソッドを使用する場合、文字列でトランザクションのアクションを定義し、アクションの実行時に Map でパラメータを渡せばよさそうです。

永続化したデータは Engine のコンストラクタ引数で指定したディレクトリ内のファイルへ保存されるようになっており、storeremap した内容は tx-xxx ファイルへ、publishEvent した内容は evn-xxx ファイルへ保存されるようです。※

 ※ publishEvent を実行しなかった場合、
    evn-xxx ファイルの内容は空になりました

QueryManager を使ってデータを取得する場合、エンティティを直接取得する事はできないので、viewMapper メソッドを使ってエンティティクラスからビュークラスへのマッピングを設定します。

イベントのハンドリングは events メソッドで取得した EventsManager に対して実施します。

今回は、executeSync のような同期用メソッドのみを使っていますが、非同期用のメソッドも用意されています。

実行クラス src/main/java/SampleApp.java
import lombok.val;

import org.reveno.atp.core.Engine;
import org.reveno.atp.utils.MapUtils;

import sample.model.InventoryItem;
import sample.model.InventoryItemView;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

public class SampleApp {
    public static void main(String... args) {
        val reveno = new Engine("db");

        // 各種ハンドラやマッピング等の設定を実施
        setUp(reveno);

        reveno.startup();

        // 在庫の作成
        long id = reveno.executeSync("createInventoryItem",
                MapUtils.map("name", "sample1"));

        System.out.println("id: " + id);

        // 在庫数の更新
        reveno.executeSync("checkInItemsToInventory", MapUtils.map("id", id, "count", 5));
        // 在庫数の更新
        reveno.executeSync("checkInItemsToInventory", MapUtils.map("id", id, "count", 3));

        // 検索
        val res = reveno.query().find(InventoryItemView.class, id);

        System.out.println("result: " + res);

        reveno.shutdown();
    }

    private static void setUp(Engine reveno) {
        // エンティティクラスをビュークラスへ変換する設定
        reveno.domain().viewMapper(
            InventoryItem.class,
            InventoryItemView.class,
            (id, e, r) -> new InventoryItemView(id, e.getName(), e.getCount())
        );

        // 在庫の作成処理
        reveno.domain().transaction("createInventoryItem", (t, ctx) -> {
            long id = t.id();
            String name = t.arg("name");
            // エンティティ(状態)の保存
            ctx.repo().store(id, new InventoryItem(name, 0));

            // イベントの発行
            ctx.eventBus().publishEvent(new InventoryItemCreated(id));
            ctx.eventBus().publishEvent(new InventoryItemRenamed(id, name));

        }).uniqueIdFor(InventoryItem.class).command();

        // 在庫数の更新処理
        reveno.domain().transaction("checkInItemsToInventory", (t, ctx) -> {
            long id = t.longArg("id");
            int count = t.intArg("count");
            // エンティティ(状態)の更新
            ctx.repo().remap(id, InventoryItem.class, (rid, state) ->
                    new InventoryItem(state.getName(), state.getCount() + count));
            // イベントの発行
            ctx.eventBus().publishEvent(new ItemsCheckedInToInventory(id, count));
        }).command();

        // InventoryItemCreated イベントのハンドリング設定
        reveno.events().eventHandler(InventoryItemCreated.class, (event, meta) ->
                System.out.println("*** create event: " + event +
                        ", transactionTime: " + meta.getTransactionTime() +
                        ", isRestore: " + meta.isRestore()));
    }
}

実行

gradle run で実行した結果です。

実行結果
> gradle run

・・・
id: 1
result: InventoryItemView(id=1, name=sample1, count=8)
・・・
*** create event: InventoryItemCreated(id=1), transactionTime: 1488718421288, isRestore: false
・・・

(b) transactionAction 版

イベントクラス等は同じものを使用して transactionAction を使った処理を作成してみます。

コマンドクラスの作成

transactionAction ではコマンドクラスを使う事になるので作成します。

id の値はインスタンス化の時点では決定せず、コマンドハンドラ内で設定することになるので、@Wither を使って id の値のみ変更したコピーを返すメソッド (withId) を用意するようにしています。

在庫作成コマンド src/main/java/sample/commands/CreateInventoryItem.java
package sample.commands;

import lombok.Value;
import lombok.experimental.NonFinal;
import lombok.experimental.Wither;

@Value
public class CreateInventoryItem {
    @Wither @NonFinal private long id;
    private String name;
}
在庫数の更新コマンド src/main/java/sample/commands/CheckInItemsToInventory.java
package sample.commands;

import lombok.Value;

@Value
public class CheckInItemsToInventory {
    private long id;
    private int count;
}

実行クラスの作成

command メソッドでコマンドハンドラを設定し、コマンド毎のトランザクションアクションを transactionAction で設定します。

コマンドハンドラ内で executeTxAction へコマンドを渡せば該当するトランザクションアクションが実行されます。

実行クラス src/main/java/SampleApp.java
import lombok.val;

import org.reveno.atp.core.Engine;

import sample.commands.CheckInItemsToInventory;
import sample.commands.CreateInventoryItem;
import sample.model.InventoryItem;
import sample.model.InventoryItemView;
import sample.events.InventoryItemCreated;
import sample.events.InventoryItemRenamed;
import sample.events.ItemsCheckedInToInventory;

public class SampleApp {
    public static void main(String... args) {
        val reveno = new Engine("db");

        setUp(reveno);

        reveno.startup();

        // 在庫の作成
        long id = reveno.executeSync(new CreateInventoryItem(0, "sample1"));

        System.out.println("id: " + id);

        // 在庫数の更新
        reveno.executeSync(new CheckInItemsToInventory(id, 5));
        // 在庫数の更新
        reveno.executeSync(new CheckInItemsToInventory(id, 3));

        // 検索
        val res = reveno.query().find(InventoryItemView.class, id);

        System.out.println("result: " + res);

        reveno.shutdown();
    }

    private static void setUp(Engine reveno) {
        ・・・
        // 在庫作成コマンドのハンドリング設定
        reveno.domain().command(CreateInventoryItem.class, Long.class, (cmd, ctx) -> {
            long id = ctx.id(InventoryItem.class);
            // id を更新してトランザクションアクションを実行
            ctx.executeTxAction(cmd.withId(id));
            return id;
        });

        // 在庫数の更新コマンドのハンドリング設定
        reveno.domain().command(CheckInItemsToInventory.class, (cmd, ctx) -> ctx.executeTxAction(cmd));

        reveno.domain().transactionAction(CreateInventoryItem.class, (act, ctx) -> {
            // エンティティ(状態)の保存
            ctx.repo().store(act.getId(), new InventoryItem(act.getName(), 0));

            // イベントの発行
            ctx.eventBus().publishEvent(new InventoryItemCreated(act.getId()));
            ctx.eventBus().publishEvent(new InventoryItemRenamed(act.getId(), act.getName()));
        });

        reveno.domain().transactionAction(CheckInItemsToInventory.class, (act, ctx) -> {
            // エンティティ(状態)の更新
            ctx.repo().remap(act.getId(), InventoryItem.class, (id, state) ->
                    new InventoryItem(state.getName(), state.getCount() + act.getCount()));

            // イベントの発行
            ctx.eventBus().publishEvent(new ItemsCheckedInToInventory(act.getId(), act.getCount()));
        });

        ・・・
    }
}

実行

gradle run で実行した結果です。

実行結果
> gradle run

・・・
id: 1
*** create event: InventoryItemCreated(id=1), transactionTime: 1488721568341, isRestore: false
result: InventoryItemView(id=1, name=sample1, count=8)
・・・

Groovy で Cassandra を組み込み実行

Groovy で Apache Cassandra を組み込み実行してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170227/

組み込み実行

まずは、設定ファイルを用意しておきます。

今回は実行に必要な最小限の設定を行っています。

embed.conf
cluster_name: 'Test Cluster'

listen_address: localhost

commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000

partitioner: org.apache.cassandra.dht.Murmur3Partitioner
endpoint_snitch: SimpleSnitch

seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "127.0.0.1"

# CQL クライアントで接続するために必要
start_native_transport: true

ここで、Apache Cassandra 3.10 では thrift クライアントで接続するためのポート 9160 はデフォルトで有効のようですが、CQL クライアント用のポート 9042 を有効化するには start_native_transport の設定が必要でした。

ポート番号 用途
9042 CQL クライアント用
9160 thrift クライアント用

Cassandra を組み込み実行する Groovy スクリプトは以下の通りです。

cassandra.config で設定ファイル、cassandra.storagedir でデータディレクトリのパスを設定、CassandraDaemonインスタンス化して activate します。(deactivate を実行すると停止します)

cassandra_embed.groovy
@Grab('org.apache.cassandra:cassandra-all:3.10')
import org.apache.cassandra.service.CassandraDaemon

def conf = 'embed.yaml'
def dir = new File(args[0])

if (!dir.exists()) {
    dir.mkdirs()
}

System.setProperty('cassandra.config', conf)
System.setProperty('cassandra-foreground', 'true')
System.setProperty('cassandra.storagedir', dir.absolutePath)

def cassandra = new CassandraDaemon()
// 開始
cassandra.activate()

System.in.read()

// 終了
cassandra.deactivate()

実行結果は以下の通りで、特に問題なく起動できました。

実行
> groovy cassandra_embed.groovy data

・・・
21:30:46.493 [main] INFO  o.apache.cassandra.transport.Server - Starting listening for CQL clients on localhost/127.0.0.1:9042 (unencrypted)...
・・・
21:30:46.790 [Thread-1] INFO  o.a.cassandra.thrift.ThriftServer - Listening for thrift clients...

動作確認

CQL クライアントを使って Cassandra への接続確認を行います。

cqlsh 利用

まずは、Cassandra 3.10 に同梱されている cqlsh コマンドを使って、キースペースとテーブルを作成しデータ登録を行います。

ここで、cqlsh (本体は cqlsh.py) の実行には Python の実行環境が必要です。

cqlsh による操作結果
> cqlsh

・・・
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.10 | CQL spec 3.4.4 | Native protocol v4]
・・・

cqlsh> CREATE KEYSPACE sample WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor' : 1};

cqlsh> use sample;

cqlsh:sample> CREATE TABLE data (id text PRIMARY KEY, name text, value int);

cqlsh:sample> INSERT INTO data (id, name, value) values ('d1', 'sample1', 1);
cqlsh:sample> INSERT INTO data (id, name, value) values ('d2', 'sample2', 20);
cqlsh:sample> INSERT INTO data (id, name, value) values ('d3', 'sample3', 300);

cqlsh:sample> SELECT * FROM data;

 id | name    | value
----+---------+-------
 d2 | sample2 |    20
 d1 | sample1 |     1
 d3 | sample3 |   300

(3 rows)

Datastax Java Driver for Apache Cassandra 利用

次に、登録したデータを Datastax Java Driver for Apache Cassandra を使って検索してみます。

netty と jffi モジュールで Error grabbing Grapes -- [download failed: ・・・] となったので、@GrabExclude を使って回避しています。

client_sample.groovy
@Grapes([
    @Grab('com.datastax.cassandra:cassandra-driver-core:3.1.4'),
    @GrabExclude('io.netty#netty-handler;4.0.37'),
    @GrabExclude('com.github.jnr#jffi;1.2.10')
])
@Grab('io.netty:netty-all:4.0.44.Final')
@Grab('org.slf4j:slf4j-nop:1.7.23')
import com.datastax.driver.core.Cluster

Cluster.builder().addContactPoint('localhost').build().withCloseable { cluster ->
    cluster.connect('sample').withCloseable { session ->

        def res = session.execute('select * from data')

        res.each {
            println it
        }
    }
}

実行結果は以下の通りです。

実行結果
> groovy client_sample.groovy

Row[d2, sample2, 20]
Row[d1, sample1, 1]
Row[d3, sample3, 300]