Groovy で Dempsy を単独実行2 - KeySource

前回に続き、Dempsy を Groovy で単独実行してみます。 今回は KeySource の設定有無でどのように挙動が変わるかを簡単に調べます。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131201/

KeySource 無しの場合

まず、前回サンプルの Adaptor クラスを変更し、http://localhost:8080/<金額> へのアクセスがあった際に Dispatcher へメッセージを dispatch() するように変更しました。 (HTTP サーバー処理は Vert.x を組み込み実行)

また、outputExecuter に RelativeOutputSchedule を設定し 10秒毎にカウント値を出力するように変更しています。

money_count_vertx.groovy
・・・
class Constants {
    final static def MONEYS = [
        '1', '5', '10', '50', '100', '500', '1000', '2000', '5000', '10000'
    ]
}

class MoneyAdaptor implements Adaptor {
    Dispatcher dispatcher

    void start() {
        println 'MoneyAdaptor.start ...'

        def vertx = Vertx.newVertx()

        def rm = new RouteMatcher()
        rm.get '/:money', { req ->
            def money = req.params['money']

            if (Constants.MONEYS.contains(money)) {
                dispatcher.dispatch(new Money(req.params['money']))
            }
            req.response.end()
        }

        vertx.createHttpServer().requestHandler(rm.asClosure()).listen 8080
    }

    void stop() {
        println 'MoneyAdaptor.stop'
    }
}

def mp = new ClusterDefinition('mp', new MoneyCount())
mp.outputExecuter = new com.nokia.dempsy.output.RelativeOutputSchedule(10, java.util.concurrent.TimeUnit.SECONDS)

def app = new ApplicationDefinition('money-count').add(
    new ClusterDefinition('adaptor', new MoneyAdaptor()),
    mp
)
・・・

実行

それでは実行します。

出力1
> groovy money_count_vertx.groovy
・・・
MoneyAdaptor.start ...

MoneyAdaptor が start しただけで、MoneyCount は 1つもアクティブ化されていない状態です。

ここで http://localhost:8080/5 へアクセスすると、キー値 5 を処理する MoneyCount がアクティブ化され、10秒毎に現在のカウント値が出力されるようになります。

出力2
MoneyCount.activation : 5
key: 5, count: 1

この後、http://localhost:8080/5 へアクセスするたびにカウントアップされます。

次に、http://localhost:8080/10000 へアクセスするとキー値 10000 を処理する新しい MoneyCount がアクティブ化されます。

出力3
MoneyCount.activation : 10000
key: 10000, count: 1
key: 5, count: 4

このように、KeySource を設定しなかった場合は新しくキーが発生する度に MessageProcessor の新しいインスタンスがアクティブ化されます。

KeySource 有りの場合

次に、KeySource を設定してみます。 KeySource を設定するには下記のようにします。

  • KeySource インターフェースの実装オブジェクトを MessageProcessor を設定した ClusterDefinitionsetKeySource() で設定

なお、KeySource インターフェースは getAllPossibleKeys() で全てのキーを返すように実装します。

変更点は下記のようになります。

money_count_vertx_ks.groovy
・・・
def mp = new ClusterDefinition('mp', new MoneyCount()).setKeySource({
    Constants.MONEYS
} as KeySource<String>)
・・・

実行

それでは実行します。

出力1
> groovy money_count_vertx_ks.groovy
・・・
MoneyAdaptor.start ...
MoneyCount.activation : 1
MoneyCount.activation : 5
MoneyCount.activation : 10
MoneyCount.activation : 50
MoneyCount.activation : 100
MoneyCount.activation : 500
MoneyCount.activation : 1000
MoneyCount.activation : 2000
MoneyCount.activation : 5000
MoneyCount.activation : 10000

KeySource を設定しなかった場合とは異なり、起動時に全キーの MoneyCount がアクティブ化されます。

http://localhost:8080/5 等へアクセスすると該当キーがカウントアップされます。

出力2
key: 1, count: 0
key: 100, count: 0
key: 10000, count: 1
key: 500, count: 0
key: 1000, count: 0
key: 2000, count: 0
key: 5, count: 4
key: 5000, count: 0
key: 10, count: 0
key: 50, count: 0

まとめ

簡単にまとめると下記のようになります。

KeySource の設定 MessageProcessor のアクティブ化タイミング
新しくキーが発生する度
起動時

Groovy で Dempsy を単独実行1

以前、Groovy で Storm を使う にて Storm を Groovy から使ってみましたが、今回は同様のフレームワークである Dempsy を Groovy で単独実行してみました。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131130/

はじめに

