Groovy で Dempsy を単独実行2 - KeySource
前回に続き、Dempsy を Groovy で単独実行してみます。 今回は KeySource の設定有無でどのように挙動が変わるかを簡単に調べます。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131201/
KeySource 無しの場合
まず、前回サンプルの Adaptor クラスを変更し、http://localhost:8080/<金額>
へのアクセスがあった際に Dispatcher
へメッセージを dispatch()
するように変更しました。 (HTTP サーバー処理は Vert.x を組み込み実行)
また、outputExecuter に RelativeOutputSchedule
を設定し 10秒毎にカウント値を出力するように変更しています。
money_count_vertx.groovy
・・・ class Constants { final static def MONEYS = [ '1', '5', '10', '50', '100', '500', '1000', '2000', '5000', '10000' ] } class MoneyAdaptor implements Adaptor { Dispatcher dispatcher void start() { println 'MoneyAdaptor.start ...' def vertx = Vertx.newVertx() def rm = new RouteMatcher() rm.get '/:money', { req -> def money = req.params['money'] if (Constants.MONEYS.contains(money)) { dispatcher.dispatch(new Money(req.params['money'])) } req.response.end() } vertx.createHttpServer().requestHandler(rm.asClosure()).listen 8080 } void stop() { println 'MoneyAdaptor.stop' } } def mp = new ClusterDefinition('mp', new MoneyCount()) mp.outputExecuter = new com.nokia.dempsy.output.RelativeOutputSchedule(10, java.util.concurrent.TimeUnit.SECONDS) def app = new ApplicationDefinition('money-count').add( new ClusterDefinition('adaptor', new MoneyAdaptor()), mp ) ・・・
実行
それでは実行します。
出力1
> groovy money_count_vertx.groovy ・・・ MoneyAdaptor.start ...
MoneyAdaptor が start しただけで、MoneyCount は 1つもアクティブ化されていない状態です。
ここで http://localhost:8080/5
へアクセスすると、キー値 5 を処理する MoneyCount がアクティブ化され、10秒毎に現在のカウント値が出力されるようになります。
出力2
MoneyCount.activation : 5 key: 5, count: 1
この後、http://localhost:8080/5
へアクセスするたびにカウントアップされます。
次に、http://localhost:8080/10000
へアクセスするとキー値 10000 を処理する新しい MoneyCount がアクティブ化されます。
出力3
MoneyCount.activation : 10000 key: 10000, count: 1 key: 5, count: 4
このように、KeySource を設定しなかった場合は新しくキーが発生する度に MessageProcessor の新しいインスタンスがアクティブ化されます。
KeySource 有りの場合
次に、KeySource を設定してみます。 KeySource を設定するには下記のようにします。
KeySource
インターフェースの実装オブジェクトを MessageProcessor を設定したClusterDefinition
へsetKeySource()
で設定
なお、KeySource インターフェースは getAllPossibleKeys()
で全てのキーを返すように実装します。
変更点は下記のようになります。
money_count_vertx_ks.groovy
・・・ def mp = new ClusterDefinition('mp', new MoneyCount()).setKeySource({ Constants.MONEYS } as KeySource<String>) ・・・
実行
それでは実行します。
出力1
> groovy money_count_vertx_ks.groovy ・・・ MoneyAdaptor.start ... MoneyCount.activation : 1 MoneyCount.activation : 5 MoneyCount.activation : 10 MoneyCount.activation : 50 MoneyCount.activation : 100 MoneyCount.activation : 500 MoneyCount.activation : 1000 MoneyCount.activation : 2000 MoneyCount.activation : 5000 MoneyCount.activation : 10000
KeySource を設定しなかった場合とは異なり、起動時に全キーの MoneyCount がアクティブ化されます。
http://localhost:8080/5
等へアクセスすると該当キーがカウントアップされます。
出力2
key: 1, count: 0 key: 100, count: 0 key: 10000, count: 1 key: 500, count: 0 key: 1000, count: 0 key: 2000, count: 0 key: 5, count: 4 key: 5000, count: 0 key: 10, count: 0 key: 50, count: 0
まとめ
簡単にまとめると下記のようになります。
KeySource の設定 | MessageProcessor のアクティブ化タイミング |
---|---|
無 | 新しくキーが発生する度 |
有 | 起動時 |
Groovy で Dempsy を単独実行1
以前、Groovy で Storm を使う にて Storm を Groovy から使ってみましたが、今回は同様のフレームワークである Dempsy を Groovy で単独実行してみました。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131130/
はじめに
Dempsy アプリケーションは基本的に下記のように実装します。
- (1) メッセージクラスの作成
- (2) メッセージ処理クラスの作成
- (3) アダプタークラスの作成
- (4) アプリケーション定義の構築
- (5) Dempsy の構築と実行
Dempsy の基本動作は以下のようになっているようです。
- メッセージクラスのキー値毎にメッセージ処理クラスがインスタンス化される(
clone()
でクローンが作られる) - アダプタークラスで
Dispatcher
にメッセージをdispatch()
すると該当キーの担当メッセージ処理インスタンスへメッセージが渡される
Adaptor -> Dispatcher -> MessageProcessor
また、KeySource の指定有無によってもメッセージ処理クラスのインスタンス化されるタイミングが異なります。
KeySource の指定 | メッセージ処理クラスのインスタンス化タイミング |
---|---|
無 | 新しくキーが発生する度にその都度インスタンスが作られる |
有 | 起動時に処理するキー数に応じたインスタンスが作られる |
Dempsy アプリケーションの実装
前回の Spark でも実装したお金の数え上げ処理を今回も実装してみることにします。
ここで、Dempsy はリアルタイムにメッセージを処理するためのフレームワークですので、固定的なファイルの内容を処理するのは本来の用途とは異なる点にご注意ください。
(1) メッセージクラスの作成
まずはメッセージクラスを作成します。 メッセージクラスは以下のように作成します。
- キー値を返す getter メソッドに
@MessageKey
を付ける
なお、下記では @TupleConstructor
を使っていますが、これはコンストラクタの定義を省略するためのもので Dempsy とは特に関係ありません。
@TupleConstructor class Money { String moneyText @MessageKey String getMoneyText() { moneyText } }
(2) メッセージ処理クラスの作成
メッセージ処理クラスは以下のように作成します。
@MessageProcessor
を付与Cloneable
インターフェースを実装- メッセージを処理するメソッドに
@MessageHandler
を付与
これで必要最小限のメッセージ処理クラスを実装できますが、下記のような追加処理も実装できます。
- アクティブ化の処理を行うメソッドに
@Activation
を付与 (引数にキーの値が渡される) - パッシブ化の処理を行うメソッドに
@Passivation
を付与 - 出力処理を行うメソッドに
@Output
を付与
下記サンプルでは Cloneable インターフェースを実装する代わりに Groovy の @AutoClone
を使用しています。
また、キーとカウント数を出力するために @Activation と @Output を使っています。
@MessageProcessor @AutoClone class MoneyCount { private long count = 0 private String key // アクティブ化の処理 @Activation void setMoneyKey(String key) { println 'MoneyCount.activation' this.key = key } // メッセージの処理 @MessageHandler void countMoney(Money money) { // カウントアップ count++ } // 結果出力 @Output void printResults() { println "key: ${key}, count: ${count}" } }
(3) アダプタークラスの作成
アダプタークラスは以下のように作成します。
Adaptor
インターフェースを実装setDispatcher()
メソッドで設定されたDispatcher
オブジェクトへメッセージをdispatch()
する
下記では指定されたファイルの行毎にメッセージを作って Dispatcher オブジェクトへ dispatch しています。
class MoneyAdaptor implements Adaptor { Dispatcher dispatcher private File inputFile MoneyAdaptor(File inputFile) { this.inputFile = inputFile } void start() { println 'MoneyAdaptor.start ...' inputFile.eachLine { line -> dispatcher.dispatch(new Money(line)) } } void stop() { println 'MoneyAdaptor.stop' } }
(4) アプリケーション定義の構築
アプリケーション定義 ApplicationDefinition
は以下のようにして構築します。
- メッセージ処理クラスやアダプタークラスをインスタンス化して
ClusterDefinition
へ設定 ClusterDefinition
をApplicationDefinition
へ追加@Output
の出力処理を行うためにClusterDefinition
へOutputExecuter
を設定
なお、Dempsy には終了時に 1度だけ結果を出力するような OutputExecuter が用意されていなかったので、下記では自前で実装し設定しています。
def mp = new ClusterDefinition('mp', new MoneyCount()) // 終了時に結果を出力する OutputExecuter を設定 mp.outputExecuter = [ setOutputInvoker: { invoker -> println 'OutputExecuter.setOutputInvoker' this.invoker = invoker }, start: { -> println 'OutputExecuter.start' }, stop: { -> println 'OutputExecuter.stop' // 処理結果を出力 invoker.invokeOutput() } ] as OutputExecuter def app = new ApplicationDefinition('money-count').add( new ClusterDefinition('adaptor', new MoneyAdaptor(new File(args[0]))), mp )
(5) Dempsy の構築と実行
最後に Dempsy をインスタンス化し、(4) のアプリケーション定義と下記のような構成を設定すれば start()
メソッドで単独実行できます。 (分散実行の場合は設定するクラス構成が多少変わります)
プロパティ名 | クラス |
---|---|
clusterSessionFactory | LocalClusterSessionFactory |
clusterCheck | AlwaysInCurrentCluster |
defaultRoutingStrategy | DecentralizedRoutingStrategy |
defaultSerializer | KryoSerializer |
defaultStatsCollectorFactory | StatsCollectorFactoryCoda |
defaultTransport | BlockingQueueTransport |
なお、今回のサンプルでは処理を開始すると自動的には終了しません。
終了時に結果を出力させたいので、ShutdownHook を使って Dempsy の stop()
メソッドを呼び出すようにしています。
def dempsy = new Dempsy() dempsy.applicationDefinitions = [app] dempsy.clusterSessionFactory = new LocalClusterSessionFactory() dempsy.clusterCheck = new AlwaysInCurrentCluster() dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(1, 1) dempsy.defaultSerializer = new KryoSerializer() dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda() dempsy.defaultTransport = new BlockingQueueTransport() // 開始 dempsy.start() Runtime.runtime.addShutdownHook { -> println 'shutdown ...' // 終了 dempsy.stop() }
また、lib-dempsyspring モジュールを使えば Dempsy-localVm.xml ファイルにて Dempsy 単独実行用の構成が定義されているので、自前で clusterSessionFactory 等の設定を行う必要はありません。
実行
それでは実行してみます。
> groovy money_count.groovy input_sample.txt ・・・ OutputExecuter.setOutputInvoker OutputExecuter.start MoneyAdaptor.start ... MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation MoneyCount.activation
実行すると上記のログが出力された状態になります。
ここで、input_sample.txt では 9種類のキーを用いており、キー数に応じた MoneyCount.activation が出力されている事が分かります。
とりあえず、自動的には終了しませんので Ctrl + c
で終了します。
そうすると、Adaptor と OutputExecuter が終了し、個々の MessageProcessor の結果が出力されます。
shutdown ... MoneyAdaptor.stop OutputExecuter.stop key: 1, count: 2 key: 100, count: 2 key: 10000, count: 2 key: 500, count: 1 key: 1000, count: 3 key: 2000, count: 1 key: 5, count: 3 key: 10, count: 2 key: 50, count: 1
money_count.groovy
ソース全体は下記のようになっています。
@Grab('net.dempsy:lib-dempsyimpl:0.7.9') @Grab('org.slf4j:slf4j-log4j12:1.7.5') import com.nokia.dempsy.annotations.Activation import com.nokia.dempsy.annotations.MessageKey import com.nokia.dempsy.annotations.MessageProcessor import com.nokia.dempsy.annotations.MessageHandler import com.nokia.dempsy.annotations.Output import com.nokia.dempsy.Adaptor import com.nokia.dempsy.Dempsy import com.nokia.dempsy.Dispatcher import com.nokia.dempsy.config.ApplicationDefinition import com.nokia.dempsy.config.ClusterDefinition import com.nokia.dempsy.output.OutputExecuter import com.nokia.dempsy.cluster.invm.LocalClusterSessionFactory import com.nokia.dempsy.router.AlwaysInCurrentCluster import com.nokia.dempsy.router.DecentralizedRoutingStrategy import com.nokia.dempsy.serialization.kryo.KryoSerializer import com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda import com.nokia.dempsy.messagetransport.blockingqueue.BlockingQueueTransport import groovy.transform.* // (1) メッセージクラスの作成 @TupleConstructor class Money { String moneyText @MessageKey String getMoneyText() { moneyText } } // (2) メッセージ処理クラスの作成 @MessageProcessor @AutoClone class MoneyCount { private long count = 0 private String key @Activation void setMoneyKey(String key) { println 'MoneyCount.activation' this.key = key } @MessageHandler void countMoney(Money money) { count++ } @Output void printResults() { println "key: ${key}, count: ${count}" } } // (3) アダプタークラスの作成 class MoneyAdaptor implements Adaptor { Dispatcher dispatcher private File inputFile MoneyAdaptor(File inputFile) { this.inputFile = inputFile } void start() { println 'MoneyAdaptor.start ...' inputFile.eachLine { line -> dispatcher.dispatch(new Money(line)) } } void stop() { println 'MoneyAdaptor.stop' } } // (4) アプリケーション定義オブジェクトの構築 def mp = new ClusterDefinition('mp', new MoneyCount()) // 終了時に結果を出力する OutputExecuter を設定 mp.outputExecuter = [ setOutputInvoker: { invoker -> println 'OutputExecuter.setOutputInvoker' this.invoker = invoker }, start: { -> println 'OutputExecuter.start' }, stop: { -> println 'OutputExecuter.stop' invoker.invokeOutput() } ] as OutputExecuter def app = new ApplicationDefinition('money-count').add( new ClusterDefinition('adaptor', new MoneyAdaptor(new File(args[0]))), mp ) // (5) Dempsy オブジェクトの構築と実行 def dempsy = new Dempsy() dempsy.applicationDefinitions = [app] dempsy.clusterSessionFactory = new LocalClusterSessionFactory() dempsy.clusterCheck = new AlwaysInCurrentCluster() dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(1, 1) dempsy.defaultSerializer = new KryoSerializer() dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda() dempsy.defaultTransport = new BlockingQueueTransport() dempsy.start() Runtime.runtime.addShutdownHook { -> println 'shutdown ...' dempsy.stop() }
Scala 2.10 で Apache Spark を使用
Apache Spark をソースからビルドして Scala 2.10 上で実行してみました。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131121/
Spark のソースをビルド
まずは、Spark ソースの scala-2.10
ブランチを git clone
します。
ソースの取得
$ git clone -b scala-2.10 git://github.com/apache/incubator-spark.git
次に、sbt ディレクトリにある sbt
コマンドで assembly
を実行します。
なお、現時点のデフォルトでは Hadoop 1.0.4 用のモジュールが生成されるようですが、SPARK_HADOOP_VERSION
と SPARK_YARN
環境変数で Hadoop のバージョンを変更できるようです。
ビルド
$ cd incubator-spark $ sbt/sbt assembly
ビルドが正常に完了すると、必要なモジュールを一通り組み込んだ assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar
が生成されます。 (examples の方にも JAR が作成されます)
Scala 2.10 で実行
それでは、前回とほぼ同じ内容の下記ソースを実行してみます。
src/main/scala/money_count.scala
package fits.sample import org.apache.spark._ import SparkContext._ object MoneyCount { def main(args: Array[String]) { val spark = new SparkContext("local", "MoneyCount") val file = spark.textFile(args(0)) val res = file.map { (_, 1) }.reduceByKey(_ + _) res.foreach { t => println(s"${t._1} = ${t._2}") } } }
実行は今回も Gradle で行います。
Spark のソースをビルドしてできた spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar
を lib ディレクトリへ配置しました。
build.gradle
apply plugin: 'scala' apply plugin: 'application' def scalaVersion = '2.10.3' repositories { mavenCentral() } dependencies { compile "org.scala-lang:scala-library:${scalaVersion}" compile files('lib/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar') } mainClassName = "fits.sample.MoneyCount" run { if (project.hasProperty('args')) { args project.args } }
ディレクトリ構成は以下のようにしました。
ディレクトリ構成
- lib |_ spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar - src |_ main |_ scala | |_ money_count.scala | |_ resources |_ log4j.properties - build.gradle - input_sample.txt
なお、ログ出力には slf4j-log4j12 が使われるので、とりあえず log4j の設定ファイルを用意しておきました。
src/main/resources/log4j.properties
log4j.rootLogger=WARN, CONSOLE log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%5p %m%n
gradle run
で実行してみると下記のような結果となり、Scala 2.10 上での動作が確認できました。
実行結果
> gradle run -Pargs=input_sample.txt :compileJava UP-TO-DATE :compileScala UP-TO-DATE :processResources :classes :run WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN Snappy native library not loaded 1 = 2 10 = 2 10000 = 2 5 = 3 100 = 2 1000 = 3 50 = 1 500 = 1 2000 = 1 BUILD SUCCESSFUL
Java で Apache Spark を使用
以前、sbt を使って Scala で Hadoop MapReduce 実装 や Groovy で Storm を使う で実施したお金の数え上げ処理を Spark 0.8 を使って Java で実装してみました。
Spark は以前、Spark を使って単純なレコメンドを実施 で 0.4 を試しましたが、0.8 でも API に大きな変化はないようです。(パッケージ名は org.apache.spark へ変わってますが)
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131116/
はじめに
実装する処理内容は、下記のようなファイルを読み込んで数値毎にカウントするだけの単純なものです。
input_sample.txt
100 1 5 50 500 1000 10000 1000 1 10 5 5 10 100 1000 10000 2000
Java で実装
- Java SE 7.0
実装した処理内容は下記の通りです。
今回は単一のワーカースレッドでローカル実行させるだけなので JavaSparkContext の第1引数に local と指定しています。
複数のワーカースレッドでローカル実行するには local[スレッド数]
のように指定します。(例えば 4スレッドの場合は local[4]
)
src/main/java/fits/sample/MoneyCount.java
package fits.sample; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; class MoneyCount { public static void main(String... args) { JavaSparkContext spark = new JavaSparkContext("local", "MoneyCount"); JavaRDD<String> file = spark.textFile(args[0]); JavaPairRDD<String, Integer> res = file.map(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer t1, Integer t2) { return t1 + t2; } }); res.foreach(new VoidFunction<Tuple2<String, Integer>>() { public void call(Tuple2<String, Integer> t) { System.out.println(t._1 + " = " + t._2); } }); } }
gradle でビルド・実行してみます。
Spark 0.8 は古めの Akka ライブラリに依存しているため http://repo.typesafe.com/typesafe/releases/ リポジトリを追加する必要があるようです。
build.gradle
apply plugin: 'java' apply plugin: 'application' repositories { mavenCentral() maven { url 'http://repo.typesafe.com/typesafe/releases/' } } dependencies { compile 'org.apache.spark:spark-core_2.9.3:0.8.0-incubating' compile 'org.slf4j:slf4j-nop:1.7.5' } mainClassName = "fits.sample.MoneyCount" run { if (project.hasProperty('args')) { args project.args } }
実行結果は下記のようになります。
実行結果
> gradle run -Pargs=input_sample.txt :compileJava :processResources UP-TO-DATE :classes :run 13/11/16 02:12:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 13/11/16 02:12:37 WARN snappy.LoadSnappy: Snappy native library not loaded 13/11/16 02:12:37 INFO mapred.FileInputFormat: Total input paths to process : 1 1 = 2 10 = 2 10000 = 2 5 = 3 100 = 2 1000 = 3 50 = 1 500 = 1 2000 = 1 BUILD SUCCESSFUL
なお、Java 8 や Groovy で同様の処理を試そうとしてみましたが、実行時に ASM 関係でエラーが発生し、正常に実行させる事はできませんでした。
Scala で実装
同じ処理を Scala で実装すると下記のようにもっとシンプルになります。
src/main/scala/money_count.scala
package fits.sample import org.apache.spark._ import SparkContext._ object MoneyCount { def main(args: Array[String]) { val spark = new SparkContext("local", "MoneyCount") val file = spark.textFile(args(0)) val res = file.map { (_, 1) }.reduceByKey(_ + _) res.foreach { t => println(t._1 + " = " + t._2) } } }
Maven セントラルリポジトリには、Scala 2.10 用の Spark モジュールが見当たらないため、基本的に Scala 2.9 で実行する事になります。
ちなみに、Spark(バージョンは 0.9)をソースからビルドすれば Scala 2.10 で実行できました。(git://github.com/apache/incubator-spark.git の scala-2.10 ブランチ使用)
build.gradle
apply plugin: 'scala' apply plugin: 'application' def scalaVersion = '2.9.3' repositories { mavenCentral() maven { url 'http://repo.typesafe.com/typesafe/releases/' } } dependencies { compile "org.scala-lang:scala-library:${scalaVersion}" compile "org.apache.spark:spark-core_${scalaVersion}:0.8.0-incubating" compile 'org.slf4j:slf4j-nop:1.7.5' } mainClassName = "fits.sample.MoneyCount" run { if (project.hasProperty('args')) { args project.args } }
実行結果
> gradle run -Pargs=input_sample.txt :compileJava UP-TO-DATE :compileScala UP-TO-DATE :processResources UP-TO-DATE :classes UP-TO-DATE :run 13/11/16 01:38:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 13/11/16 01:38:39 WARN snappy.LoadSnappy: Snappy native library not loaded 13/11/16 01:38:39 INFO mapred.FileInputFormat: Total input paths to process : 1 1 = 2 10 = 2 10000 = 2 5 = 3 100 = 2 1000 = 3 50 = 1 500 = 1 2000 = 1 BUILD SUCCESSFUL
Groovy で ZooKeeper を組み込み実行
以前、Groovy で Apache ZooKeeper を使う にて ZooKeeper のクライアントを Groovy スクリプトで実装しましたが、今回は ZooKeeper のサーバーを Groovy で組み込み実行してみました。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131109/
ZooKeeper 組み込み実行スクリプト
ZooKeeper を組み込み実行するための単純なスクリプトは下記のようになります。 第一引数に ZooKeeper 設定ファイルのパスを指定して実行します。
zk_run.groovy
@Grapes([ @Grab("org.apache.zookeeper:zookeeper:3.4.5"), @GrabExclude("com.sun.jmx#jmxri"), @GrabExclude("com.sun.jdmk#jmxtools"), @GrabExclude("javax.jms#jms") ]) import org.apache.zookeeper.server.quorum.QuorumPeerMain new QuorumPeerMain().initializeAndRun(args) // 単独実行しかしないのであれば下記でも可 // org.apache.zookeeper.server.ZooKeeperServerMain.main(args)
複製モードで実行しないのであれば、QuorumPeerMain の代わりに ZooKeeperServerMain を直接使う方法もあります。
また、ログ出力を行うには ZooKeeper の conf ディレクトリにある log4j.properties
ファイルを上記スクリプトと同じディレクトリへ配置しておきます。
単独モード実行
それでは単独モードで実行してみます。
まずは、必要最小限の設定を記載した ZooKeeper の設定ファイルを用意しておきます。
u0/zoo.cfg (設定ファイル)
dataDir=u0/tmp/zookeeper clientPort=2181
設定ファイルへのパスを引数にして zk_run.groovy を実行すれば ZooKeeper が起動します。
実行
> groovy zk_run.groovy u0/zoo.cfg 2013-11-09 10:57:13,730 [myid:] - INFO [main:QuorumPeerConfig@101] - Reading configuration from: u0/zoo.cfg 2013-11-09 10:57:13,746 [myid:] - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3 2013-11-09 10:57:13,746 [myid:] - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 0 2013-11-09 10:57:13,746 [myid:] - INFO [main:DatadirCleanupManager@101] - Purge task is not scheduled. 2013-11-09 10:57:13,746 [myid:] - WARN [main:QuorumPeerMain@113] - Either no config or no quorum defined in config, running in standalone mode ・・・
zkCli
コマンドを使って接続できます。
> zkCli Connecting to localhost:2181 ・・・
複製モード実行
次に複製モードの実行を試してみます。
複製モード用の設定ファイル
まずは設定ファイルを用意します。 単独実行と比べて initLimit、syncLimit、server 設定の追加が必要です。
server 設定は下記のようなフォーマットとなっており、ポート番号を 2つ指定する必要があります。 (サーバー同士の接続ポートとリーダー選出に使用するポートのようです)
server.<サーバー番号>=<サーバー名>:<ポート番号1>:<ポート番号2>
今回は同一ホスト上で実行するのでポート番号をそれぞれ分けています。
なお、ZooKeeper の複製モードは最低 3サーバー構成で実行するようです。
2サーバー構成で実行すると WARN ログ No server failure will be tolerated. You need at least 3 servers.
が出力されます。
u1/zoo.cfg (設定ファイル1)
dataDir=u1/tmp/zookeeper clientPort=2181 initLimit=10 syncLimit=5 server.1=localhost:2888:3888 server.2=localhost:2889:3889 server.3=localhost:2890:3890
u2/zoo.cfg (設定ファイル2)
dataDir=u2/tmp/zookeeper clientPort=2182 initLimit=10 syncLimit=5 server.1=localhost:2888:3888 server.2=localhost:2889:3889 server.3=localhost:2890:3890
u3/zoo.cfg (設定ファイル3)
dataDir=u3/tmp/zookeeper clientPort=2183 initLimit=10 syncLimit=5 server.1=localhost:2888:3888 server.2=localhost:2889:3889 server.3=localhost:2890:3890
myid ファイル
更に、複製モードではサーバー番号を設定した myid
ファイルを予め配置しておく必要があります。
myid
ファイルは設定ファイルの dataDir
で指定したディレクトリへ配置します。
u1/tmp/zookeeper/myid
1
u2/tmp/zookeeper/myid
2
u3/tmp/zookeeper/myid
3
実行
サーバーを順次実行します。
他のサーバーを起動していない状態では ConnectException の WARN ログが出力されますが、全サーバーを起動すると出なくなります。
サーバー1 実行
> groovy zk_run.groovy u1/zoo.cfg 2013-11-09 20:53:09,157 [myid:] - INFO [main:QuorumPeerConfig@101] - Reading configuration from: u1/zoo.cfg ・・・ 2013-11-09 20:53:10,390 [myid:1] - WARN [WorkerSender[myid=1]:QuorumCnxManager@368] - Cannot open channel to 2 at election address localhost/127.0.0.1:3889 java.net.ConnectException: Connection refused: connect at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ・・・ ・・・ 2013-11-09 21:02:04,636 [myid:1] - INFO [WorkerReceiver[myid=1]:FastLeaderElection@542] - Notification: 2 (n.leader), 0x0 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x3 (n.peerEPoch), FOLLOWING (my state)
サーバー2 実行
> groovy zk_run.groovy u2/zoo.cfg ・・・ 2013-11-09 21:02:04,948 [myid:2] - INFO [LearnerHandler-/127.0.0.1:49289:LearnerHandler@419] - Sending snapshot last zxid of peer is 0x0 zxid of leader is 0x400000000sent zxid of db as 0x400000000
サーバー3 実行
> groovy zk_run.groovy u3/zoo.cfg ・・・ 2013-11-09 21:02:04,948 [myid:3] - INFO [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2183:FileTxnSnapLog@240] - Snapshotting: 0x400000000 to u3/tmp/zookeeper/version-2/snapshot.400000000
Gradle でアプリケーションを zip 化する
Gradle を使って Vert.x 組み込み実行による単純な Web アプリケーションを zip 化してみました。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131020/
zip 化のタスク定義
とりあえず、Gradle のビルド定義 build.gradle へ下記のように設定すれば zip 化を実現できます。
- (1) type: Zip、dependsOn: jar を指定したタスクを定義
- (2) artifacts.archives へ (1) のタスクを設定
(1) で dependsOn: jar
のように依存関係を設定しておかないと、ソースのコンパイル (compileJava タスクなど) 前に zip 化のタスクが実行されてしまうので注意が必要です。
なお、(2) の設定によって assemble や build タスク等の実行時に zip 化のタスクを実行するようになります。 (dependsOn: jar
によって jar タスクの後に実施されます)
サンプル
それでは、下記のような構成のサンプルプロジェクトを使って zip 化を試してみます。
サンプルプロジェクトの構成
bin |_ sampleapp.bat |_ ・・・ conf |_ dev |_ app.properties |_ prod |_ app.properties src |_ main |_ ・・・ web |_ ・・・ build.gradle
作成する zip ファイルは下記のような構成にします。
- (a) bin 内のファイルを zip のルートへ配置
- (b) conf/xxx 内のファイルを zip の conf へ配置 (xxx は env プロパティの値)
- (c) アプリケーションの JAR と依存ライブラリを zip の lib へ配置
- (d) web ディレクトリを zip の web へ配置
zip ファイルの構成
conf |_ app.properties lib |_ gradle_zip_sample.jar (アプリケーションの JAR) |_ groovy-all-2.1.8.jar |_ ・・・ web |_ ・・・ sampleapp.bat ・・・
Gradle のビルド定義ファイル build.gradle は以下のようになります。
ビルド定義 build.gradle
apply plugin: 'groovy' // プロジェクトプロパティを使った環境指定 def env = hasProperty('env')? env: 'dev' repositories { mavenCentral() } dependencies { compile 'org.codehaus.groovy:groovy-all:2.1.8' compile 'io.vertx:vertx-core:2.0.2-final' compile 'io.vertx:lang-groovy:2.0.0-final' } defaultTasks 'clean', 'build' // (1) zip 化タスクの定義 task zipApp(type: Zip, dependsOn: jar) { // zip ファイル名の変更 archiveName = "${baseName}_${env}.${extension}" // (a) bin 内のファイルを zip のルートへ配置 from 'bin' // (b) conf/xxx 内のディレクトリ・ファイルを zip の conf へ配置(xxx は env の値) into('conf') { from "conf/$env" } // (c) into('lib') { // アプリケーションの JAR from jar.archivePath // 各種依存ライブラリ from configurations.runtime } // (d) into('web') { from 'web' } } // (2) artifacts { archives zipApp }
Zip のような CopySpec インターフェースの実装クラスでは、from()
でコピー元のディレクトリやファイルを指定し、into()
でコピー先 (zip ファイル内) のパスを指定します。
なお、一部のディレクトリやファイルのみ特定のパスへコピーするには、下記のように第2引数へクロージャを渡す into(destPath, configureClosure)
メソッドを使います。
into('<コピー先>') { from '<コピー元>' ・・・ }
ここで、from に runtimeClasspath を指定すると、依存ライブラリと共に JAR ファイル化されていないアプリケーションのクラス・リソースファイルがそのまま含まれてしまう点に注意が必要です。
runtimeClasspath
アプリケーションのクラス・リソースファイル + 依存ライブラリ
runtimeClasspath の内容
・・・/build/classes/main ・・・/build/resources/main ・・・/.gradle/caches/・・・/groovy-all-2.1.8.jar ・・・
依存ライブラリとアプリケーションの JAR ファイル (今回のサンプルでは gradle_zip_sample.jar) を zip へ格納するには runtimeClasspath の代わりに以下を使います。
jar.archivePath
アプリケーションの JAR ファイルconfigurations.runtime
依存ライブラリ
configurations.runtime の内容
・・・/.gradle/caches/・・・/groovy-all-2.1.8.jar ・・・
zip 化の実行
gradle コマンドで build タスクを実行すれば build/distributions ディレクトリへ zip ファイルが生成されます。
> gradle build
なお、今回のサンプルでは gradle build
もしくは gradle build -Penv=dev
で conf/dev/app.properties を conf/app.properties へ配置した gradle_zip_sample_dev.zip ファイルが、gradle build -Penv=prod
で conf/prod/app.properties を使った gradle_zip_sample_prod.zip ファイルが作られます。
CommonsとSpringのBeanUtils.copyProperties
Commons BeanUtils に Bean のプロパティをコピーする BeanUtils.copyProperties() というなかなか便利なメソッドがありますが、同名のメソッドが Spring にもあります。 (当然ながらパッケージ名は異なります)
ちなみに Seasar2 にも BeanUtil.copyProperties() というものがあります。
ここで、Commons のものと Spring のもので引数の順序が逆になっていたりと紛らわしい違いがあるので簡単にまとめてみました。
分類 | 第一引数 | 第二引数 | 例外 | MapからBeanへのコピー |
---|---|---|---|---|
Commons | コピー先 | コピー元 | 検査例外あり | ○ |
Spring | コピー元 | コピー先 | 実行時例外のみ | × |
パッケージ名や throws 定義されている例外は下記の通りです。
- Commons 版
- パッケージ: org.apache.commons.beanutils
- 例外: IllegalAccessException(検査例外), InvocationTargetException(検査例外)
- Spring 版
- パッケージ: org.springframework.beans
- 例外: BeansException(実行時例外)
また、Commons 版では Map から Bean へのプロパティコピーを実施できますが、Spring 版ではできない模様です。
そして、Commons 版では BigDecimal へ null 値をコピーするとデフォルトで下記のような例外が発生します。
BigDecimal へ null 値をコピーした場合の例外 (Commons 版)
org.apache.commons.beanutils.ConversionException: No value specified for 'BigDecimal'
これを防止するには ConvertUtils.register() を使って BigDecimalConverter を登録しておきます。
BigDecimal へ null 値をコピーするための対策 (Commons 版)
ConvertUtils.register(new BigDecimalConverter(null), BigDecimal.class)
動作確認
下記のようなサンプルスクリプトを使って挙動の違いを確認してみました。
copyproperties_sample.groovy
@Grab('commons-beanutils:commons-beanutils:1.8.3') @Grab('org.springframework:spring-beans:3.2.4.RELEASE') import org.apache.commons.beanutils.ConvertUtils import org.apache.commons.beanutils.converters.BigDecimalConverter import groovy.transform.* @TupleConstructor @ToString(includeNames=true) class Data { String name BigDecimal value } // Commons 版 BeanUtils.copyProperties def copyPropsCommons = { data -> println '----- Commons -----' try { def trg = new Data() org.apache.commons.beanutils.BeanUtils.copyProperties(trg, data) println trg } catch (e) { println e } } // Commons 版 BeanUtils.copyProperties with converter def copyPropsCommons2 = { data -> println '----- Commons with converter -----' def trg = new Data() // null を BigDecimal のプロパティへコピーするための設定 ConvertUtils.register(new BigDecimalConverter(null), BigDecimal) org.apache.commons.beanutils.BeanUtils.copyProperties(trg, data) // BigDecimalConverter の解除 ConvertUtils.deregister() println trg } // Spring 版 BeanUtils.copyProperties def copyPropsSpring = { data -> println '----- Spring -----' def trg = new Data() org.springframework.beans.BeanUtils.copyProperties(data, trg) println trg } def procList = [copyPropsCommons, copyPropsCommons2, copyPropsSpring] def d1 = new Data('sample1', 100) procList.each { it(d1) } println '' def d2 = new Data('sample2') procList.each { it(d2) } println '' def d3 = [ name: 'sample3', value: 10 ] // Commons 版は Map からのコピーが可能 copyPropsCommons(d3) // Spring 版は Map からのコピーが不可 copyPropsSpring(d3)
実行結果は下記のようになりました。
実行結果
> groovy copyproperties_sample.groovy ----- Commons ----- Data(name:sample1, value:100) ----- Commons with converter ----- Data(name:sample1, value:100) ----- Spring ----- Data(name:sample1, value:100) ----- Commons ----- org.apache.commons.beanutils.ConversionException: No value specified for 'BigDecimal' ----- Commons with converter ----- Data(name:sample2, value:null) ----- Spring ----- Data(name:sample2, value:null) ----- Commons ----- Data(name:sample3, value:10) ----- Spring ----- Data(name:null, value:null)
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131013/