読者です 読者をやめる 読者になる 読者になる

Groovy で JBoss Drools を使う2 - CEP機能

Java Groovy CEP

前回 id:fits:20120104 に引き続き、今回は Drools の CEP 機能(Drools Fusion)*1を Groovy で簡単に試してみました。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20120105/

CEP用ルール定義(.drl ファイル)

それでは CEP 用のルール定義を行います。

基本的な記法はルールエンジン用の .drl ファイルと変わりませんが、declare を使ってデータタイプを CEP 用に Event 宣言(@role(event) を付与)する点が異なります。

Event 宣言によって Sliding Window *2などが使えるようになります。

今回のサンプルでは以下のような処理を実装してみました。

  • 5千円以上の Order が 2秒以内で続いた場合(厳密には終了時間の差が 2秒以内)に出力
  • over window:time(3s) で 3秒間に発生した Order の totalPrice() 結果を sum と from accumulate を使って合計し、1万5千円以上だった場合に合計金額を出力
order_check.drl
package fits.sample

//Event宣言
declare Order
    @role(event)
end

rule "2秒間に注文金額 5千円以上の処理が終了した場合に出力"
    salience 10
    when
        $beforeOrder : Order($beforeName : name, totalPrice >= 5000)
        Order($name : name, totalPrice >= 5000, this finishes[2s] $beforeOrder)
    then
        System.out.printf("注文(5千円以上) : %s, before = %s \n", $name, $beforeName);
end

rule "直近の3秒間に発生した注文の合計金額が 1万5千円以上の場合に出力"
    when
        Number($res : intValue, intValue >= 15000) from accumulate(
            Order($total : totalPrice) over window:time(3s), sum($total)
        )
    then
        System.out.printf("3秒間の合計金額(1万5千円以上) : %d \n", $res);
end


なお、then ブロック内の処理でセミコロン(;)が必須な点に注意して下さい。(セミコロンが無いと Rule Compilation error が発生します)

CEP 処理の実装

実装内容はルールエンジンのものと特に変わりありません。(実装の説明に関しては id:fits:20120104 参照)

order_check.groovy
package fits.sample

@Grab("org.drools:drools-core:5.4.0.Beta1")
@Grab("org.drools:drools-compiler:5.4.0.Beta1")
@Grab("com.sun.xml.bind:jaxb-xjc:2.2.5-b09")
import org.drools.KnowledgeBaseFactory
import org.drools.builder.KnowledgeBuilderFactory
import org.drools.builder.ResourceType
import org.drools.io.ResourceFactory

class Order {
    String name
    BigDecimal subTotalPrice = BigDecimal.ZERO
    BigDecimal discountPrice = BigDecimal.ZERO

    BigDecimal totalPrice() {
        subTotalPrice.subtract(discountPrice)
    }
}

def builder = KnowledgeBuilderFactory.newKnowledgeBuilder()

builder.add(ResourceFactory.newClassPathResource("order_check.drl", getClass()), ResourceType.DRL)

if (builder.hasErrors()) {
    println builder.errors
    return
}

def base = KnowledgeBaseFactory.newKnowledgeBase()
base.addKnowledgePackages(builder.getKnowledgePackages())

def session = base.newStatefulKnowledgeSession()

(0..<10).each {
    def order = new Order(name: "order${it}", subTotalPrice: new BigDecimal((int)Math.random() * 10000))
    println("order : ${order.name}, ${order.totalPrice()}")

    session.insert(order)

    //CEP処理の実施
    session.fireAllRules()
    Thread.sleep(1000)
}
//セッションの終了
session.dispose()
実行例
> groovy order_check.groovy
order : order0, 2588
order : order1, 4988
order : order2, 1266
order : order3, 3617
order : order4, 9771
order : order5, 5338
注文(5千円以上) : order5, before = order4
3秒間の合計金額(1万5千円以上) : 18726
order : order6, 7270
注文(5千円以上) : order6, before = order5
3秒間の合計金額(1万5千円以上) : 22379
order : order7, 6086
注文(5千円以上) : order7, before = order6
3秒間の合計金額(1万5千円以上) : 18694
order : order8, 9529
注文(5千円以上) : order8, before = order7
3秒間の合計金額(1万5千円以上) : 22885
order : order9, 1833
3秒間の合計金額(1万5千円以上) : 17448

ストリームを使った CEP 処理

Drools には entry point というストリームを扱うための仕組みが用意されています。

下記サンプルでは StatefulKnowledgeSession から取得した WorkingMemoryEntryPoint に対してデータを insert しています。

order_check_stream.groovy(ストリーム使用)
・・・
builder.add(ResourceFactory.newClassPathResource("order_check_stream.drl", getClass()), ResourceType.DRL)
・・・
def session = base.newStatefulKnowledgeSession()
//entry point 取得(order stream は .drl ファイル側で定義する)
def stream = session.getWorkingMemoryEntryPoint("order stream")

(0..<10).each {
    def order = new Order(name: "order${it}", subTotalPrice: new BigDecimal((int)Math.random() * 10000))
    println("order : ${order.name}, ${order.totalPrice()}")

    //entry point に対して insert する
    stream.insert(order)

    //CEP処理の実施
    session.fireAllRules()
    Thread.sleep(1000)
}
//セッションの終了
session.dispose()

.drl ファイルで entry point を扱うには from entry-point "ストリーム名" とします。(ストリーム名は getWorkingMemoryEntryPoint メソッドの呼び出しで使用します)

order_check_stream.drl(ストリーム使用)
・・・
rule "2秒間に注文金額 5千円以上の処理が終了した場合に出力"
    salience 10
    when
        $beforeOrder : Order($beforeName : name, totalPrice >= 5000) from entry-point "order stream"
        Order($name : name, totalPrice >= 5000, this finishes[2s] $beforeOrder) from entry-point "order stream"
    then
        System.out.printf("注文(5千円以上) : %s, before = %s \n", $name, $beforeName);
end

rule "直近の3秒間に発生した注文の合計金額が 1万5千円以上の場合に出力"
    when
        Number($res : intValue, intValue >= 15000) from accumulate(
            Order($total : totalPrice) over window:time(3s) from entry-point "order stream", sum($total)
        )
    then
        System.out.printf("3秒間の合計金額(1万5千円以上) : %d \n", $res);
end

*1:複合イベント処理

*2:over window:time(時間) や over window:length(ウインドウサイズ) の機能