Dempsy アプリケーションは基本的に下記のように実装します。

  • (1) メッセージクラスの作成
  • (2) メッセージ処理クラスの作成
  • (3) アダプタークラスの作成
  • (4) アプリケーション定義の構築
  • (5) Dempsy の構築と実行

Dempsy の基本動作は以下のようになっているようです。

  • メッセージクラスのキー値毎にメッセージ処理クラスがインスタンス化される(clone() でクローンが作られる)
  • アダプタークラスで Dispatcher にメッセージを dispatch() すると該当キーの担当メッセージ処理インスタンスへメッセージが渡される
Adaptor -> Dispatcher -> MessageProcessor

また、KeySource の指定有無によってもメッセージ処理クラスのインスタンス化されるタイミングが異なります。

KeySource の指定 メッセージ処理クラスのインスタンス化タイミング
新しくキーが発生する度にその都度インスタンスが作られる
起動時に処理するキー数に応じたインスタンスが作られる

Dempsy アプリケーションの実装

前回の Spark でも実装したお金の数え上げ処理を今回も実装してみることにします。

ここで、Dempsy はリアルタイムにメッセージを処理するためのフレームワークですので、固定的なファイルの内容を処理するのは本来の用途とは異なる点にご注意ください。

(1) メッセージクラスの作成

まずはメッセージクラスを作成します。 メッセージクラスは以下のように作成します。

  • キー値を返す getter メソッドに @MessageKey を付ける

なお、下記では @TupleConstructor を使っていますが、これはコンストラクタの定義を省略するためのもので Dempsy とは特に関係ありません。

@TupleConstructor
class Money {
    String moneyText

    @MessageKey
    String getMoneyText() {
        moneyText
    }
}

(2) メッセージ処理クラスの作成

メッセージ処理クラスは以下のように作成します。

  • @MessageProcessor を付与
  • Cloneable インターフェースを実装
  • メッセージを処理するメソッドに @MessageHandler を付与

これで必要最小限のメッセージ処理クラスを実装できますが、下記のような追加処理も実装できます。

  • アクティブ化の処理を行うメソッドに @Activation を付与 (引数にキーの値が渡される)
  • パッシブ化の処理を行うメソッドに @Passivation を付与
  • 出力処理を行うメソッドに @Output を付与

下記サンプルでは Cloneable インターフェースを実装する代わりに Groovy の @AutoClone を使用しています。

また、キーとカウント数を出力するために @Activation と @Output を使っています。

@MessageProcessor
@AutoClone
class MoneyCount {
    private long count = 0
    private String key

    // アクティブ化の処理
    @Activation
    void setMoneyKey(String key) {
        println 'MoneyCount.activation'
        this.key = key
    }

    // メッセージの処理
    @MessageHandler
    void countMoney(Money money) {
        // カウントアップ
        count++
    }

    // 結果出力
    @Output
    void printResults() {
        println "key: ${key}, count: ${count}"
    }
}

(3) アダプタークラスの作成

アダプタークラスは以下のように作成します。

  • Adaptor インターフェースを実装
  • setDispatcher() メソッドで設定された Dispatcher オブジェクトへメッセージを dispatch() する

下記では指定されたファイルの行毎にメッセージを作って Dispatcher オブジェクトへ dispatch しています。

class MoneyAdaptor implements Adaptor {
    Dispatcher dispatcher
    private File inputFile

    MoneyAdaptor(File inputFile) {
        this.inputFile = inputFile
    }

    void start() {
        println 'MoneyAdaptor.start ...'

        inputFile.eachLine { line ->
            dispatcher.dispatch(new Money(line))
        }
    }

    void stop() {
        println 'MoneyAdaptor.stop'
    }
}

(4) アプリケーション定義の構築

アプリケーション定義 ApplicationDefinition は以下のようにして構築します。

  • メッセージ処理クラスやアダプタークラスをインスタンス化して ClusterDefinition へ設定
  • ClusterDefinitionApplicationDefinition へ追加
  • @Output の出力処理を行うために ClusterDefinitionOutputExecuter を設定

なお、Dempsy には終了時に 1度だけ結果を出力するような OutputExecuter が用意されていなかったので、下記では自前で実装し設定しています。

def mp = new ClusterDefinition('mp', new MoneyCount())
// 終了時に結果を出力する OutputExecuter を設定
mp.outputExecuter = [
    setOutputInvoker: { invoker ->
        println 'OutputExecuter.setOutputInvoker'
        this.invoker = invoker
    },
    start: { ->
        println 'OutputExecuter.start'
    },
    stop: { ->
        println 'OutputExecuter.stop'
        // 処理結果を出力
        invoker.invokeOutput()
    }
] as OutputExecuter

def app = new ApplicationDefinition('money-count').add(
    new ClusterDefinition('adaptor', new MoneyAdaptor(new File(args[0]))),
    mp
)

