読者です 読者をやめる 読者になる 読者になる

Groovy で Dempsy を単独実行1

以前、Groovy で Storm を使う にて Storm を Groovy から使ってみましたが、今回は同様のフレームワークである Dempsy を Groovy で単独実行してみました。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131130/

はじめに

Dempsy アプリケーションは基本的に下記のように実装します。

  • (1) メッセージクラスの作成
  • (2) メッセージ処理クラスの作成
  • (3) アダプタークラスの作成
  • (4) アプリケーション定義の構築
  • (5) Dempsy の構築と実行

Dempsy の基本動作は以下のようになっているようです。

  • メッセージクラスのキー値毎にメッセージ処理クラスがインスタンス化される(clone() でクローンが作られる)
  • アダプタークラスで Dispatcher にメッセージを dispatch() すると該当キーの担当メッセージ処理インスタンスへメッセージが渡される
Adaptor -> Dispatcher -> MessageProcessor

また、KeySource の指定有無によってもメッセージ処理クラスのインスタンス化されるタイミングが異なります。

KeySource の指定 メッセージ処理クラスのインスタンス化タイミング
新しくキーが発生する度にその都度インスタンスが作られる
起動時に処理するキー数に応じたインスタンスが作られる

Dempsy アプリケーションの実装

前回の Spark でも実装したお金の数え上げ処理を今回も実装してみることにします。

ここで、Dempsy はリアルタイムにメッセージを処理するためのフレームワークですので、固定的なファイルの内容を処理するのは本来の用途とは異なる点にご注意ください。

(1) メッセージクラスの作成

まずはメッセージクラスを作成します。 メッセージクラスは以下のように作成します。

  • キー値を返す getter メソッドに @MessageKey を付ける

なお、下記では @TupleConstructor を使っていますが、これはコンストラクタの定義を省略するためのもので Dempsy とは特に関係ありません。

@TupleConstructor
class Money {
    String moneyText

    @MessageKey
    String getMoneyText() {
        moneyText
    }
}

(2) メッセージ処理クラスの作成

メッセージ処理クラスは以下のように作成します。

  • @MessageProcessor を付与
  • Cloneable インターフェースを実装
  • メッセージを処理するメソッドに @MessageHandler を付与

これで必要最小限のメッセージ処理クラスを実装できますが、下記のような追加処理も実装できます。

  • アクティブ化の処理を行うメソッドに @Activation を付与 (引数にキーの値が渡される)
  • パッシブ化の処理を行うメソッドに @Passivation を付与
  • 出力処理を行うメソッドに @Output を付与

下記サンプルでは Cloneable インターフェースを実装する代わりに Groovy の @AutoClone を使用しています。

また、キーとカウント数を出力するために @Activation と @Output を使っています。

@MessageProcessor
@AutoClone
class MoneyCount {
    private long count = 0
    private String key

    // アクティブ化の処理
    @Activation
    void setMoneyKey(String key) {
        println 'MoneyCount.activation'
        this.key = key
    }

    // メッセージの処理
    @MessageHandler
    void countMoney(Money money) {
        // カウントアップ
        count++
    }

    // 結果出力
    @Output
    void printResults() {
        println "key: ${key}, count: ${count}"
    }
}

(3) アダプタークラスの作成

アダプタークラスは以下のように作成します。

  • Adaptor インターフェースを実装
  • setDispatcher() メソッドで設定された Dispatcher オブジェクトへメッセージを dispatch() する

下記では指定されたファイルの行毎にメッセージを作って Dispatcher オブジェクトへ dispatch しています。

class MoneyAdaptor implements Adaptor {
    Dispatcher dispatcher
    private File inputFile

    MoneyAdaptor(File inputFile) {
        this.inputFile = inputFile
    }

    void start() {
        println 'MoneyAdaptor.start ...'

        inputFile.eachLine { line ->
            dispatcher.dispatch(new Money(line))
        }
    }

    void stop() {
        println 'MoneyAdaptor.stop'
    }
}

(4) アプリケーション定義の構築

アプリケーション定義 ApplicationDefinition は以下のようにして構築します。

  • メッセージ処理クラスやアダプタークラスをインスタンス化して ClusterDefinition へ設定
  • ClusterDefinitionApplicationDefinition へ追加
  • @Output の出力処理を行うために ClusterDefinitionOutputExecuter を設定

なお、Dempsy には終了時に 1度だけ結果を出力するような OutputExecuter が用意されていなかったので、下記では自前で実装し設定しています。

def mp = new ClusterDefinition('mp', new MoneyCount())
// 終了時に結果を出力する OutputExecuter を設定
mp.outputExecuter = [
    setOutputInvoker: { invoker ->
        println 'OutputExecuter.setOutputInvoker'
        this.invoker = invoker
    },
    start: { ->
        println 'OutputExecuter.start'
    },
    stop: { ->
        println 'OutputExecuter.stop'
        // 処理結果を出力
        invoker.invokeOutput()
    }
] as OutputExecuter

def app = new ApplicationDefinition('money-count').add(
    new ClusterDefinition('adaptor', new MoneyAdaptor(new File(args[0]))),
    mp
)

(5) Dempsy の構築と実行

最後に Dempsy をインスタンス化し、(4) のアプリケーション定義と下記のような構成を設定すれば start() メソッドで単独実行できます。 (分散実行の場合は設定するクラス構成が多少変わります)

