Groovy で Dempsy を単独実行1
以前、Groovy で Storm を使う にて Storm を Groovy から使ってみましたが、今回は同様のフレームワークである Dempsy を Groovy で単独実行してみました。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131130/
はじめに
Dempsy アプリケーションは基本的に下記のように実装します。
- (1) メッセージクラスの作成
- (2) メッセージ処理クラスの作成
- (3) アダプタークラスの作成
- (4) アプリケーション定義の構築
- (5) Dempsy の構築と実行
Dempsy の基本動作は以下のようになっているようです。
- メッセージクラスのキー値毎にメッセージ処理クラスがインスタンス化される(
clone()
でクローンが作られる) - アダプタークラスで
Dispatcher
にメッセージをdispatch()
すると該当キーの担当メッセージ処理インスタンスへメッセージが渡される
Adaptor -> Dispatcher -> MessageProcessor
また、KeySource の指定有無によってもメッセージ処理クラスのインスタンス化されるタイミングが異なります。
KeySource の指定 | メッセージ処理クラスのインスタンス化タイミング |
---|---|
無 | 新しくキーが発生する度にその都度インスタンスが作られる |
有 | 起動時に処理するキー数に応じたインスタンスが作られる |
Dempsy アプリケーションの実装
前回の Spark でも実装したお金の数え上げ処理を今回も実装してみることにします。
ここで、Dempsy はリアルタイムにメッセージを処理するためのフレームワークですので、固定的なファイルの内容を処理するのは本来の用途とは異なる点にご注意ください。
(1) メッセージクラスの作成
まずはメッセージクラスを作成します。 メッセージクラスは以下のように作成します。
- キー値を返す getter メソッドに
@MessageKey
を付ける
なお、下記では @TupleConstructor
を使っていますが、これはコンストラクタの定義を省略するためのもので Dempsy とは特に関係ありません。
@TupleConstructor class Money { String moneyText @MessageKey String getMoneyText() { moneyText } }
(2) メッセージ処理クラスの作成
メッセージ処理クラスは以下のように作成します。
@MessageProcessor
を付与Cloneable
インターフェースを実装- メッセージを処理するメソッドに
@MessageHandler
を付与
これで必要最小限のメッセージ処理クラスを実装できますが、下記のような追加処理も実装できます。
- アクティブ化の処理を行うメソッドに
@Activation
を付与 (引数にキーの値が渡される) - パッシブ化の処理を行うメソッドに
@Passivation
を付与 - 出力処理を行うメソッドに
@Output
を付与
下記サンプルでは Cloneable インターフェースを実装する代わりに Groovy の @AutoClone
を使用しています。
また、キーとカウント数を出力するために @Activation と @Output を使っています。
@MessageProcessor @AutoClone class MoneyCount { private long count = 0 private String key // アクティブ化の処理 @Activation void setMoneyKey(String key) { println 'MoneyCount.activation' this.key = key } // メッセージの処理 @MessageHandler void countMoney(Money money) { // カウントアップ count++ } // 結果出力 @Output void printResults() { println "key: ${key}, count: ${count}" } }
(3) アダプタークラスの作成
アダプタークラスは以下のように作成します。
Adaptor
インターフェースを実装setDispatcher()
メソッドで設定されたDispatcher
オブジェクトへメッセージをdispatch()
する
下記では指定されたファイルの行毎にメッセージを作って Dispatcher オブジェクトへ dispatch しています。
class MoneyAdaptor implements Adaptor { Dispatcher dispatcher private File inputFile MoneyAdaptor(File inputFile) { this.inputFile = inputFile } void start() { println 'MoneyAdaptor.start ...' inputFile.eachLine { line -> dispatcher.dispatch(new Money(line)) } } void stop() { println 'MoneyAdaptor.stop' } }
(4) アプリケーション定義の構築
アプリケーション定義 ApplicationDefinition
は以下のようにして構築します。
- メッセージ処理クラスやアダプタークラスをインスタンス化して
ClusterDefinition
へ設定 ClusterDefinition
をApplicationDefinition
へ追加@Output
の出力処理を行うためにClusterDefinition
へOutputExecuter
を設定
なお、Dempsy には終了時に 1度だけ結果を出力するような OutputExecuter が用意されていなかったので、下記では自前で実装し設定しています。
def mp = new ClusterDefinition('mp', new MoneyCount()) // 終了時に結果を出力する OutputExecuter を設定 mp.outputExecuter = [ setOutputInvoker: { invoker -> println 'OutputExecuter.setOutputInvoker' this.invoker = invoker }, start: { -> println 'OutputExecuter.start' }, stop: { -> println 'OutputExecuter.stop' // 処理結果を出力 invoker.invokeOutput() } ] as OutputExecuter def app = new ApplicationDefinition('money-count').add( new ClusterDefinition('adaptor', new MoneyAdaptor(new File(args[0]))), mp )
(5) Dempsy の構築と実行
最後に Dempsy をインスタンス化し、(4) のアプリケーション定義と下記のような構成を設定すれば start()
メソッドで単独実行できます。 (分散実行の場合は設定するクラス構成が多少変わります)
プロパティ名 | クラス |
---|---|
clusterSessionFactory | LocalClusterSessionFactory |
clusterCheck | AlwaysInCurrentCluster |
defaultRoutingStrategy | DecentralizedRoutingStrategy |
defaultSerializer | KryoSerializer |
defaultStatsCollectorFactory | StatsCollectorFactoryCoda |
defaultTransport | BlockingQueueTransport |
なお、今回のサンプルでは処理を開始すると自動的には終了しません。
終了時に結果を出力させたいので、ShutdownHook を使って Dempsy の stop()
メソッドを呼び出すようにしています。
def dempsy = new Dempsy() dempsy.applicationDefinitions = [app] dempsy.clusterSessionFactory = new LocalClusterSessionFactory() dempsy.clusterCheck = new AlwaysInCurrentCluster() dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(1, 1) dempsy.defaultSerializer = new KryoSerializer() dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda() dempsy.defaultTransport = new BlockingQueueTransport() // 開始 dempsy.start() Runtime.runtime.addShutdownHook { -> println 'shutdown ...' // 終了 dempsy.stop() }
また、lib-dempsyspring モジュールを使えば Dempsy-localVm.xml ファイルにて Dempsy 単独実行用の構成が定義されているので、自前で clusterSessionFactory 等の設定を行う必要はありません。
実行
それでは実行してみます。
> groovy money_count.groovy input_sample.txt ・・・ OutputExecuter.setOutputInvoker OutputExecuter.start MoneyAdaptor.start ... MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation
実行すると上記のログが出力された状態になります。
ここで、input_sample.txt では 9種類のキーを用いており、キー数に応じた MoneyCount.activation が出力されている事が分かります。
とりあえず、自動的には終了しませんので Ctrl + c
で終了します。
そうすると、Adaptor と OutputExecuter が終了し、個々の MessageProcessor の結果が出力されます。
shutdown ... MoneyAdaptor.stop OutputExecuter.stop key: 1, count: 2 key: 100, count: 2 key: 10000, count: 2 key: 500, count: 1 key: 1000, count: 3 key: 2000, count: 1 key: 5, count: 3 key: 10, count: 2 key: 50, count: 1
money_count.groovy
ソース全体は下記のようになっています。
@Grab('net.dempsy:lib-dempsyimpl:0.7.9') @Grab('org.slf4j:slf4j-log4j12:1.7.5') import com.nokia.dempsy.annotations.Activation import com.nokia.dempsy.annotations.MessageKey import com.nokia.dempsy.annotations.MessageProcessor import com.nokia.dempsy.annotations.MessageHandler import com.nokia.dempsy.annotations.Output import com.nokia.dempsy.Adaptor import com.nokia.dempsy.Dempsy import com.nokia.dempsy.Dispatcher import com.nokia.dempsy.config.ApplicationDefinition import com.nokia.dempsy.config.ClusterDefinition import com.nokia.dempsy.output.OutputExecuter import com.nokia.dempsy.cluster.invm.LocalClusterSessionFactory import com.nokia.dempsy.router.AlwaysInCurrentCluster import com.nokia.dempsy.router.DecentralizedRoutingStrategy import com.nokia.dempsy.serialization.kryo.KryoSerializer import com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda import com.nokia.dempsy.messagetransport.blockingqueue.BlockingQueueTransport import groovy.transform.* // (1) メッセージクラスの作成 @TupleConstructor class Money { String moneyText @MessageKey String getMoneyText() { moneyText } } // (2) メッセージ処理クラスの作成 @MessageProcessor @AutoClone class MoneyCount { private long count = 0 private String key @Activation void setMoneyKey(String key) { println 'MoneyCount.activation' this.key = key } @MessageHandler void countMoney(Money money) { count++ } @Output void printResults() { println "key: ${key}, count: ${count}" } } // (3) アダプタークラスの作成 class MoneyAdaptor implements Adaptor { Dispatcher dispatcher private File inputFile MoneyAdaptor(File inputFile) { this.inputFile = inputFile } void start() { println 'MoneyAdaptor.start ...' inputFile.eachLine { line -> dispatcher.dispatch(new Money(line)) } } void stop() { println 'MoneyAdaptor.stop' } } // (4) アプリケーション定義オブジェクトの構築 def mp = new ClusterDefinition('mp', new MoneyCount()) // 終了時に結果を出力する OutputExecuter を設定 mp.outputExecuter = [ setOutputInvoker: { invoker -> println 'OutputExecuter.setOutputInvoker' this.invoker = invoker }, start: { -> println 'OutputExecuter.start' }, stop: { -> println 'OutputExecuter.stop' invoker.invokeOutput() } ] as OutputExecuter def app = new ApplicationDefinition('money-count').add( new ClusterDefinition('adaptor', new MoneyAdaptor(new File(args[0]))), mp ) // (5) Dempsy オブジェクトの構築と実行 def dempsy = new Dempsy() dempsy.applicationDefinitions = [app] dempsy.clusterSessionFactory = new LocalClusterSessionFactory() dempsy.clusterCheck = new AlwaysInCurrentCluster() dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(1, 1) dempsy.defaultSerializer = new KryoSerializer() dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda() dempsy.defaultTransport = new BlockingQueueTransport() dempsy.start() Runtime.runtime.addShutdownHook { -> println 'shutdown ...' dempsy.stop() }