(5) Dempsy の構築と実行

最後に Dempsy をインスタンス化し、(4) のアプリケーション定義と下記のような構成を設定すれば start() メソッドで単独実行できます。 (分散実行の場合は設定するクラス構成が多少変わります)

プロパティ名 クラス
clusterSessionFactory LocalClusterSessionFactory
clusterCheck AlwaysInCurrentCluster
defaultRoutingStrategy DecentralizedRoutingStrategy
defaultSerializer KryoSerializer
defaultStatsCollectorFactory StatsCollectorFactoryCoda
defaultTransport BlockingQueueTransport

なお、今回のサンプルでは処理を開始すると自動的には終了しません。

終了時に結果を出力させたいので、ShutdownHook を使って Dempsy の stop() メソッドを呼び出すようにしています。

def dempsy = new Dempsy()

dempsy.applicationDefinitions = [app]
dempsy.clusterSessionFactory = new LocalClusterSessionFactory()
dempsy.clusterCheck = new AlwaysInCurrentCluster()
dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(1, 1)
dempsy.defaultSerializer = new KryoSerializer()
dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda()
dempsy.defaultTransport = new BlockingQueueTransport()

// 開始
dempsy.start()

Runtime.runtime.addShutdownHook { ->
    println 'shutdown ...'
    // 終了
    dempsy.stop()
}

また、lib-dempsyspring モジュールを使えば Dempsy-localVm.xml ファイルにて Dempsy 単独実行用の構成が定義されているので、自前で clusterSessionFactory 等の設定を行う必要はありません。

実行

それでは実行してみます。

> groovy money_count.groovy input_sample.txt
・・・
OutputExecuter.setOutputInvoker
OutputExecuter.start
MoneyAdaptor.start ...
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation
MoneyCount.activation

実行すると上記のログが出力された状態になります。

ここで、input_sample.txt では 9種類のキーを用いており、キー数に応じた MoneyCount.activation が出力されている事が分かります。

とりあえず、自動的には終了しませんので Ctrl + c で終了します。 そうすると、Adaptor と OutputExecuter が終了し、個々の MessageProcessor の結果が出力されます。

shutdown ...
MoneyAdaptor.stop
OutputExecuter.stop
key: 1, count: 2
key: 100, count: 2
key: 10000, count: 2
key: 500, count: 1
key: 1000, count: 3
key: 2000, count: 1
key: 5, count: 3
key: 10, count: 2
key: 50, count: 1
money_count.groovy

ソース全体は下記のようになっています。

@Grab('net.dempsy:lib-dempsyimpl:0.7.9')
@Grab('org.slf4j:slf4j-log4j12:1.7.5')
import com.nokia.dempsy.annotations.Activation
import com.nokia.dempsy.annotations.MessageKey
import com.nokia.dempsy.annotations.MessageProcessor
import com.nokia.dempsy.annotations.MessageHandler
import com.nokia.dempsy.annotations.Output

import com.nokia.dempsy.Adaptor
import com.nokia.dempsy.Dempsy
import com.nokia.dempsy.Dispatcher

import com.nokia.dempsy.config.ApplicationDefinition
import com.nokia.dempsy.config.ClusterDefinition

import com.nokia.dempsy.output.OutputExecuter

import com.nokia.dempsy.cluster.invm.LocalClusterSessionFactory
import com.nokia.dempsy.router.AlwaysInCurrentCluster
import com.nokia.dempsy.router.DecentralizedRoutingStrategy
import com.nokia.dempsy.serialization.kryo.KryoSerializer
import com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda
import com.nokia.dempsy.messagetransport.blockingqueue.BlockingQueueTransport

import groovy.transform.*

// (1) メッセージクラスの作成
@TupleConstructor
class Money {
    String moneyText

    @MessageKey
    String getMoneyText() {
        moneyText
    }
}

// (2) メッセージ処理クラスの作成
@MessageProcessor
@AutoClone
class MoneyCount {
    private long count = 0
    private String key

    @Activation
    void setMoneyKey(String key) {
        println 'MoneyCount.activation'
        this.key = key
    }

    @MessageHandler
    void countMoney(Money money) {
        count++
    }

    @Output
    void printResults() {
        println "key: ${key}, count: ${count}"
    }
}

// (3) アダプタークラスの作成
class MoneyAdaptor implements Adaptor {
    Dispatcher dispatcher
    private File inputFile

    MoneyAdaptor(File inputFile) {
        this.inputFile = inputFile
    }

    void start() {
        println 'MoneyAdaptor.start ...'

        inputFile.eachLine { line ->
            dispatcher.dispatch(new Money(line))
        }
    }

