Groovy で Apache ZooKeeper を使う - Webコンテンツの分散ダウンロード

id:fits:20110925 や id:fits:20111016 でやったような Web コンテンツのダウンロード処理を Apache ZooKeeper を使って分散処理してみました。

Apache ZooKeeper は分散システム間で協調動作させるためのサーバーソフトウェアで、シンプルで可用性が高くなるように設計されています。


今回は、スタンドアロンモードで立ち上げた単一の ZooKeeper サーバーに URL を設定した znode*1 を登録しておき、複数のプロセスで実行したスクリプトがそれぞれ URL を取得し、Web コンテンツをダウンロードするような処理を行います。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20111204/

URL登録処理

まず、ZooKeeper サーバーに URL を設定した znode を登録する Groovy スクリプトを作成します。

ZooKeeper サーバーへの接続は ZooKeeper クラスのコンストラクタ実行時に非同期で開始され、Watcher の process メソッドを通してイベント通知されるようになっています。

そのため、下記では CountDownLatch クラスを使って接続の完了を待つようにしています。(countDown メソッドでカウントが 0 になるまで await メソッドがブロック)

znode の作成には create メソッドを使用し、CreateMode.PERSISTENT で永続的な znode を、PERSISTENT_SEQUENTIAL で末尾に連番が付与された永続的な znode を作成できるようになっています。


例えば、"/download/url-" を PERSISTENT_SEQUENTIAL で作成すると以下のような znode が作成される事になります。

  • /download/url-0000000000
  • /download/url-0000000001
  • /download/url-0000000002

また、znode に設定できるデータ(create メソッドの第2引数で指定、今回は URL)にはサイズ制限があり 1MB までとなっています。

url_regist.groovy
@Grapes([
    @Grab("org.apache.zookeeper:zookeeper:3.4.0"),
    @GrabExclude("com.sun.jmx#jmxri"),
    @GrabExclude("com.sun.jdmk#jmxtools"),
    @GrabExclude("javax.jms#jms")
])
import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.Watcher
import static org.apache.zookeeper.Watcher.Event.KeeperState.*
import static org.apache.zookeeper.ZooDefs.Ids.*
import static org.apache.zookeeper.CreateMode.*

def signal = new CountDownLatch(1)

//サーバー接続開始
def zk = new ZooKeeper("localhost", 5000, {event ->
    if (event.state == SyncConnected) {
        //接続完了時に await のブロックを解除
        signal.countDown()
    }
} as Watcher)

//サーバーへの接続待ち
signal.await()

def root = "/download"

//znode 作成
if (zk.exists(root, false) == null) {
    zk.create(root, null, OPEN_ACL_UNSAFE, PERSISTENT)
}

//標準入力から読み取った URL を設定し znode 作成
System.in.readLines() each {
    def path = zk.create("${root}/url-", it.getBytes("UTF-8"), OPEN_ACL_UNSAFE, PERSISTENT_SEQUENTIAL)
    println "created path : ${path}"
}

//サーバー接続を閉じる
zk.close()

ちなみに、@Grab("org.apache.zookeeper:zookeeper:3.4.0") だけだと以下のようなエラーが発生するので @GrabExclude を使って除外する必要があります。

