Groovy で Dempsy を分散実行

前回前々回に続き、今回は Dempsy を Groovy で分散実行してみます。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131208/

はじめに

Dempsy を分散実行するには ZooKeeper が必要となりますので 「GroovyでZooKeeperを組み込み実行」 で作成した Groovy スクリプトを使う事にします。

また、分散実行するには Dempsy オブジェクトの構成を下記のようにします。 (※ の箇所が単独実行と異なる部分)

Dempsy 分散実行時の構成
プロパティ名 クラス
clusterSessionFactory ZookeeperSessionFactory ※
clusterCheck SpecificClusterCheck ※
defaultRoutingStrategy DecentralizedRoutingStrategy
defaultSerializer KryoSerializer
defaultStatsCollectorFactory StatsCollectorFactoryCoda
defaultTransport TcpTransport ※

Dempsy 分散実行アプリケーション

それでは、前回の KeySource 指定無しのサンプルを元に分散実行への対応を行ってみます。

基本的には Dempsy オブジェクトの構成を変えるだけです。

単独実行と異なり、分散実行では ClusterDefinition 毎にプロセスを起動しますので、実行時引数でクラスター名を指定するようにしています。

なお、dempsy.waitToBeStopped()クラスタadaptor が終了しないようにするための措置です。 (dempsy.waitToBeStopped() が無いと、すぐに終了してしまいます)

money_count_vertx_cluster.groovy
・・・前回と同じ実装・・・

// 実行する ClusterDefinition の名称
def cluster = args[0]

def dempsy = new Dempsy()

dempsy.applicationDefinitions = [app]
dempsy.clusterSessionFactory = new ZookeeperSessionFactory('localhost:2181', 5000)
// 実行する ClusterDefinition を選定
dempsy.clusterCheck = new SpecificClusterCheck(new ClusterId('money-count', cluster))
// mp を 3ノード構成で処理する設定
dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(5, 3)
dempsy.defaultSerializer = new KryoSerializer()
dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda()
dempsy.defaultTransport = new TcpTransport()

dempsy.start()

Runtime.runtime.addShutdownHook { ->
    println 'shutdown ...'
    dempsy.stop()
}

// Adaptor が終了するのを防止(MessageProcessor の場合は不要)
dempsy.waitToBeStopped()

MessageProcessor (クラスタmp) の実行ノード数などは DecentralizedRoutingStrategy で設定します。

DecentralizedRoutingStrategy のコンストラクタDecentralizedRoutingStrategy(int defaultTotalSlots, int defaultNumNodes) となっており、第1引数でスロット(シャード)数、第2引数でノード数を指定するようになっています。

キー毎の担当スロットが messageKey.hashCode() % defaultTotalSlots で算出され、ノード毎にスロットが割り当てられて処理されます。

今回のサンプルのキー構成でスロットが適切にバラけるように defaultTotalSlots を 5 にしました。

スロット数 5 の場合のキー毎の messageKey.hashCode() % 5 結果
キー名 スロット
1 4
5 3
10 2
50 1
100 0
500 4
1000 3
2000 4
5000 2
10000 1

分散実行

それでは分散実行してみます。

まずは ZooKeeper を実行しておきます。

ZooKeeper 実行
> groovy zk_run.groovy zoo.cfg

次に Adaptor を実行します。

Adaptor 実行
> groovy money_count_vertx_cluster.groovy adaptor
・・・
MoneyAdaptor.start ...
・・・

そして MessageProcessor を実行します。

DecentralizedRoutingStrategy で指定したのは 3ノードですが、ノードが停止した場合の挙動を確認するため今回は 4ノード分のプロセスを実行しました。

(1) mp 実行 (1つ目)
> groovy money_count_vertx_cluster.groovy mp
・・・
2013-12-08 19:42:11,187 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@738] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x142d19fda340012, negotiated timeout = 6000
2013-12-08 19:42:15,664 [myid:] - INFO  [main:SimpleThreadPool@270] - Job execution threads will use class loader of thread: main
・・・
(2) mp 実行 (2つ目)
> groovy money_count_vertx_cluster.groovy mp
・・・
(3) mp 実行 (3つ目)
> groovy money_count_vertx_cluster.groovy mp
・・・
(4) mp 実行 (4つ目)
> groovy money_count_vertx_cluster.groovy mp
・・・

結果1 - 全キーへのアクセス

下記のように全キーの URL へアクセスしてカウントアップしてみます。

$ curl http://localhost:8080/1
$ curl http://localhost:8080/10
$ curl http://localhost:8080/100
$ curl http://localhost:8080/1000
$ curl http://localhost:8080/10000
$ curl http://localhost:8080/5000
$ curl http://localhost:8080/500
$ curl http://localhost:8080/50
$ curl http://localhost:8080/5
$ curl http://localhost:8080/2000

(1) ~ (4) の mp の状態と割り当てられたスロットは下記のようになりました。

なお、(4) のノードにはスロットが割り当てられておらずキーも送信されていない事を確認できました。

(1) mp の状態 (スロット 3)
> groovy money_count_vertx_cluster.groovy mp
・・・
MoneyCount.activation : 1000
key: 1000, count: 1
MoneyCount.activation : 5
key: 1000, count: 1
key: 5, count: 1
・・・
(2) mp の状態 (スロット 0, 1)
> groovy money_count_vertx_cluster.groovy mp
・・・
MoneyCount.activation : 100
MoneyCount.activation : 10000
key: 100, count: 1
key: 10000, count: 1
MoneyCount.activation : 50
key: 100, count: 1
key: 10000, count: 1
key: 50, count: 1
・・・
(3) mp の状態 (スロット 2, 4)
> groovy money_count_vertx_cluster.groovy mp
・・・
MoneyCount.activation : 1
MoneyCount.activation : 10
key: 1, count: 1
key: 10, count: 1
MoneyCount.activation : 5000
MoneyCount.activation : 500
key: 1, count: 1
key: 500, count: 1
key: 5000, count: 1
key: 10, count: 1
MoneyCount.activation : 2000
key: 1, count: 1
key: 500, count: 1
key: 2000, count: 1
key: 5000, count: 1
key: 10, count: 1
・・・
(4) mp の状態 (変化なし)
> groovy money_count_vertx_cluster.groovy mp
・・・

結果2 - (1) の mp を停止

(1) の mp を Ctrl + c でプロセス終了して、下記 URL へアクセスしてみます。

$ curl http://localhost:8080/1000
$ curl http://localhost:8080/5

(1) の担当スロットが、代わりに (4) へ割り当てられてキーが送信されてくるようになります。

(4) mp の状態 ((1) の停止後)
> groovy money_count_vertx_cluster.groovy mp
・・・
MoneyCount.activation : 1000
MoneyCount.activation : 5
key: 1000, count: 1
key: 5, count: 1
・・・