    void stop() {
        println 'MoneyAdaptor.stop'
    }
}

// (4) アプリケーション定義オブジェクトの構築
def mp = new ClusterDefinition('mp', new MoneyCount())
// 終了時に結果を出力する OutputExecuter を設定
mp.outputExecuter = [
    setOutputInvoker: { invoker ->
        println 'OutputExecuter.setOutputInvoker'
        this.invoker = invoker
    },
    start: { ->
        println 'OutputExecuter.start'
    },
    stop: { ->
        println 'OutputExecuter.stop'
        invoker.invokeOutput()
    }
] as OutputExecuter

def app = new ApplicationDefinition('money-count').add(
    new ClusterDefinition('adaptor', new MoneyAdaptor(new File(args[0]))),
    mp
)

// (5) Dempsy オブジェクトの構築と実行
def dempsy = new Dempsy()

dempsy.applicationDefinitions = [app]
dempsy.clusterSessionFactory = new LocalClusterSessionFactory()
dempsy.clusterCheck = new AlwaysInCurrentCluster()
dempsy.defaultRoutingStrategy = new DecentralizedRoutingStrategy(1, 1)
dempsy.defaultSerializer = new KryoSerializer()
dempsy.defaultStatsCollectorFactory = new StatsCollectorFactoryCoda()
dempsy.defaultTransport = new BlockingQueueTransport()

dempsy.start()

Runtime.runtime.addShutdownHook { ->
    println 'shutdown ...'
    dempsy.stop()
}

Scala 2.10 で Apache Spark を使用

Apache Spark をソースからビルドして Scala 2.10 上で実行してみました。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131121/

Spark のソースをビルド

まずは、Spark ソースの scala-2.10 ブランチを git clone します。

ソースの取得
$ git clone -b scala-2.10 git://github.com/apache/incubator-spark.git

次に、sbt ディレクトリにある sbt コマンドで assembly を実行します。

なお、現時点のデフォルトでは Hadoop 1.0.4 用のモジュールが生成されるようですが、SPARK_HADOOP_VERSIONSPARK_YARN 環境変数Hadoop のバージョンを変更できるようです。

ビルド
$ cd incubator-spark
$ sbt/sbt assembly

ビルドが正常に完了すると、必要なモジュールを一通り組み込んだ assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar が生成されます。 (examples の方にも JAR が作成されます)

Scala 2.10 で実行

それでは、前回とほぼ同じ内容の下記ソースを実行してみます。

src/main/scala/money_count.scala
package fits.sample

import org.apache.spark._
import SparkContext._

object MoneyCount {
    def main(args: Array[String]) {

        val spark = new SparkContext("local", "MoneyCount")

        val file = spark.textFile(args(0))

        val res = file.map {
            (_, 1)
        }.reduceByKey(_ + _)

        res.foreach { t => println(s"${t._1} = ${t._2}") }
    }
}

実行は今回も Gradle で行います。

Spark のソースをビルドしてできた spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar を lib ディレクトリへ配置しました。

build.gradle
apply plugin: 'scala'
apply plugin: 'application'

def scalaVersion = '2.10.3'

repositories {
    mavenCentral()
}

dependencies {
    compile "org.scala-lang:scala-library:${scalaVersion}"
    compile files('lib/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar')
}

mainClassName = "fits.sample.MoneyCount"

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

ディレクトリ構成は以下のようにしました。

ディレクトリ構成
- lib
   |_ spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar

- src
   |_ main
       |_ scala
       |   |_ money_count.scala
       |
       |_ resources
           |_ log4j.properties

- build.gradle
- input_sample.txt

なお、ログ出力には slf4j-log4j12 が使われるので、とりあえず log4j の設定ファイルを用意しておきました。

src/main/resources/log4j.properties
log4j.rootLogger=WARN, CONSOLE

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%5p %m%n

gradle run で実行してみると下記のような結果となり、Scala 2.10 上での動作が確認できました。

実行結果

> gradle run -Pargs=input_sample.txt

:compileJava UP-TO-DATE
:compileScala UP-TO-DATE
:processResources
:classes
:run
 WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 WARN Snappy native library not loaded
1 = 2
10 = 2
10000 = 2
5 = 3
100 = 2
1000 = 3
50 = 1
500 = 1
2000 = 1

BUILD SUCCESSFUL

Java で Apache Spark を使用

以前、sbt を使って Scala で Hadoop MapReduce 実装Groovy で Storm を使う で実施したお金の数え上げ処理を Spark 0.8 を使って Java で実装してみました。

Spark は以前、Spark を使って単純なレコメンドを実施 で 0.4 を試しましたが、0.8 でも API に大きな変化はないようです。(パッケージ名は org.apache.spark へ変わってますが)

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131116/