org.codehaus.groovy.control.MultipleCompilationErrorsException: startup failed:General error during conversion: Error grabbing Grapes -- [download failed: javax.jms#jms;1.1!jms.jar, download failed: com.sun.jdmk#jmxtools;1.2.1!jmxtools.jar, download failed: com.sun.jmx#jmxri;1.2.1!jmxri.jar]
java.lang.RuntimeException: Error grabbing Grapes -- [download failed: javax.jms#jms;1.1!jms.jar, download failed: com.sun.jdmk#jmxtools;1.2.1!jmxtools.jar, download failed: com.sun.jmx#jmxri;1.2.1!jmxri.jar]


また、multi メソッドを使って複数の処理を一括実行する事も可能です。(下記では複数の znode 作成を一括実行している)

url_regist_multi.groovy
・・・
//全ての znode 作成を一括実行
def res = zk.multi(System.in.readLines().collect {
    Op.create("${root}/url-", it.getBytes("UTF-8"), OPEN_ACL_UNSAFE, PERSISTENT_SEQUENTIAL)
})
//結果を出力
res.each {
    try {
        println "created path : ${it.path}"
    } catch(e) {
        println "error : ${it.err}"
    }
}
zk.close()

URLダウンロード処理(簡易版)

次に、ZooKeeper サーバーから URL を取得してダウンロード処理を実施する Groovy スクリプトです。


とりあえず、ZooKeeper サーバーとのやりとりを最小限に抑えた簡易版を作成してみました。

getChildren を使って子 znode のリストを取得しシャッフルした後、先頭の znode から URL を取得し znode を削除してダウンロード処理を行うようにします。(シャッフルは同じ znode に処理が集中しないようにするため)

子 znode のリスト取得(getChildren)から znode 削除(delete)までの間に別のプロセスが znode を削除した場合、getData や delete で KeeperException.NoNodeException が throw されます。
今回は NoNodeException を catch すると再度 getChildren からやり直すようにしているため、複数プロセスで同じ URL のダウンロードは基本的に発生しないようになっています。(つまり、znode を delete できたプロセスのみダウンロード処理を行う)

ただし、ダウンロード前に対象の znode を削除してしまうため、ダウンロード中にスクリプトが異常終了すると URL が失われてしまうという問題があります。

download_url.groovy
@Grapes([
    @Grab("org.apache.zookeeper:zookeeper:3.4.0"),
    @GrabExclude("com.sun.jmx#jmxri"),
    @GrabExclude("com.sun.jdmk#jmxtools"),
    @GrabExclude("javax.jms#jms")
])
import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.Watcher
import org.apache.zookeeper.KeeperException
import static org.apache.zookeeper.Watcher.Event.KeeperState.*
import static org.apache.zookeeper.ZooDefs.Ids.*
import static org.apache.zookeeper.CreateMode.*

//ループ処理
//closure が指定回数(count)連続で false を返すとループを抜ける
def countDownLoop = {count, closure ->
    def initialCount = count

    while (count > 0) {
        if (closure()) {
            count = initialCount
        }
        else {
            count--
            Thread.sleep(1000)
        }
    }
}

//Webコンテンツダウンロード処理
def downloadUrl = {URL url ->
    try {
        def f = "${args[0]}/${url.file.split('/').last()}"

        url.withInputStream {input ->
            new File(f).bytes = input.bytes
        }

        println "downloaded : ${url} => ${f}"

    } catch (IOException e) {
        println "failed: ${url}, ${e}"
    }
}

def signal = new CountDownLatch(1)

//サーバー接続開始
def zk = new ZooKeeper("localhost", 5000, {event ->
    if (event.state == SyncConnected) {
        signal.countDown()
    }
} as Watcher)

//接続待ち
signal.await()

def root = "/download"

countDownLoop(10) {
    def result = false
    //URL を設定した znode のリスト取得
    def list = zk.getChildren(root, false)

    if (list) {
        //シャッフル(同じ znode に処理が集中するのを防ぐための措置)
        Collections.shuffle(list)

        def path = "${root}/${list.first()}"

        try {
            //URL取得
            def data = zk.getData(path, false, null)
            //ダウンロード対象の znode 削除
            zk.delete(path, -1)

            //ダウンロード処理
            downloadUrl(new URL(new String(data, "UTF-8")))

        } catch (KeeperException.NoNodeException e) {
            //getData や delete 実行時に既に該当 znode が削除されていた場合
            println "no node : ${path}"
        }
        result = true
    }
    result
}
zk.close()

URLダウンロード処理(ロック版)

簡易版での URL が失われる問題の対策として、ロック目的の子 znode を作成する方法が考えられます。

create で CreateMode.EPHEMERAL_SEQUENTIAL を指定するとクライアントプロセスの終了時に自動削除される一時的な znode を連番付きで作成できるので、これをロック目的の znode に利用します。

処理対象の znode にロック目的の子 znode を作成し、子 znode を一番最初に作成したプロセスがダウンロード処理を行い、他は再度 getChildren から処理をやり直す事で簡易版での問題に対応してみました。


ここで、ZooKeeper では子 znode のある親 znode を削除できないため、子 znode を全て削除した後で親 znode を削除する必要がありますが、子 znode を全削除してから親 znode を削除するまでの間にロック用の子 znode が作成されてしまうと困るので、子 znode と親 znode の削除を一括で実行する必要があります。

一括での削除の用途に URL 登録で使った multi を使う事もできますが、今回は ZooKeeper の Transaction を使ってみました。

実際のところ、Transaction といっても Apache ZooKeeper 3.4.0 では multi と同じ処理(multiInternal)を実行しているので multi を使っても変わりありません。(Transaction の内部では MultiTransactionRecord に処理毎の Op を溜めておいて commit の実行時に一括実行している)

download_url_lock.groovy
・・・
def root = "/download"

countDownLoop(10) {
    def result = false
    def list = zk.getChildren(root, false)

    if (list) {
        Collections.shuffle(list)

        def path = "${root}/${list.first()}"

        try {
            //ロック用の znode が存在しない場合
            if (zk.getChildren(path, false).empty) {
                //ロック用の znode を作成する
                def lockPath = zk.create("${path}/lock-", null, OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL)
                //ロック用の znode リスト取得
                def lockList = zk.getChildren(path, false)

                //自分の作成したロックが一番最初のロックであれば処理を続行
                if (lockPath == "${path}/${lockList.sort().first()}") {

                    def data = zk.getData(path, false, null)

                    downloadUrl(new URL(new String(data, "UTF-8")))

                    //Transaction 取得
                    def tr = zk.transaction()

                    zk.getChildren(path, false).each {
                        tr.delete("${path}/${it}", -1)
                    }
                    //commit で子を含めた znode の削除を実施
                    tr.delete(path, -1).commit()
                }
                else {
                    zk.delete(lockPath, -1)
                }
            }
        } catch (KeeperException.NoNodeException e) {
            println "no node : ${path}"
        }
        result = true
    }
    result
}
zk.close()

実行

それでは ZooKeeper を用いた複数プロセスでの分散ダウンロード処理を実行してみます。

まず、ZooKeeper サーバーを実行します。
ZooKeeper 用の設定ファイル conf/zoo.cfg ファイルを作成し、bin/zkServer(環境に合わせて .cmd か .sh)を実行します。

conf/zoo.cfg 例(スタンドアロンモード用)
tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181
サーバーの実行例(スタンドアロンモード)
> zkServer

次にダウンロード対象の URL を登録します。(/download を登録するため初回は download_url.groovy よりも先に実行する必要がある)

URL登録の実行例
> groovy url_regist_multi.groovy < url.txt
created path : /download/url-0000000000
created path : /download/url-0000000001
・・・

最後に、ダウンロード処理を実行するスクリプトを別プロセスで複数実行してみると、それぞれのプロセスでダウンロード処理が実施されるはずです。

ダウンロード処理の実行例
> groovy download_url_lock.groovy temp
downloaded : http://localhost/test1.jpg => temp/test1.jpg
・・・

*1:ZooKeeper でのノード