Esper で簡単なイベントを処理する
オープンソースの CEP(Complex Event Processing = 複合イベント処理) ツールである Esper の Java 実装を使って簡単なイベント処理を試してみる。
使用した環境は以下の通り。
- Esper 2.3.0
事前準備として、Web サイトから Esper for Java 2.3.0 のアーカイブをダウンロードし、適当なディレクトリに解凍しておく。
Esper 概要
まだ、リファレンスをあまり詳しく見てないのだが。
Esper は BPM/BAM(Business Activity Monitoring) やネットワーク/アプリケーションモニタリングからセンサーネットワークまで、幅広い用途で使われている事を想定されており、大量のイベントストリームを分析・処理するためのツール。
SQL によく似た EPL(Event Proccesing Language) と呼ばれるクエリでイベントの処理を記述できる。
また、イベント定義に以下のようなものが利用できるようになっており、
イベントのプロパティとして以下のようなものが使える。
- Simple: 単一の値
- Indexed: 配列
- Mapped: マップ
- Nested: 入れ子
例えば、以下のような EPL では、SampleEvent の point プロパティ(Simple)が 5以上のものが処理対象とされる。
select * from SampleEvent(point >= 5) select * from SampleEvent where point >= 5
イベント定義
今回は、POJO で簡単なイベントを定義してみる。POJO ではイベントプロパティを getter メソッドとして定義する。
sample/event/SampleEvent.java
package sample.event; public class SampleEvent { private String name; private int point; public SampleEvent(String name, int point) { this.name = name; this.point = point; } public String getName() { return this.name; } public int getPoint() { return this.point; } }
イベント処理
次に、イベント処理部分を作成する。
- Configuration でイベント名とイベントクラスとのマッピングを定義し、EPServiceProvider の取得に使用(EPL 内でイベント名が使えるようになる)
- EPAdministrator オブジェクトの createEPL メソッドを実行し、EPL から EPStatement オブジェクトを生成
- EPStatement にイベントリスナーを設定(EPL に合致したイベントが発生した場合のみイベントリスナーが実行)
- EPRuntime オブジェクトの sendEvent メソッドでイベント送信
sample/SampleProcessor.java
package sample; import com.espertech.esper.client.Configuration; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.UpdateListener; import com.espertech.esper.event.EventBean; import sample.event.SampleEvent; public class SampleProcessor { public static void main(String[] args) { //イベント名とイベントクラスのマッピング定義 Configuration config = new Configuration(); config.addEventTypeAlias("SampleEvent", SampleEvent.class); EPServiceProvider serv = EPServiceProviderManager.getDefaultProvider(config); //EPL から EPStatement オブジェクトを生成 EPStatement st = serv.getEPAdministrator().createEPL("select * from SampleEvent(point >= 5)"); //以下でも可 //EPStatement st = serv.getEPAdministrator().createEPL("select * from SampleEvent where point >= 5"); //イベントリスナーの設定 st.addListener(new UpdateListener() { //イベント処理(point が 5以上のもののみ処理される) public void update(EventBean[] newEvents, EventBean[] oldEvents) { if (newEvents != null && newEvents.length > 0) { System.out.println("event : " + newEvents[0].get("name") + ", " + newEvents[0].get("point")); } } }); //イベント送信 for (int i = 0; i < 10; i++) { int val = (int)(Math.random() * 10); //ランダムな値を設定したイベント送信 serv.getEPRuntime().sendEvent(new SampleEvent("test" + i, val)); } } }
ビルド・実行は以下のビルドファイルを使って Gant で実行するようにした。
build.gant
(注)ESPER_HOME 環境変数に Esper のホームディレクトリを設定しておく
includeTargets << gant.targets.Clean Ant.property(environment: "env") esperHome = Ant.antProject.properties."env.ESPER_HOME" destDir = "dest" classesDir = "$destDir/classes" cleanDirectory << destDir target("default": "") { compile() } target(init: "") { path(id: "project.classpath") { pathelement(path: classesDir) fileset(dir: esperHome) { include(name: "**/*.jar") } } } target(compile: "") { depends(init) Ant.mkdir(dir: classesDir) Ant.javac(srcdir: "src", destdir: classesDir) { classpath(refid: "project.classpath") } } target(run: "") { depends(compile) Ant.java(classname: "sample.SampleProcessor", fork: true) { classpath(refid: "project.classpath") } }
上記ファイルを Gant で実行すると、以下のような結果が出力され、EPL で指定したように point が 5以上の SampleEvent だけが出力(イベントリスナーで処理)されている事が確認できる。
実行結果
>gant run ・・・ [java] event : test1, 9 [java] event : test3, 8 [java] event : test4, 8 [java] event : test7, 6 [java] event : test8, 6