はじめに

実装する処理内容は、下記のようなファイルを読み込んで数値毎にカウントするだけの単純なものです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000

Java で実装

実装した処理内容は下記の通りです。

今回は単一のワーカースレッドでローカル実行させるだけなので JavaSparkContext の第1引数に local と指定しています。

複数のワーカースレッドでローカル実行するには local[スレッド数] のように指定します。(例えば 4スレッドの場合は local[4]

src/main/java/fits/sample/MoneyCount.java
package fits.sample;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

class MoneyCount {
    public static void main(String... args) {
        JavaSparkContext spark = new JavaSparkContext("local", "MoneyCount");

        JavaRDD<String> file = spark.textFile(args[0]);

        JavaPairRDD<String, Integer> res = file.map(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer t1, Integer t2) {
                return t1 + t2;
            }
        });

        res.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> t) {
                System.out.println(t._1 + " = " + t._2);
            }
        });
    }
}

gradle でビルド・実行してみます。

Spark 0.8 は古めの Akka ライブラリに依存しているため http://repo.typesafe.com/typesafe/releases/ リポジトリを追加する必要があるようです。

build.gradle
apply plugin: 'java'
apply plugin: 'application'

repositories {
    mavenCentral()

    maven {
        url 'http://repo.typesafe.com/typesafe/releases/'
    }
}

dependencies {
    compile 'org.apache.spark:spark-core_2.9.3:0.8.0-incubating'
    compile 'org.slf4j:slf4j-nop:1.7.5'
}

mainClassName = "fits.sample.MoneyCount"

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は下記のようになります。

実行結果

> gradle run -Pargs=input_sample.txt

:compileJava
:processResources UP-TO-DATE
:classes
:run
13/11/16 02:12:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/16 02:12:37 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/16 02:12:37 INFO mapred.FileInputFormat: Total input paths to process : 1
1 = 2
10 = 2
10000 = 2
5 = 3
100 = 2
1000 = 3
50 = 1
500 = 1
2000 = 1

BUILD SUCCESSFUL

なお、Java 8 や Groovy で同様の処理を試そうとしてみましたが、実行時に ASM 関係でエラーが発生し、正常に実行させる事はできませんでした。

Scala で実装

同じ処理を Scala で実装すると下記のようにもっとシンプルになります。

src/main/scala/money_count.scala
package fits.sample

import org.apache.spark._
import SparkContext._

object MoneyCount {
    def main(args: Array[String]) {
        val spark = new SparkContext("local", "MoneyCount")

        val file = spark.textFile(args(0))

        val res = file.map {
            (_, 1)
        }.reduceByKey(_ + _)

        res.foreach { t => println(t._1 + " = " + t._2) }
    }
}

Maven セントラルリポジトリには、Scala 2.10 用の Spark モジュールが見当たらないため、基本的に Scala 2.9 で実行する事になります。