プロパティ名 クラス
clusterSessionFactory LocalClusterSessionFactory
clusterCheck AlwaysInCurrentCluster
defaultRoutingStrategy DecentralizedRoutingStrategy
defaultSerializer KryoSerializer
defaultStatsCollectorFactory StatsCollectorFactoryCoda
defaultTransport BlockingQueueTransport

なお、今回のサンプルでは処理を開始すると自動的には終了しません。

終了時に結果を出力させたいので、ShutdownHook を使って Dempsy の stop() メソッドを呼び出すようにしています。

def dempsy = new Dempsy()

dempsy.applicationDefinitions = [app]
dempsy.clusterSessionFactory = new LocalClusterSessionFactory()
dempsy.clusterCheck = new AlwaysInCurrentCluster()
dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(1, 1)
dempsy.defaultSerializer = new KryoSerializer()
dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda()
dempsy.defaultTransport = new BlockingQueueTransport()

// 開始
dempsy.start()

Runtime.runtime.addShutdownHook { ->
    println 'shutdown ...'
    // 終了
    dempsy.stop()
}

また、lib-dempsyspring モジュールを使えば Dempsy-localVm.xml ファイルにて Dempsy 単独実行用の構成が定義されているので、自前で clusterSessionFactory 等の設定を行う必要はありません。

実行

それでは実行してみます。

> groovy money_count.groovy input_sample.txt
・・・
OutputExecuter.setOutputInvoker
OutputExecuter.start
MoneyAdaptor.start ...
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation

実行すると上記のログが出力された状態になります。

ここで、input_sample.txt では 9種類のキーを用いており、キー数に応じた MoneyCount.activation が出力されている事が分かります。

とりあえず、自動的には終了しませんので Ctrl + c で終了します。 そうすると、Adaptor と OutputExecuter が終了し、個々の MessageProcessor の結果が出力されます。

shutdown ...
MoneyAdaptor.stop
OutputExecuter.stop
key: 1, count: 2
key: 100, count: 2
key: 10000, count: 2
key: 500, count: 1
key: 1000, count: 3
key: 2000, count: 1
key: 5, count: 3
key: 10, count: 2
key: 50, count: 1
money_count.groovy

ソース全体は下記のようになっています。

@Grab('net.dempsy:lib-dempsyimpl:0.7.9')
@Grab('org.slf4j:slf4j-log4j12:1.7.5')
import com.nokia.dempsy.annotations.Activation
import com.nokia.dempsy.annotations.MessageKey
import com.nokia.dempsy.annotations.MessageProcessor
import com.nokia.dempsy.annotations.MessageHandler
import com.nokia.dempsy.annotations.Output

import com.nokia.dempsy.Adaptor
import com.nokia.dempsy.Dempsy
import com.nokia.dempsy.Dispatcher

import com.nokia.dempsy.config.ApplicationDefinition
import com.nokia.dempsy.config.ClusterDefinition

import com.nokia.dempsy.output.OutputExecuter

import com.nokia.dempsy.cluster.invm.LocalClusterSessionFactory
import com.nokia.dempsy.router.AlwaysInCurrentCluster
import com.nokia.dempsy.router.DecentralizedRoutingStrategy
import com.nokia.dempsy.serialization.kryo.KryoSerializer
import com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda
import com.nokia.dempsy.messagetransport.blockingqueue.BlockingQueueTransport

import groovy.transform.*

// (1) メッセージクラスの作成
@TupleConstructor
class Money {
    String moneyText

    @MessageKey
    String getMoneyText() {
        moneyText
    }
}

// (2) メッセージ処理クラスの作成
@MessageProcessor
@AutoClone
class MoneyCount {
    private long count = 0
    private String key

    @Activation
    void setMoneyKey(String key) {
        println 'MoneyCount.activation'
        this.key = key
    }

    @MessageHandler
    void countMoney(Money money) {
        count++
    }

    @Output
    void printResults() {
        println "key: ${key}, count: ${count}"
    }
}

// (3) アダプタークラスの作成
class MoneyAdaptor implements Adaptor {
    Dispatcher dispatcher
    private File inputFile

    MoneyAdaptor(File inputFile) {
        this.inputFile = inputFile
    }

    void start() {
        println 'MoneyAdaptor.start ...'

        inputFile.eachLine { line ->
            dispatcher.dispatch(new Money(line))
        }
    }

    void stop() {
        println 'MoneyAdaptor.stop'
    }
}

// (4) アプリケーション定義オブジェクトの構築
def mp = new ClusterDefinition('mp', new MoneyCount())
// 終了時に結果を出力する OutputExecuter を設定
mp.outputExecuter = [
    setOutputInvoker: { invoker ->
        println 'OutputExecuter.setOutputInvoker'
        this.invoker = invoker
    },
    start: { ->
        println 'OutputExecuter.start'
    },
    stop: { ->
        println 'OutputExecuter.stop'
        invoker.invokeOutput()
    }
] as OutputExecuter

def app = new ApplicationDefinition('money-count').add(
    new ClusterDefinition('adaptor', new MoneyAdaptor(new File(args[0]))),
    mp
)

// (5) Dempsy オブジェクトの構築と実行
def dempsy = new Dempsy()

dempsy.applicationDefinitions = [app]
dempsy.clusterSessionFactory = new LocalClusterSessionFactory()
dempsy.clusterCheck = new AlwaysInCurrentCluster()
dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(1, 1)
dempsy.defaultSerializer = new KryoSerializer()
dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda()
dempsy.defaultTransport = new BlockingQueueTransport()

dempsy.start()

Runtime.runtime.addShutdownHook { ->
    println 'shutdown ...'
    dempsy.stop()
}