Apache Camel で Esper を使ったイベント処理2 - Time・Length batch window

前回(id:fits:20081203)作成した Apache Camel + Esper コンポーネントの環境を使って、ちょっとだけ高度なイベント処理を試してみる。

とりあえず、Esper のビルトイン Data Window Views の中から以下のようなものを使う。

  • Length batch window
    • "win:length_batch(サイズ指定)" という構文を使用
  • Time batch window
    • "win:time_batch(時間指定)" という構文を使用

Length batch window は指定件数発生したイベントを、Time batch window は指定時間内に発生したイベントをまとめて処理するために使用できる。

サンプルの作成

今回もサンプルは Groovy スクリプトで作成する。

下記のサンプルスクリプトでは Time batch window を使って 1秒毎のイベント発生数を出力、Length batch window を使って条件を満たしたイベントが指定件数発生した際に合計を出力するような処理を実装している。

batch_test.groovy ファイル
import org.apache.camel.Processor
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.builder.RouteBuilder

class SampleRoute extends RouteBuilder {
    void configure() {
        from("direct:start").to("esper:test")

        //1秒毎のイベント発生件数と value の合計を出力
        from("esper:test?eql=select count(*) as counter, sum(value) as total from TestEvent.win:time_batch(1 sec)").process({println "[time]   counter: ${it.in.body.counter}, total: ${it.in.body.total}"} as Processor)

        //value が 5以上のイベントが 3件発生したら value の合計を出力
        from("esper:test?eql=select count(*) as counter, sum(value) as total from TestEvent(value >= 5).win:length_batch(3)").process({println "[length] counter: ${it.in.body.counter}, total: ${it.in.body.total}"} as Processor)
    }
}

//イベントクラス
class TestEvent {
    String name
    int value
}

ctx = new DefaultCamelContext()
ctx.addRoutes(new SampleRoute())

template = ctx.createProducerTemplate()

ctx.start()

(1..10).each {
    template.sendBody("direct:start", new TestEvent(name: "test${it}", value: Math.random() * 10))

    Thread.sleep((int)(Math.random() * 1000))
}

Thread.sleep(1000)

ctx.stop()

サンプルの実行

実行結果は以下の通り。

実行結果例
>groovy batch_test.groovy
[time]   counter: 2, total: 9
[time]   counter: 2, total: 12
[length] counter: 3, total: 23
[time]   counter: 3, total: 18
[time]   counter: 3, total: 7
[time]   counter: 0, total: null