ちなみに、Spark(バージョンは 0.9)をソースからビルドすれば Scala 2.10 で実行できました。(git://github.com/apache/incubator-spark.git の scala-2.10 ブランチ使用)

build.gradle
apply plugin: 'scala'
apply plugin: 'application'

def scalaVersion = '2.9.3'

repositories {
    mavenCentral()

    maven {
        url 'http://repo.typesafe.com/typesafe/releases/'
    }
}

dependencies {
    compile "org.scala-lang:scala-library:${scalaVersion}"
    compile "org.apache.spark:spark-core_${scalaVersion}:0.8.0-incubating"
    compile 'org.slf4j:slf4j-nop:1.7.5'
}

mainClassName = "fits.sample.MoneyCount"

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果

> gradle run -Pargs=input_sample.txt

:compileJava UP-TO-DATE
:compileScala UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:run
13/11/16 01:38:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/16 01:38:39 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/16 01:38:39 INFO mapred.FileInputFormat: Total input paths to process : 1
1 = 2
10 = 2
10000 = 2
5 = 3
100 = 2
1000 = 3
50 = 1
500 = 1
2000 = 1

BUILD SUCCESSFUL

Groovy で ZooKeeper を組み込み実行

以前、Groovy で Apache ZooKeeper を使う にて ZooKeeper のクライアントを Groovy スクリプトで実装しましたが、今回は ZooKeeper のサーバーを Groovy で組み込み実行してみました。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131109/

ZooKeeper 組み込み実行スクリプト

ZooKeeper を組み込み実行するための単純なスクリプトは下記のようになります。 第一引数に ZooKeeper 設定ファイルのパスを指定して実行します。

zk_run.groovy
@Grapes([
    @Grab("org.apache.zookeeper:zookeeper:3.4.5"),
    @GrabExclude("com.sun.jmx#jmxri"),
    @GrabExclude("com.sun.jdmk#jmxtools"),
    @GrabExclude("javax.jms#jms")
])
import org.apache.zookeeper.server.quorum.QuorumPeerMain

new QuorumPeerMain().initializeAndRun(args)
// 単独実行しかしないのであれば下記でも可
// org.apache.zookeeper.server.ZooKeeperServerMain.main(args)

複製モードで実行しないのであれば、QuorumPeerMain の代わりに ZooKeeperServerMain を直接使う方法もあります。

また、ログ出力を行うには ZooKeeper の conf ディレクトリにある log4j.properties ファイルを上記スクリプトと同じディレクトリへ配置しておきます。

単独モード実行

それでは単独モードで実行してみます。

まずは、必要最小限の設定を記載した ZooKeeper の設定ファイルを用意しておきます。

u0/zoo.cfg (設定ファイル)
dataDir=u0/tmp/zookeeper
clientPort=2181

設定ファイルへのパスを引数にして zk_run.groovy を実行すれば ZooKeeper が起動します。

実行
> groovy zk_run.groovy u0/zoo.cfg

2013-11-09 10:57:13,730 [myid:] - INFO  [main:QuorumPeerConfig@101] - Reading configuration from: u0/zoo.cfg
2013-11-09 10:57:13,746 [myid:] - INFO  [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2013-11-09 10:57:13,746 [myid:] - INFO  [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 0
2013-11-09 10:57:13,746 [myid:] - INFO  [main:DatadirCleanupManager@101] - Purge task is not scheduled.
2013-11-09 10:57:13,746 [myid:] - WARN  [main:QuorumPeerMain@113] - Either no config or no quorum defined in config, running  in standalone mode
・・・

zkCli コマンドを使って接続できます。

> zkCli

Connecting to localhost:2181
・・・

複製モード実行

次に複製モードの実行を試してみます。

複製モード用の設定ファイル

まずは設定ファイルを用意します。 単独実行と比べて initLimit、syncLimit、server 設定の追加が必要です。

server 設定は下記のようなフォーマットとなっており、ポート番号を 2つ指定する必要があります。 (サーバー同士の接続ポートとリーダー選出に使用するポートのようです)

server.<サーバー番号>=<サーバー名>:<ポート番号1>:<ポート番号2>

今回は同一ホスト上で実行するのでポート番号をそれぞれ分けています。

なお、ZooKeeper の複製モードは最低 3サーバー構成で実行するようです。 2サーバー構成で実行すると WARN ログ No server failure will be tolerated. You need at least 3 servers. が出力されます。

u1/zoo.cfg (設定ファイル1)
dataDir=u1/tmp/zookeeper
clientPort=2181

initLimit=10
syncLimit=5

server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
u2/zoo.cfg (設定ファイル2)
dataDir=u2/tmp/zookeeper
clientPort=2182

initLimit=10
syncLimit=5

server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
u3/zoo.cfg (設定ファイル3)
dataDir=u3/tmp/zookeeper
clientPort=2183

initLimit=10
syncLimit=5

server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890

myid ファイル

更に、複製モードではサーバー番号を設定した myid ファイルを予め配置しておく必要があります。

myid ファイルは設定ファイルの dataDir で指定したディレクトリへ配置します。

u1/tmp/zookeeper/myid
1
u2/tmp/zookeeper/myid
2
u3/tmp/zookeeper/myid
3

実行

サーバーを順次実行します。

他のサーバーを起動していない状態では ConnectException の WARN ログが出力されますが、全サーバーを起動すると出なくなります。

サーバー1 実行
> groovy zk_run.groovy u1/zoo.cfg

2013-11-09 20:53:09,157 [myid:] - INFO  [main:QuorumPeerConfig@101] - Reading configuration from: u1/zoo.cfg
・・・
2013-11-09 20:53:10,390 [myid:1] - WARN  [WorkerSender[myid=1]:QuorumCnxManager@368] - Cannot open channel to 2 at election address localhost/127.0.0.1:3889
java.net.ConnectException: Connection refused: connect
        at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
        ・・・
・・・
2013-11-09 21:02:04,636 [myid:1] - INFO  [WorkerReceiver[myid=1]:FastLeaderElection@542] - Notification: 2 (n.leader), 0x0 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x3 (n.peerEPoch), FOLLOWING (my state)
サーバー2 実行
> groovy zk_run.groovy u2/zoo.cfg

・・・
2013-11-09 21:02:04,948 [myid:2] - INFO  [LearnerHandler-/127.0.0.1:49289:LearnerHandler@419] - Sending snapshot last zxid of peer is 0x0  zxid of leader is 0x400000000sent zxid of db as 0x400000000
サーバー3 実行
> groovy zk_run.groovy u3/zoo.cfg

・・・
2013-11-09 21:02:04,948 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2183:FileTxnSnapLog@240] - Snapshotting: 0x400000000 to u3/tmp/zookeeper/version-2/snapshot.400000000

Gradle でアプリケーションを zip 化する

Gradle を使って Vert.x 組み込み実行による単純な Web アプリケーションを zip 化してみました。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131020/

zip 化のタスク定義

とりあえず、Gradle のビルド定義 build.gradle へ下記のように設定すれば zip 化を実現できます。

  • (1) type: Zip、dependsOn: jar を指定したタスクを定義
  • (2) artifacts.archives へ (1) のタスクを設定

(1) で dependsOn: jar のように依存関係を設定しておかないと、ソースのコンパイル (compileJava タスクなど) 前に zip 化のタスクが実行されてしまうので注意が必要です。

なお、(2) の設定によって assemble や build タスク等の実行時に zip 化のタスクを実行するようになります。 (dependsOn: jar によって jar タスクの後に実施されます)

サンプル

それでは、下記のような構成のサンプルプロジェクトを使って zip 化を試してみます。

サンプルプロジェクトの構成
bin
  |_ sampleapp.bat
  |_ ・・・
conf
  |_ dev
      |_ app.properties
  |_ prod
      |_ app.properties
src
  |_ main
      |_ ・・・
web
  |_ ・・・

build.gradle

作成する zip ファイルは下記のような構成にします。

  • (a) bin 内のファイルを zip のルートへ配置
  • (b) conf/xxx 内のファイルを zip の conf へ配置 (xxx は env プロパティの値)
  • (c) アプリケーションの JAR と依存ライブラリを zip の lib へ配置
  • (d) web ディレクトリを zip の web へ配置
zip ファイルの構成
conf
  |_ app.properties
lib
  |_ gradle_zip_sample.jar (アプリケーションの JAR)
  |_ groovy-all-2.1.8.jar
  |_ ・・・
web
  |_ ・・・

sampleapp.bat
・・・

Gradle のビルド定義ファイル build.gradle は以下のようになります。

ビルド定義 build.gradle
apply plugin: 'groovy'
// プロジェクトプロパティを使った環境指定
def env = hasProperty('env')? env: 'dev'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.codehaus.groovy:groovy-all:2.1.8'
    compile 'io.vertx:vertx-core:2.0.2-final'
    compile 'io.vertx:lang-groovy:2.0.0-final'
}

defaultTasks 'clean', 'build'

// (1) zip 化タスクの定義
task zipApp(type: Zip, dependsOn: jar) {
    // zip ファイル名の変更
    archiveName = "${baseName}_${env}.${extension}"

    // (a) bin 内のファイルを zip のルートへ配置
    from 'bin'

    // (b) conf/xxx 内のディレクトリ・ファイルを zip の conf へ配置(xxx は env の値)
    into('conf') {
        from "conf/$env"
    }

    // (c)
    into('lib') {
        // アプリケーションの JAR
        from jar.archivePath
        // 各種依存ライブラリ
        from configurations.runtime
    }

    // (d)
    into('web') {
        from 'web'
    }
}

// (2)
artifacts {
    archives zipApp
}

Zip のような CopySpec インターフェースの実装クラスでは、from() でコピー元のディレクトリやファイルを指定し、into() でコピー先 (zip ファイル内) のパスを指定します。

なお、一部のディレクトリやファイルのみ特定のパスへコピーするには、下記のように第2引数へクロージャを渡す into(destPath, configureClosure) メソッドを使います。

into('<コピー先>') {
    from '<コピー元>'
    ・・・
}

ここで、from に runtimeClasspath を指定すると、依存ライブラリと共に JAR ファイル化されていないアプリケーションのクラス・リソースファイルがそのまま含まれてしまう点に注意が必要です。

  • runtimeClasspath アプリケーションのクラス・リソースファイル + 依存ライブラリ
runtimeClasspath の内容
・・・/build/classes/main
・・・/build/resources/main
・・・/.gradle/caches/・・・/groovy-all-2.1.8.jar
・・・

依存ライブラリとアプリケーションの JAR ファイル (今回のサンプルでは gradle_zip_sample.jar) を zip へ格納するには runtimeClasspath の代わりに以下を使います。

  • jar.archivePath アプリケーションの JAR ファイル
  • configurations.runtime 依存ライブラリ
configurations.runtime の内容
・・・/.gradle/caches/・・・/groovy-all-2.1.8.jar
・・・

zip 化の実行

gradle コマンドで build タスクを実行すれば build/distributions ディレクトリへ zip ファイルが生成されます。

> gradle build

なお、今回のサンプルでは gradle build もしくは gradle build -Penv=dev で conf/dev/app.properties を conf/app.properties へ配置した gradle_zip_sample_dev.zip ファイルが、gradle build -Penv=prod で conf/prod/app.properties を使った gradle_zip_sample_prod.zip ファイルが作られます。

CommonsとSpringのBeanUtils.copyProperties

Commons BeanUtils に Bean のプロパティをコピーする BeanUtils.copyProperties() というなかなか便利なメソッドがありますが、同名のメソッドが Spring にもあります。 (当然ながらパッケージ名は異なります)
ちなみに Seasar2 にも BeanUtil.copyProperties() というものがあります。

ここで、Commons のものと Spring のもので引数の順序が逆になっていたりと紛らわしい違いがあるので簡単にまとめてみました。

分類 第一引数 第二引数 例外 MapからBeanへのコピー
Commons コピー先 コピー元 検査例外あり
Spring コピー元 コピー先 実行時例外のみ ×

パッケージ名や throws 定義されている例外は下記の通りです。

  • Commons 版
    • パッケージ: org.apache.commons.beanutils
    • 例外: IllegalAccessException(検査例外), InvocationTargetException(検査例外)
  • Spring 版
    • パッケージ: org.springframework.beans
    • 例外: BeansException(実行時例外)

また、Commons 版では Map から Bean へのプロパティコピーを実施できますが、Spring 版ではできない模様です。

そして、Commons 版では BigDecimal へ null 値をコピーするとデフォルトで下記のような例外が発生します。

BigDecimal へ null 値をコピーした場合の例外 (Commons 版)
org.apache.commons.beanutils.ConversionException: No value specified for 'BigDecimal'

これを防止するには ConvertUtils.register() を使って BigDecimalConverter を登録しておきます。

BigDecimal へ null 値をコピーするための対策 (Commons 版)
ConvertUtils.register(new BigDecimalConverter(null), BigDecimal.class)

動作確認

下記のようなサンプルスクリプトを使って挙動の違いを確認してみました。

copyproperties_sample.groovy
@Grab('commons-beanutils:commons-beanutils:1.8.3')
@Grab('org.springframework:spring-beans:3.2.4.RELEASE')
import org.apache.commons.beanutils.ConvertUtils
import org.apache.commons.beanutils.converters.BigDecimalConverter

import groovy.transform.*

@TupleConstructor
@ToString(includeNames=true)
class Data {
    String name
    BigDecimal value
}

// Commons 版 BeanUtils.copyProperties
def copyPropsCommons = { data ->
    println '----- Commons -----'

    try {
        def trg = new Data()
        org.apache.commons.beanutils.BeanUtils.copyProperties(trg, data)
        println trg
    } catch (e) {
        println e
    }
}
// Commons 版 BeanUtils.copyProperties with converter
def copyPropsCommons2 = { data ->
    println '----- Commons with converter -----'

    def trg = new Data()

    // null を BigDecimal のプロパティへコピーするための設定
    ConvertUtils.register(new BigDecimalConverter(null), BigDecimal)

    org.apache.commons.beanutils.BeanUtils.copyProperties(trg, data)

    // BigDecimalConverter の解除
    ConvertUtils.deregister()

    println trg
}
// Spring 版 BeanUtils.copyProperties
def copyPropsSpring = { data ->
    println '----- Spring -----'

    def trg = new Data()
    org.springframework.beans.BeanUtils.copyProperties(data, trg)
    println trg
}

def procList = [copyPropsCommons, copyPropsCommons2, copyPropsSpring]

def d1 = new Data('sample1', 100)

procList.each { it(d1) }

println ''

def d2 = new Data('sample2')

procList.each { it(d2) }

println ''

def d3 = [
    name: 'sample3',
    value: 10
]

// Commons 版は Map からのコピーが可能
copyPropsCommons(d3)
// Spring 版は Map からのコピーが不可
copyPropsSpring(d3)

実行結果は下記のようになりました。

実行結果
> groovy copyproperties_sample.groovy
----- Commons -----
Data(name:sample1, value:100)
----- Commons with converter -----
Data(name:sample1, value:100)
----- Spring -----
Data(name:sample1, value:100)

----- Commons -----
org.apache.commons.beanutils.ConversionException: No value specified for 'BigDecimal'
----- Commons with converter -----
Data(name:sample2, value:null)
----- Spring -----
Data(name:sample2, value:null)

----- Commons -----
Data(name:sample3, value:10)
----- Spring -----
Data(name:null, value:null)

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131013/