Esper で簡単なイベントを処理する

オープンソースの CEP(Complex Event Processing = 複合イベント処理) ツールである Esper の Java 実装を使って簡単なイベント処理を試してみる。

使用した環境は以下の通り。

  • Esper 2.3.0

事前準備として、Web サイトから Esper for Java 2.3.0 のアーカイブをダウンロードし、適当なディレクトリに解凍しておく。

Esper 概要

まだ、リファレンスをあまり詳しく見てないのだが。

Esper は BPM/BAM(Business Activity Monitoring) やネットワーク/アプリケーションモニタリングからセンサーネットワークまで、幅広い用途で使われている事を想定されており、大量のイベントストリームを分析・処理するためのツール。
SQL によく似た EPL(Event Proccesing Language) と呼ばれるクエリでイベントの処理を記述できる。

また、イベント定義に以下のようなものが利用できるようになっており、

  • POJO
  • Map クラス(java.util.Map)
  • XML DOM

イベントのプロパティとして以下のようなものが使える。

  • Simple: 単一の値
  • Indexed: 配列
  • Mapped: マップ
  • Nested: 入れ子

例えば、以下のような EPL では、SampleEvent の point プロパティ(Simple)が 5以上のものが処理対象とされる。

select * from SampleEvent(point >= 5)
select * from SampleEvent where point >= 5

イベント定義

今回は、POJO で簡単なイベントを定義してみる。POJO ではイベントプロパティを getter メソッドとして定義する。

sample/event/SampleEvent.java
package sample.event;

public class SampleEvent {

    private String name;
    private int point;

    public SampleEvent(String name, int point) {
        this.name = name;
        this.point = point;
    }

    public String getName() {
        return this.name;
    }

    public int getPoint() {
        return this.point;
    }
}

イベント処理

次に、イベント処理部分を作成する。

  • Configuration でイベント名とイベントクラスとのマッピングを定義し、EPServiceProvider の取得に使用(EPL 内でイベント名が使えるようになる)
  • EPAdministrator オブジェクトの createEPL メソッドを実行し、EPL から EPStatement オブジェクトを生成
  • EPStatement にイベントリスナーを設定(EPL に合致したイベントが発生した場合のみイベントリスナーが実行)
  • EPRuntime オブジェクトの sendEvent メソッドでイベント送信
sample/SampleProcessor.java
package sample;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.UpdateListener;
import com.espertech.esper.event.EventBean;

import sample.event.SampleEvent;

public class SampleProcessor {

    public static void main(String[] args) {

        //イベント名とイベントクラスのマッピング定義
        Configuration config = new Configuration();
        config.addEventTypeAlias("SampleEvent", SampleEvent.class);
    
        EPServiceProvider serv = EPServiceProviderManager.getDefaultProvider(config);

        //EPL から EPStatement オブジェクトを生成
        EPStatement st = serv.getEPAdministrator().createEPL("select * from SampleEvent(point >= 5)");
        //以下でも可
        //EPStatement st = serv.getEPAdministrator().createEPL("select * from SampleEvent where point >= 5");

        //イベントリスナーの設定
        st.addListener(new UpdateListener() {
            //イベント処理(point が 5以上のもののみ処理される)
            public void update(EventBean[] newEvents, EventBean[] oldEvents) {
                if (newEvents != null && newEvents.length > 0) {
                    System.out.println("event : " + newEvents[0].get("name") + ", " + newEvents[0].get("point"));
                }
            }
        });

        //イベント送信
        for (int i = 0; i < 10; i++) {
            int val = (int)(Math.random() * 10);
            //ランダムな値を設定したイベント送信
            serv.getEPRuntime().sendEvent(new SampleEvent("test" + i, val));
        }
    }
}

ビルド・実行は以下のビルドファイルを使って Gant で実行するようにした。

build.gant

(注)ESPER_HOME 環境変数に Esper のホームディレクトリを設定しておく

includeTargets << gant.targets.Clean

Ant.property(environment: "env")
esperHome = Ant.antProject.properties."env.ESPER_HOME"

destDir = "dest"
classesDir = "$destDir/classes"

cleanDirectory << destDir

target("default": "") {
    compile()
}

target(init: "") {
    path(id: "project.classpath") {
        pathelement(path: classesDir)

        fileset(dir: esperHome) {
            include(name: "**/*.jar")
        }
    }
}

target(compile: "") {
    depends(init)

    Ant.mkdir(dir: classesDir)

    Ant.javac(srcdir: "src", destdir: classesDir) {
        classpath(refid: "project.classpath")
    }
}

target(run: "") {
    depends(compile)

    Ant.java(classname: "sample.SampleProcessor", fork: true) {
        classpath(refid: "project.classpath")
    }
}

上記ファイルを Gant で実行すると、以下のような結果が出力され、EPL で指定したように point が 5以上の SampleEvent だけが出力(イベントリスナーで処理)されている事が確認できる。

実行結果
>gant run
・・・
     [java] event : test1, 9
     [java] event : test3, 8
     [java] event : test4, 8
     [java] event : test7, 6
     [java] event : test8, 6