Groovy で Storm を使う

リアルタイムデータ処理の Storm を Groovy から軽く使ってみました。

Storm は CEP(複合イベント処理)の一種のようですが、Esperid:fits:20081126)などよりも Hadoopid:fits:20101010, id:fits:20101026)に近いような印象です。(S4 の方がより近い模様)

ただ、処理を継続させる事*1を前提にしている点が Hadoop と大きく違っているように思います。

なお、Storm では JavaClojure 以外に RubyPython もデフォルトでサポートされているようです。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20111227/

コインの数え上げ1

まずは backtype.storm.testing.TestWordCounter のソースを参考に、以前 Scala + Hadoop で実施した(id:fits:20101010)コインの数え上げサンプルを実装してみました。(ローカルモードで実行)

Storm では Spout と Bolt を組み合わせた Topology を使って処理します。

  • Spout : データソース
  • Bolt : データ処理
  • Topology : Spout と Bolt の組み合わせ

下記サンプルでは、標準入力から 1行ずつデータを取り出す StdInSpout、カウント処理を行う CountBolt、カウントしたデータを出力する PrintBolt を用意し、TopologyBuilder を使って Topology を構築し LocalCluster で実行しています。

PrintBolt は CountBolt で emit としたデータを順次出力しているだけなので、カウントの最終結果だけではなく途中のカウント状況も全て出力する点に注意してください。

サンプルの Spout と Bolt 構成(Topology)
StdInSpout -> CountBolt -> PrintBolt

なお、StdInSpout がデータを emit しなくなっても Topology は終了しないので、5秒待った後にシャットダウンするようにしています。


また、Bolt はシリアライズされて各ワーカーに配置されるので、インスタンスフィールでカウントを管理している CountBolt の場合、別のワーカー上で同じ money がカウントされてしまうと正常にカウントできません。そのため、同じ money は同じワーカーで処理するように fieldsGrouping を使っています。

  • fieldsGrouping : 指定のフィールド値でワーカーへ割り振る
  • shuffleGrouping : ランダムにワーカーへ割り振る

グルーピングの処理は他にも globalGrouping, allGrouping, noneGrouping, directGrouping が用意されています。

money_count.groovy
@Grapes([
    @GrabResolver(name = "clojars.org", root = "http://clojars.org/repo"),
    @Grab("storm:storm:0.6.1-rc")
])
import backtype.storm.Config
import backtype.storm.LocalCluster
import backtype.storm.spout.SpoutOutputCollector
import backtype.storm.task.TopologyContext
import backtype.storm.topology.IRichSpout
import backtype.storm.topology.IBasicBolt
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.BasicOutputCollector
import backtype.storm.topology.TopologyBuilder
import backtype.storm.tuple.Fields
import backtype.storm.tuple.Tuple
import backtype.storm.tuple.Values
import backtype.storm.utils.Utils

//入力データ
class StdInSpout implements IRichSpout {
    def collector
    def dataSet

    boolean isDistributed() {
        false
    }

    void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector
        dataSet = System.in.newReader().iterator()
    }

    void nextTuple() {
        if (dataSet.hasNext()) {
            //標準入力の 1行分をデータとして設定
            collector.emit(new Values(dataSet.next()))
        }
    }

    //データの構成を定義
    void declareOutputFields(OutputFieldsDeclarer decl) {
        //データの構成
        decl.declare(new Fields("money"))
    }

    void close() {}
    void ack(msgId) {}
    void fail(msgId) {}
}
//カウント処理
class CountBolt implements IBasicBolt {
    def counter = [:]

    void execute(Tuple input, BasicOutputCollector collector) {
        def money = input.getValueByField("money")
        def count = (counter.containsKey(money))? counter.get(money): 0

        count++
        counter.put(money, count)

        //カウントを設定
        collector.emit(new Values(money, count))
    }

    //データの構成を定義
    void declareOutputFields(OutputFieldsDeclarer decl) {
        decl.declare(new Fields("money", "count"))
    }

    void prepare(Map conf, TopologyContext context) {}
    void cleanup() {}
}
//出力処理
class PrintBolt implements IBasicBolt {
    void execute(Tuple input, BasicOutputCollector collector) {
        def money = input.getValueByField("money")
        def count = input.getValueByField("count")

        println "${money} = ${count}"
    }

    void prepare(Map conf, TopologyContext context) {}
    void cleanup() {}
    void declareOutputFields(OutputFieldsDeclarer decl) {}
}

def builder = new TopologyBuilder()

