Groovy で Dempsy を分散実行
前回 と 前々回に続き、今回は Dempsy を Groovy で分散実行してみます。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131208/
はじめに
Dempsy を分散実行するには ZooKeeper が必要となりますので 「GroovyでZooKeeperを組み込み実行」 で作成した Groovy スクリプトを使う事にします。
また、分散実行するには Dempsy オブジェクトの構成を下記のようにします。 (※ の箇所が単独実行と異なる部分)
Dempsy 分散実行時の構成
プロパティ名 | クラス |
---|---|
clusterSessionFactory | ZookeeperSessionFactory ※ |
clusterCheck | SpecificClusterCheck ※ |
defaultRoutingStrategy | DecentralizedRoutingStrategy |
defaultSerializer | KryoSerializer |
defaultStatsCollectorFactory | StatsCollectorFactoryCoda |
defaultTransport | TcpTransport ※ |
Dempsy 分散実行アプリケーション
それでは、前回の KeySource 指定無しのサンプルを元に分散実行への対応を行ってみます。
基本的には Dempsy オブジェクトの構成を変えるだけです。
単独実行と異なり、分散実行では ClusterDefinition 毎にプロセスを起動しますので、実行時引数でクラスター名を指定するようにしています。
なお、dempsy.waitToBeStopped()
はクラスター adaptor
が終了しないようにするための措置です。 (dempsy.waitToBeStopped() が無いと、すぐに終了してしまいます)
money_count_vertx_cluster.groovy
・・・前回と同じ実装・・・ // 実行する ClusterDefinition の名称 def cluster = args[0] def dempsy = new Dempsy() dempsy.applicationDefinitions = [app] dempsy.clusterSessionFactory = new ZookeeperSessionFactory('localhost:2181', 5000) // 実行する ClusterDefinition を選定 dempsy.clusterCheck = new SpecificClusterCheck(new ClusterId('money-count', cluster)) // mp を 3ノード構成で処理する設定 dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(5, 3) dempsy.defaultSerializer = new KryoSerializer() dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda() dempsy.defaultTransport = new TcpTransport() dempsy.start() Runtime.runtime.addShutdownHook { -> println 'shutdown ...' dempsy.stop() } // Adaptor が終了するのを防止(MessageProcessor の場合は不要) dempsy.waitToBeStopped()
MessageProcessor (クラスター mp
) の実行ノード数などは DecentralizedRoutingStrategy で設定します。
DecentralizedRoutingStrategy のコンストラクタは DecentralizedRoutingStrategy(int defaultTotalSlots, int defaultNumNodes)
となっており、第1引数でスロット(シャード)数、第2引数でノード数を指定するようになっています。
キー毎の担当スロットが messageKey.hashCode() % defaultTotalSlots
で算出され、ノード毎にスロットが割り当てられて処理されます。
今回のサンプルのキー構成でスロットが適切にバラけるように defaultTotalSlots を 5
にしました。
スロット数 5 の場合のキー毎の messageKey.hashCode() % 5 結果
キー名 | スロット |
---|---|
1 | 4 |
5 | 3 |
10 | 2 |
50 | 1 |
100 | 0 |
500 | 4 |
1000 | 3 |
2000 | 4 |
5000 | 2 |
10000 | 1 |
分散実行
それでは分散実行してみます。
まずは ZooKeeper を実行しておきます。
ZooKeeper 実行
> groovy zk_run.groovy zoo.cfg
次に Adaptor を実行します。
Adaptor 実行
> groovy money_count_vertx_cluster.groovy adaptor ・・・ MoneyAdaptor.start ... ・・・
そして MessageProcessor を実行します。
DecentralizedRoutingStrategy で指定したのは 3ノードですが、ノードが停止した場合の挙動を確認するため今回は 4ノード分のプロセスを実行しました。
(1) mp 実行 (1つ目)
> groovy money_count_vertx_cluster.groovy mp ・・・ 2013-12-08 19:42:11,187 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@738] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x142d19fda340012, negotiated timeout = 6000 2013-12-08 19:42:15,664 [myid:] - INFO [main:SimpleThreadPool@270] - Job execution threads will use class loader of thread: main ・・・
(2) mp 実行 (2つ目)
> groovy money_count_vertx_cluster.groovy mp ・・・
(3) mp 実行 (3つ目)
> groovy money_count_vertx_cluster.groovy mp ・・・
(4) mp 実行 (4つ目)
> groovy money_count_vertx_cluster.groovy mp ・・・
結果1 - 全キーへのアクセス
下記のように全キーの URL へアクセスしてカウントアップしてみます。
$ curl http://localhost:8080/1 $ curl http://localhost:8080/10 $ curl http://localhost:8080/100 $ curl http://localhost:8080/1000 $ curl http://localhost:8080/10000 $ curl http://localhost:8080/5000 $ curl http://localhost:8080/500 $ curl http://localhost:8080/50 $ curl http://localhost:8080/5 $ curl http://localhost:8080/2000
(1) ~ (4) の mp の状態と割り当てられたスロットは下記のようになりました。
なお、(4) のノードにはスロットが割り当てられておらずキーも送信されていない事を確認できました。
(1) mp の状態 (スロット 3)
> groovy money_count_vertx_cluster.groovy mp ・・・ MoneyCount.activation : 1000 key: 1000, count: 1 MoneyCount.activation : 5 key: 1000, count: 1 key: 5, count: 1 ・・・
(2) mp の状態 (スロット 0, 1)
> groovy money_count_vertx_cluster.groovy mp ・・・ MoneyCount.activation : 100 MoneyCount.activation : 10000 key: 100, count: 1 key: 10000, count: 1 MoneyCount.activation : 50 key: 100, count: 1 key: 10000, count: 1 key: 50, count: 1 ・・・
(3) mp の状態 (スロット 2, 4)
> groovy money_count_vertx_cluster.groovy mp ・・・ MoneyCount.activation : 1 MoneyCount.activation : 10 key: 1, count: 1 key: 10, count: 1 MoneyCount.activation : 5000 MoneyCount.activation : 500 key: 1, count: 1 key: 500, count: 1 key: 5000, count: 1 key: 10, count: 1 MoneyCount.activation : 2000 key: 1, count: 1 key: 500, count: 1 key: 2000, count: 1 key: 5000, count: 1 key: 10, count: 1 ・・・
(4) mp の状態 (変化なし)
> groovy money_count_vertx_cluster.groovy mp ・・・
結果2 - (1) の mp を停止
(1) の mp を Ctrl + c
でプロセス終了して、下記 URL へアクセスしてみます。
$ curl http://localhost:8080/1000 $ curl http://localhost:8080/5
(1) の担当スロットが、代わりに (4) へ割り当てられてキーが送信されてくるようになります。
(4) mp の状態 ((1) の停止後)
> groovy money_count_vertx_cluster.groovy mp ・・・ MoneyCount.activation : 1000 MoneyCount.activation : 5 key: 1000, count: 1 key: 5, count: 1 ・・・