//Spout と Bolt の組み合わせを設定(Bolt のワーカーを 4つ使用するよう指定)
builder.setSpout("sp1", new StdInSpout())
//Bolt はシリアライズされて、各ワーカーに渡されるため、
//fieldsGrouping で同一の money を同じワーカーで処理するよう指定する
builder.setBolt("bo1", new CountBolt(), 4).fieldsGrouping("sp1", new Fields("money"))
builder.setBolt("bo2", new PrintBolt(), 4).shuffleGrouping("bo1")

def conf = new Config()
conf.debug = false
//ローカルモード用の Cluster
def cluster = new LocalCluster()
//Topology を設定
cluster.submitTopology("moneycount", conf, builder.createTopology())

//Topology は終了しないため 5秒待った後でシャットダウン開始
Utils.sleep(5000)

//Topology の削除
cluster.killTopology("moneycount")

//ログファイルの削除に失敗して IOException を throw するので
//try-catch している
try {
    //シャットダウン
    cluster.shutdown()
} catch (e) {
    println e
}

実はこのまま実行しても org.codehaus.groovy.control.MultipleCompilationErrorsException: startup failed: General error during conversion: Error grabbing Grapes -- [download failed: org.slf4j#slf4j-api;1.5.8!slf4j-api.jar] というエラーが発生して正常に実行できないので、Storm の ivy-0.6.1-rc.xml を直接編集して slf4j のバージョンを 1.6.4 に変更して回避しました。

.groovy/grapes/storm/storm/ivy-0.6.1-rc.xml の変更例(エラーの回避)
・・・
<dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.4" ・・・ />
・・・
実行例
> groovy money_count.groovy < input_sample.txt
・・・
100 = 1
2000 = 1
1000 = 1
1000 = 2
1 = 2
500 = 1
10000 = 2
10 = 2
5 = 2
50 = 1
5 = 1
1 = 1
10 = 1
10000 = 1
1000 = 3
5 = 3
100 = 2
・・・

途中のカウントも含めカウント結果が出力され、実行のたびに出力順が変わります。

コインの数え上げ2

次に、カウントの最終結果のみ出力するように少し手を加えてみます。

ローカルモードで実行する場合、RegisteredGlobalState を使って状態が保持できるので今回はこれを使う事にします。(分散実行の場合は DB などへ保存する事になると思います)

実装内容は単純で、RegisteredGlobalState に ConcurrentHashMap を登録しておき、Bolt 内で ConcurrentHashMap にカウント(AtomicInteger)の登録とカウントアップを実施し、処理の最後に ConcurrentHashMap の内容を出力するだけです。

money_count2.groovy
@Grapes([
    @GrabResolver(name = "clojars.org", root = "http://clojars.org/repo"),
    @Grab("storm:storm:0.6.1-rc")
])
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
・・・
import backtype.storm.utils.RegisteredGlobalState

class StdInSpout implements IRichSpout {
    ・・・
}
//カウント処理(RegisteredGlobalState を使って結果を保持)
class StateCountBolt implements IBasicBolt {
    def trackId

    StateCountBolt(trackId) {
        this.trackId = trackId
    }

    void execute(Tuple input, BasicOutputCollector collector) {
        def money = input.getValueByField("money")

        def stats = RegisteredGlobalState.getState(trackId)

        //未登録の場合にカウント(= 0)を登録
        stats.putIfAbsent(money, new AtomicInteger())

        //カウントアップしてカウントを取得
        int count = stats.get(money).incrementAndGet()
        //下記は特に実行しなくてもよい
        collector.emit(new Values(money, count))
    }

    void declareOutputFields(OutputFieldsDeclarer decl) {
        decl.declare(new Fields("money", "count"))
    }

    void prepare(Map conf, TopologyContext context) {}
    void cleanup() {}
}

//ConcurrentHashMap を登録して割り当てられた ID を取得
def trackId = RegisteredGlobalState.registerState(new ConcurrentHashMap())

def builder = new TopologyBuilder()

builder.setSpout("sp1", new StdInSpout())
//ConcurrentHashMap であれば fieldsGrouping は不要
builder.setBolt("bo1", new StateCountBolt(trackId), 4).shuffleGrouping("sp1")

def conf = new Config()
//ローカルモード
def cluster = new LocalCluster()
cluster.submitTopology("moneycount2", conf, builder.createTopology())

Utils.sleep(5000)

cluster.killTopology("moneycount2")

try {
    cluster.shutdown()
} catch (e) {
    println e
}

//結果出力
RegisteredGlobalState.getState(trackId).each {k, v ->
    println "${k} = ${v}"
}
実行例
> groovy money_count2.groovy < input_sample.txt
・・・
1 = 2
100 = 2
10000 = 2
500 = 1
1000 = 3
2000 = 1
5 = 3
10 = 2
50 = 1

*1:中断させない限り終わらないようになっている