辞書ベースの日本語 Tokenizer - Kuromoji, Sudachi, Fugashi, Kagome, Lindera

辞書をベースに処理する日本語 Tokenizer のいくつかをコードを書いて実行してみました。

  • (a) Lucene Kuromoji
  • (b) atilika Kuromoji
  • (c) Sudachi
  • (d) Kuromoji.js
  • (e) Fugashi
  • (f) Kagome
  • (g) Lindera

今回は以下の文を処理して分割された単語と品詞を出力します。

処理対象文
 WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。

システム辞書だけを使用し、分割モードを指定する場合は固有名詞などをそのままにする(細かく分割しない)モードを選ぶ事にします。

ソースコードhttps://github.com/fits/try_samples/tree/master/blog/20220106/

(a) Lucene Kuromoji

Lucene に組み込まれた Kuromoji で Elasticsearch や Solr で使われます。

kuromoji.gradle を見ると、システム辞書は以下のどちらかを使うようになっているようです。

a1

lucene-analyzers-kuromoji の JapaneseTokenizer を使います。 辞書は IPADIC のようです。

lucene/a1.groovy
@Grab('org.apache.lucene:lucene-analyzers-kuromoji:8.11.1')
import org.apache.lucene.analysis.ja.JapaneseTokenizer;
import org.apache.lucene.analysis.ja.JapaneseTokenizer.Mode
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute

def text = args[0]

new JapaneseTokenizer(null, false, Mode.NORMAL).withCloseable { tokenizer ->
    def term = tokenizer.addAttribute(CharTermAttribute)
    def pos = tokenizer.addAttribute(PartOfSpeechAttribute)

    tokenizer.reader = new StringReader(text)
    tokenizer.reset()

    while(tokenizer.incrementToken()) {
        println "term=${term}, partOfSpeech=${pos.partOfSpeech}"
    }
}
a1 結果
> groovy a1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=名詞-固有名詞-組織
term=が, partOfSpeech=助詞-格助詞-一般
term=サーバー, partOfSpeech=名詞-一般
term=レス, partOfSpeech=名詞-サ変接続
term=分野, partOfSpeech=名詞-一般
term=へ, partOfSpeech=助詞-格助詞-一般
term=大きな, partOfSpeech=連体詞
term=影響, partOfSpeech=名詞-サ変接続
term=を, partOfSpeech=助詞-格助詞-一般
term=与える, partOfSpeech=動詞-自立
term=だろ, partOfSpeech=助動詞
term=う, partOfSpeech=助動詞
term=と, partOfSpeech=助詞-格助詞-引用
term=答え, partOfSpeech=動詞-自立
term=た, partOfSpeech=助動詞
term=回答, partOfSpeech=名詞-サ変接続
term=者, partOfSpeech=名詞-接尾-一般
term=は, partOfSpeech=助詞-係助詞
term=全体, partOfSpeech=名詞-副詞可能
term=の, partOfSpeech=助詞-連体化
term=5, partOfSpeech=名詞-数
term=6, partOfSpeech=名詞-数
term=%, partOfSpeech=名詞-接尾-助数詞
term=だっ, partOfSpeech=助動詞
term=た, partOfSpeech=助動詞
term=。, partOfSpeech=記号-句点

a2

org.codelibs が上記の ipadic-neologd 版を提供していたので、ついでに試してみました。

処理内容はそのままで、モジュールとパッケージ名を変えるだけです。

lucene/a2.groovy
@GrabResolver('https://maven.codelibs.org/')
@Grab('org.codelibs:lucene-analyzers-kuromoji-ipadic-neologd:8.2.0-20200120')
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer.Mode
import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute

def text = args[0]

new JapaneseTokenizer(null, false, Mode.NORMAL).withCloseable { tokenizer ->
    ・・・
}
a2 結果
> groovy a2.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=名詞-固有名詞-組織
term=が, partOfSpeech=助詞-格助詞-一般
term=サーバーレス, partOfSpeech=名詞-固有名詞-一般
term=分野, partOfSpeech=名詞-一般
term=へ, partOfSpeech=助詞-格助詞-一般
term=大きな, partOfSpeech=連体詞
term=影響, partOfSpeech=名詞-サ変接続
term=を, partOfSpeech=助詞-格助詞-一般
term=与える, partOfSpeech=動詞-自立
term=だろ, partOfSpeech=助動詞
term=う, partOfSpeech=助動詞
term=と, partOfSpeech=助詞-格助詞-引用
term=答え, partOfSpeech=動詞-自立
term=た, partOfSpeech=助動詞
term=回答者, partOfSpeech=名詞-固有名詞-一般
term=は, partOfSpeech=助詞-係助詞
term=全体, partOfSpeech=名詞-副詞可能
term=の, partOfSpeech=助詞-連体化
term=5, partOfSpeech=名詞-数
term=6, partOfSpeech=名詞-数
term=%, partOfSpeech=名詞-接尾-助数詞
term=だっ, partOfSpeech=助動詞
term=た, partOfSpeech=助動詞
term=。, partOfSpeech=記号-句点

a1 と違って "サーバーレス" や "回答者" となりました。

(b) atilika Kuromoji

https://github.com/atilika/kuromoji

Lucene Kuromoji のベースとなった Kuromoji。 更新は途絶えているようですが、色々な辞書に対応しています。

ここでは以下の 2種類の辞書を試してみました。

  • UniDic(2.1.2)
  • JUMAN(7.0-20130310)

b1

まずは、UniDic 版です。

kuromoji/b1.groovy
@Grab('com.atilika.kuromoji:kuromoji-unidic:0.9.0')
import com.atilika.kuromoji.unidic.Tokenizer

def text = args[0]
def tokenizer = new Tokenizer()

tokenizer.tokenize(args[0]).each {
    def pos = [
        it.partOfSpeechLevel1,
        it.partOfSpeechLevel2,
        it.partOfSpeechLevel3,
        it.partOfSpeechLevel4
    ]

    println "term=${it.surface}, partOfSpeech=${pos}"
}
b1 結果
> groovy b1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だ った。"

term=WebAssembly, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=が, partOfSpeech=[助詞, 格助詞, *, *]
term=サーバー, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=レス, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=分野, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=へ, partOfSpeech=[助詞, 格助詞, *, *]
term=大きな, partOfSpeech=[連体詞, *, *, *]
term=影響, partOfSpeech=[名詞, 普通名詞, サ変可能, *]
term=を, partOfSpeech=[助詞, 格助詞, *, *]
term=与える, partOfSpeech=[動詞, 一般, *, *]
term=だろう, partOfSpeech=[助動詞, *, *, *]
term=と, partOfSpeech=[助詞, 格助詞, *, *]
term=答え, partOfSpeech=[動詞, 一般, *, *]
term=た, partOfSpeech=[助動詞, *, *, *]
term=回答, partOfSpeech=[名詞, 普通名詞, サ変可能, *]
term=者, partOfSpeech=[接尾辞, 名詞的, 一般, *]
term=は, partOfSpeech=[助詞, 係助詞, *, *]
term=全体, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=の, partOfSpeech=[助詞, 格助詞, *, *]
term=5, partOfSpeech=[名詞, 数詞, *, *]
term=6, partOfSpeech=[名詞, 数詞, *, *]
term=%, partOfSpeech=[名詞, 普通名詞, 助数詞可能, *]
term=だっ, partOfSpeech=[助動詞, *, *, *]
term=た, partOfSpeech=[助動詞, *, *, *]
term=。, partOfSpeech=[補助記号, 句点, *, *]

"だろう" が分割されていないのが特徴。

b2

JUMAN 辞書版です。

kuromoji/b2.groovy
@Grab('com.atilika.kuromoji:kuromoji-jumandic:0.9.0')
import com.atilika.kuromoji.jumandic.Tokenizer

def text = args[0]
def tokenizer = new Tokenizer()

・・・
b2 結果
> groovy b2.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だ った。"

term=WebAssembly, partOfSpeech=[名詞, 組織名, *, *]
term=が, partOfSpeech=[助詞, 格助詞, *, *]
term=サーバーレス, partOfSpeech=[名詞, 人名, *, *]
term=分野, partOfSpeech=[名詞, 普通名詞, *, *]
term=へ, partOfSpeech=[助詞, 格助詞, *, *]
term=大きな, partOfSpeech=[連体詞, *, *, *]
term=影響, partOfSpeech=[名詞, サ変名詞, *, *]
term=を, partOfSpeech=[助詞, 格助詞, *, *]
term=与える, partOfSpeech=[動詞, *, 母音動詞, 基本形]
term=だろう, partOfSpeech=[助動詞, *, 助動詞だろう型, 基本形]
term=と, partOfSpeech=[助詞, 格助詞, *, *]
term=答えた, partOfSpeech=[動詞, *, 母音動詞, タ形]
term=回答, partOfSpeech=[名詞, サ変名詞, *, *]
term=者, partOfSpeech=[接尾辞, 名詞性名詞接尾辞, *, *]
term=は, partOfSpeech=[助詞, 副助詞, *, *]
term=全体, partOfSpeech=[名詞, 普通名詞, *, *]
term=の, partOfSpeech=[助詞, 接続助詞, *, *]
term=56, partOfSpeech=[名詞, 数詞, *, *]
term=%, partOfSpeech=[接尾辞, 名詞性名詞助数辞, *, *]
term=だった, partOfSpeech=[判定詞, *, 判定詞, ダ列タ形]
term=。, partOfSpeech=[特殊, 句点, *, *]

"サーバーレス"、"だろう"、"答えた"、"56"、"だった" が分割されていないのが特徴。 "サーバーレス" が人名となっているのは不思議。

(c) Sudachi

https://github.com/WorksApplications/Sudachi

Rust 版 もありますが、Java 版を使いました。

辞書は UniDic と NEologd をベースに調整したものらしく、3種類(Small, Core, Full)用意されています。

辞書が継続的にメンテナンスされており最新のものを使えるのが魅力だと思います。

ここではデフォルトの Core 辞書を使いました。(system_core.dic ファイルをカレントディレクトリに配置して実行)

  • Core 辞書(20211220 版)

また、Elasticsearch 用のプラグイン analysis-sudachi も用意されています。

sudachi/c1.groovy
@Grab('com.worksap.nlp:sudachi:0.5.3')
import com.worksap.nlp.sudachi.DictionaryFactory
import com.worksap.nlp.sudachi.Tokenizer

def text = args[0]

new DictionaryFactory().create().withCloseable { dic ->
    def tokenizer = dic.create()
    def ts = tokenizer.tokenize(Tokenizer.SplitMode.C, text)

    ts.each { t ->
        def pos = dic.getPartOfSpeechString(t.partOfSpeechId())

        println "term=${t.surface()}, partOfSpeech=${pos}"
    }
}
c1 結果
> groovy c1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だっ た。"

term=WebAssembly, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=が, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=サーバーレス, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=分野, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=へ, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=大きな, partOfSpeech=[連体詞, *, *, *, *, *]
term=影響, partOfSpeech=[名詞, 普通名詞, サ変可能, *, *, *]
term=を, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=与える, partOfSpeech=[動詞, 一般, *, *, 下一段-ア行, 終止形-一般]
term=だろう, partOfSpeech=[助動詞, *, *, *, 助動詞-ダ, 意志推量形]
term=と, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=答え, partOfSpeech=[動詞, 一般, *, *, 下一段-ア行, 連用形-一般]
term=た, partOfSpeech=[助動詞, *, *, *, 助動詞-タ, 連体形-一般]
term=回答者, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=は, partOfSpeech=[助詞, 係助詞, *, *, *, *]
term=全体, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=の, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=56, partOfSpeech=[名詞, 数詞, *, *, *, *]
term=%, partOfSpeech=[名詞, 普通名詞, 助数詞可能, *, *, *]
term=だっ, partOfSpeech=[助動詞, *, *, *, 助動詞-ダ, 連用形-促音便]
term=た, partOfSpeech=[助動詞, *, *, *, 助動詞-タ, 終止形-一般]
term=。, partOfSpeech=[補助記号, 句点, *, *, *, *]

"サーバーレス"、"だろう"、"回答者"、"56" となっているのが特徴。

(d) Kuromoji.js

https://github.com/takuyaa/kuromoji.js/

Kuromoji の JavaScript 実装。

kuromoji.js/d1.mjs
import kuromoji from 'kuromoji'

const dicPath = 'node_modules/kuromoji/dict'
const text = process.argv[2]

kuromoji.builder({ dicPath }).build((err, tokenizer) => {
    if (err) {
        console.error(err)
        return
    }

    const ts = tokenizer.tokenize(text)

    for (const t of ts) {
        const pos = [t.pos, t.pos_detail_1, t.pos_detail_2, t.pos_detail_3]

        console.log(`term=${t.surface_form}, partOfSpeech=${pos}`)
    }
})
d1 結果
> node d1.mjs "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=名詞,固有名詞,組織,*
term=が, partOfSpeech=助詞,格助詞,一般,*
term=サーバー, partOfSpeech=名詞,一般,*,*
term=レス, partOfSpeech=名詞,サ変接続,*,*
term=分野, partOfSpeech=名詞,一般,*,*
term=へ, partOfSpeech=助詞,格助詞,一般,*
term=大きな, partOfSpeech=連体詞,*,*,*
term=影響, partOfSpeech=名詞,サ変接続,*,*
term=を, partOfSpeech=助詞,格助詞,一般,*
term=与える, partOfSpeech=動詞,自立,*,*
term=だろ, partOfSpeech=助動詞,*,*,*
term=う, partOfSpeech=助動詞,*,*,*
term=と, partOfSpeech=助詞,格助詞,引用,*
term=答え, partOfSpeech=動詞,自立,*,*
term=た, partOfSpeech=助動詞,*,*,*
term=回答, partOfSpeech=名詞,サ変接続,*,*
term=者, partOfSpeech=名詞,接尾,一般,*
term=は, partOfSpeech=助詞,係助詞,*,*
term=全体, partOfSpeech=名詞,副詞可能,*,*
term=の, partOfSpeech=助詞,連体化,*,*
term=5, partOfSpeech=名詞,数,*,*
term=6, partOfSpeech=名詞,数,*,*
term=%, partOfSpeech=名詞,接尾,助数詞,*
term=だっ, partOfSpeech=助動詞,*,*,*
term=た, partOfSpeech=助動詞,*,*,*
term=。, partOfSpeech=記号,句点,*,*

a1 と同じ結果になりました。

(e) Fugashi

https://github.com/polm/fugashi

MeCabPython 用ラッパー。

辞書として unidic-lite と unidic のパッケージが用意されていましたが、 ここでは JUMAN 辞書を使いました。

  • JUMAN
fugashi/e1.py
from fugashi import GenericTagger
import sys

text = sys.argv[1]

tagger = GenericTagger()

for t in tagger(text):
    pos = t.feature[0:4]
    print(f"term={t.surface}, partOfSpeech={pos}")
e1 結果
> python e1.py "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=('名詞', '組織名', '*', '*')
term=が, partOfSpeech=('助詞', '格助詞', '*', '*')
term=サーバーレス, partOfSpeech=('名詞', '人名', '*', '*')
term=分野, partOfSpeech=('名詞', '普通名詞', '*', '*')
term=へ, partOfSpeech=('助詞', '格助詞', '*', '*')
term=大きな, partOfSpeech=('連体詞', '*', '*', '*')
term=影響, partOfSpeech=('名詞', 'サ変名詞', '*', '*')
term=を, partOfSpeech=('助詞', '格助詞', '*', '*')
term=与える, partOfSpeech=('動詞', '*', '母音動詞', '基本形')
term=だろう, partOfSpeech=('助動詞', '*', '助動詞だろう型', '基本形')
term=と, partOfSpeech=('助詞', '格助詞', '*', '*')
term=答えた, partOfSpeech=('動詞', '*', '母音動詞', 'タ形')
term=回答, partOfSpeech=('名詞', 'サ変名詞', '*', '*')
term=者, partOfSpeech=('接尾辞', '名詞性名詞接尾辞', '*', '*')
term=は, partOfSpeech=('助詞', '副助詞', '*', '*')
term=全体, partOfSpeech=('名詞', '普通名詞', '*', '*')
term=の, partOfSpeech=('助詞', '接続助詞', '*', '*')
term=56, partOfSpeech=('名詞', '数詞', '*', '*')
term=%, partOfSpeech=('接尾辞', '名詞性名詞助数辞', '*', '*')
term=だった, partOfSpeech=('判定詞', '*', '判定詞', 'ダ列タ形')
term=。, partOfSpeech=('特殊', '句点', '*', '*')

同じ辞書を使っている b2 と同じ結果になりました。("サーバーレス" が人名なのも同じ)。

(f) Kagome

https://github.com/ikawaha/kagome

ここでは以下の辞書を使用します。

  • IPADIC(mecab-ipadic-2.7.0-20070801)
  • UniDic(2.1.2)

なお、Tokenize を呼び出した場合は、分割モードとして Normal が適用されるようです。

f1

IPADIC 版

kagome/f1.go
package main

import (
    "fmt"
    "os"

    "github.com/ikawaha/kagome-dict/ipa"
    "github.com/ikawaha/kagome/v2/tokenizer"
)

func main() {
    text := os.Args[1]

    t, err := tokenizer.New(ipa.Dict(), tokenizer.OmitBosEos())

    if err != nil {
        panic(err)
    }
    // 分割モード Normal
    ts := t.Tokenize(text)

    for _, t := range ts {
        fmt.Printf("term=%s, partOfSpeech=%v\n", t.Surface, t.POS())
    }
}
f1 結果
> go run f1.go "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=[名詞 固有名詞 組織 *]
term=が, partOfSpeech=[助詞 格助詞 一般 *]
term=サーバー, partOfSpeech=[名詞 一般 * *]
term=レス, partOfSpeech=[名詞 サ変接続 * *]
term=分野, partOfSpeech=[名詞 一般 * *]
term=へ, partOfSpeech=[助詞 格助詞 一般 *]
term=大きな, partOfSpeech=[連体詞 * * *]
term=影響, partOfSpeech=[名詞 サ変接続 * *]
term=を, partOfSpeech=[助詞 格助詞 一般 *]
term=与える, partOfSpeech=[動詞 自立 * *]
term=だろ, partOfSpeech=[助動詞 * * *]
term=う, partOfSpeech=[助動詞 * * *]
term=と, partOfSpeech=[助詞 格助詞 引用 *]
term=答え, partOfSpeech=[動詞 自立 * *]
term=た, partOfSpeech=[助動詞 * * *]
term=回答, partOfSpeech=[名詞 サ変接続 * *]
term=者, partOfSpeech=[名詞 接尾 一般 *]
term=は, partOfSpeech=[助詞 係助詞 * *]
term=全体, partOfSpeech=[名詞 副詞可能 * *]
term=の, partOfSpeech=[助詞 連体化 * *]
term=5, partOfSpeech=[名詞 数 * *]
term=6, partOfSpeech=[名詞 数 * *]
term=%, partOfSpeech=[名詞 接尾 助数詞 *]
term=だっ, partOfSpeech=[助動詞 * * *]
term=た, partOfSpeech=[助動詞 * * *]
term=。, partOfSpeech=[記号 句点 * *]

同じ辞書を使っている a1 と同じ結果になりました。

f2

UniDic 版

kagome/f2.go
package main

import (
    "fmt"
    "os"

    "github.com/ikawaha/kagome-dict/uni"
    "github.com/ikawaha/kagome/v2/tokenizer"
)

func main() {
    text := os.Args[1]

    t, err := tokenizer.New(uni.Dict(), tokenizer.OmitBosEos())

    ・・・
}
f2 結果
> go run f2.go "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=[名詞 普通名詞 一般 *]
term=が, partOfSpeech=[助詞 格助詞 * *]
term=サーバー, partOfSpeech=[名詞 普通名詞 一般 *]
term=レス, partOfSpeech=[名詞 普通名詞 一般 *]
term=分野, partOfSpeech=[名詞 普通名詞 一般 *]
term=へ, partOfSpeech=[助詞 格助詞 * *]
term=大きな, partOfSpeech=[連体詞 * * *]
term=影響, partOfSpeech=[名詞 普通名詞 サ変可能 *]
term=を, partOfSpeech=[助詞 格助詞 * *]
term=与える, partOfSpeech=[動詞 一般 * *]
term=だろう, partOfSpeech=[助動詞 * * *]
term=と, partOfSpeech=[助詞 格助詞 * *]
term=答え, partOfSpeech=[動詞 一般 * *]
term=た, partOfSpeech=[助動詞 * * *]
term=回答, partOfSpeech=[名詞 普通名詞 サ変可能 *]
term=者, partOfSpeech=[接尾辞 名詞的 一般 *]
term=は, partOfSpeech=[助詞 係助詞 * *]
term=全体, partOfSpeech=[名詞 普通名詞 一般 *]
term=の, partOfSpeech=[助詞 格助詞 * *]
term=5, partOfSpeech=[名詞 数詞 * *]
term=6, partOfSpeech=[名詞 数詞 * *]
term=%, partOfSpeech=[名詞 普通名詞 助数詞可能 *]
term=だっ, partOfSpeech=[助動詞 * * *]
term=た, partOfSpeech=[助動詞 * * *]
term=。, partOfSpeech=[補助記号 句点 * *]

同じ辞書を使っている b1 と同じ結果になりました。

(g) Lindera

https://github.com/lindera-morphology/lindera

kuromoji-rs のフォークのようで、辞書は IPADIC です。

  • IPADIC(mecab-ipadic-2.7.0-20070801)
lindera/src/main.rs
use lindera::tokenizer::Tokenizer;
use lindera_core::LinderaResult;

use std::env;

fn main() -> LinderaResult<()> {
    let text = env::args().nth(1).unwrap_or("".to_string());

    let mut tokenizer = Tokenizer::new()?;
    let ts = tokenizer.tokenize(&text)?;

    for t in ts {
        let pos = t.detail.get(0..4).unwrap_or(&t.detail);

        println!("text={}, partOfSpeech={:?}", t.text, pos);
    }

    Ok(())
}
結果
> cargo run "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

・・・
text=WebAssembly, partOfSpeech=["UNK"]
text=が, partOfSpeech=["助詞", "格助詞", "一般", "*"]
text=サーバー, partOfSpeech=["名詞", "一般", "*", "*"]
text=レス, partOfSpeech=["名詞", "サ変接続", "*", "*"]
text=分野, partOfSpeech=["名詞", "一般", "*", "*"]
text=へ, partOfSpeech=["助詞", "格助詞", "一般", "*"]
text=大きな, partOfSpeech=["連体詞", "*", "*", "*"]
text=影響, partOfSpeech=["名詞", "サ変接続", "*", "*"]
text=を, partOfSpeech=["助詞", "格助詞", "一般", "*"]
text=与える, partOfSpeech=["動詞", "自立", "*", "*"]
text=だろ, partOfSpeech=["助動詞", "*", "*", "*"]
text=う, partOfSpeech=["助動詞", "*", "*", "*"]
text=と, partOfSpeech=["助詞", "格助詞", "引用", "*"]
text=答え, partOfSpeech=["動詞", "自立", "*", "*"]
text=た, partOfSpeech=["助動詞", "*", "*", "*"]
text=回答, partOfSpeech=["名詞", "サ変接続", "*", "*"]
text=者, partOfSpeech=["名詞", "接尾", "一般", "*"]
text=は, partOfSpeech=["助詞", "係助詞", "*", "*"]
text=全体, partOfSpeech=["名詞", "副詞可能", "*", "*"]
text=の, partOfSpeech=["助詞", "連体化", "*", "*"]
text=5, partOfSpeech=["名詞", "数", "*", "*"]
text=6, partOfSpeech=["名詞", "数", "*", "*"]
text=%, partOfSpeech=["名詞", "接尾", "助数詞", "*"]
text=だっ, partOfSpeech=["助動詞", "*", "*", "*"]
text=た, partOfSpeech=["助動詞", "*", "*", "*"]
text=。, partOfSpeech=["記号", "句点", "*", "*"]

a1 と概ね同じ結果となりましたが、"WebAssembly" が名詞になっていないのが特徴。

Metabase における週初めは日曜

Metabase を試していたところ、以下の点が気になりました。

  • 週単位で集計すると週初めが日曜になる(日曜から土曜までの集計)
(画面例)

f:id:fits:20190525210517p:plain

DB 等、一般的なシステムにおける週初めは月曜になる(ISO 8601)はずなので、Metabase が日曜へ変えているのは確実。

そこで、「SQLを見る」をクリックして SQL の内容を確認してみると、やはり日曜へ変える(週初めの月曜 - 1日)ようになっていました。(接続先 DB は PostgreSQL

クエリビルダーで生成された SQL
SELECT (date_trunc('week', CAST((CAST("public"."stock_move"."date" AS timestamp) + INTERVAL '1 day') AS timestamp)) - INTERVAL '1 day') AS "date", sum("public"."stock_move"."product_qty") AS "sum"
FROM "public"."stock_move"
GROUP BY (date_trunc('week', CAST((CAST("public"."stock_move"."date" AS timestamp) + INTERVAL '1 day') AS timestamp)) - INTERVAL '1 day')
ORDER BY (date_trunc('week', CAST((CAST("public"."stock_move"."date" AS timestamp) + INTERVAL '1 day') AS timestamp)) - INTERVAL '1 day') ASC

特に設定も見当たらないので(タイムゾーンや言語を設定しても無駄だった)、該当箇所の ソース を見てみると、日曜へ変える事しか考慮していない事が判明。

src/matabase/driver/postgres.clj
・・・

(defmethod sql.qp/date [:postgres :week]            [_ _ expr] (hx/- (date-trunc :week (hx/+ (hx/->timestamp expr)
                                                                                             one-day))
                                                                     one-day))

・・・

(現時点では)週初めを月曜へ変えるには Metabase のソースを書き換える事になりそうですが、日曜を前提に作られている点が懸念されます。

また、Allow organizations to determine the start of their week #1779 などを見る限り、Metabase 側の対応に期待するのも厳しそうです。

PostgreSQL 検索時の週初めを月曜へ変更

試しに、Java の Instrumentation 機能を利用し、PostgreSQL 検索時に週初めが月曜となるようにしてみます。

Clojure で実装された該当処理(上記 postgres.clj の処理内容)が Java 上でどのように処理されるのか調べたところ、以下のようになっていました。

  • metabase.driver.postgres__init クラスの static initializer 実行時に metabase.driver.sql.query-processor 名前空間に属する date 変数の rawRoot に [:postgres :week] をキーにして addMethod している

つまり、この処理が終わった後で [:postgres :week] の処理を差し替えれば何とかなりそうです。

実装

ソースは http://github.com/fits/try_samples/tree/master/blog/20190525/

postgres__init クラスの初期化後に処理を実施したいので、ClassFileTransformer を使って任意のクラスのロード時に処理を差し込めるようにしました。(クラスのロード時に transform メソッドが呼ばれる)

ここでは org/postgresql/Driver クラスのロード時に処理するようにしましたが、postgres__init の初期化が済んでいればどのタイミングでも問題ないと思います。

差し替え後の処理 PgWeekFuncClojure のコードで (date-trunc :week expr) を実施するように実装しています。

sample/SampleAgent.java
package sample;

import java.lang.instrument.*;
import java.security.*;
import clojure.lang.*;

public class SampleAgent {
    public static void premain(String agentArgs, Instrumentation inst) {
        inst.addTransformer(new PgWeekTransformer());
    }

    static class PgWeekTransformer implements ClassFileTransformer {
        public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) {
            // org/postgresql/Driver クラスのロード時に実施
            if (className.equals("org/postgresql/Driver")) {
                replacePgWeek();
            }

            return null;
        }

        // [:postgres :week] の処理を置き換えるための処理
        private void replacePgWeek() {
            // metabase.driver.sql.query-processor 名前空間の取得
            Namespace n = Namespace.find(
                Symbol.intern("metabase.driver.sql.query-processor")
            );

            // date 変数の取得
            Var v = n.findInternedVar(Symbol.intern("date"));

            MultiFn root = (MultiFn)v.getRawRoot();

            // キー [:postgres :week] の作成
            IPersistentVector k = Tuple.create(
                RT.keyword(null, "postgres"), 
                RT.keyword(null, "week")
            );

            // [:postgres :week] へ紐づいた処理を差し替え
            root.removeMethod(k);
            root.addMethod(k, new PgWeekFunc());
        }
    }

    // 週の処理関数を定義
    static class PgWeekFunc extends AFunction {
        public static final Var const__1 = RT.var(
            "metabase.driver.postgres", 
            "date-trunc"
        );

        public static final Keyword const__2 = RT.keyword(null, "week");

        public static Object invokeStatic(Object obj1, Object obj2, Object expr) {
            // (date-trunc :week expr) の実施
            return ((IFn)const__1.getRawRoot()).invoke(const__2, expr);
        }

        public Object invoke(Object obj1, Object obj2, Object obj3) {
            return invokeStatic(obj1, obj2, obj3);
        }
    }
}
META-INF/MANIFEST.MF
Manifest-Version: 1.0
Premain-Class: sample.SampleAgent

上記ソースをビルドして JAR ファイル化(例. sample-agent.jar)しておきます。

実行

Metabase の実行時に(上で作成した)sample-agent.jar を -javaagent オプションで適用します。

Metabase 実行(Instrumentation 適用)
> java -javaagent:sample-agent.jar -jar metabase.jar

SQL を確認してみると、INTERVAL '1 day' の減算等が無くなり、処理の差し替えが効いている事を確認できました。

クエリビルダーで生成された SQL(差し替え後)
SELECT date_trunc('week', CAST("public"."stock_move"."date" AS timestamp)) AS "date", sum("public"."stock_move"."product_qty") AS "sum"
FROM "public"."stock_move"
GROUP BY date_trunc('week', CAST("public"."stock_move"."date" AS timestamp))
ORDER BY date_trunc('week', CAST("public"."stock_move"."date" AS timestamp)) ASC

ついでに、サーバーからのレスポンス内容を確認してみると、日付が月曜になり集計結果が変わっている事を確認できました。

レスポンス内容(一部)
"data":{
    "rows":[
        ["2019-04-22T00:00:00.000+09:00",2139.0],
        ["2019-05-06T00:00:00.000+09:00",30.0],
        ["2019-05-13T00:00:00.000+09:00",13.0]
    ],
    "columns":["date","sum"],
    ・・・
}

ただし、Web 画面上は JavaScript が週の範囲を生成している事から、(date: Week の)表示上は日曜から土曜となってしまいます。

(画面例)

f:id:fits:20190525210556p:plain

このように、週初めを月曜へ変えるには以下のような JavaScript の処理に関しても考慮が必要になりそうです。

frontend/src/metabase/lib/formatting.js
・・・

function formatWeek(m: Moment, options: FormattingOptions = {}) {
  // force 'en' locale for now since our weeks currently always start on Sundays
  m = m.locale("en");
  return formatMajorMinor(m.format("wo"), m.format("gggg"), options);
}

・・・

MySQL Binary Log connector でバイナリログをイベント処理

MySQL Binary Log connector (mysql-binlog-connector-java) を使うと、Java プログラムで MySQL / MariaDB のバイナリログをイベント処理できます。

そのため、MySQL の CDC(Change Data Capture)として使えるかもしれません。

ソースは http://github.com/fits/try_samples/tree/master/blog/20171030/

Groovy で実装

MySQL へ接続してバイナリログの内容を取得するには BinaryLogClient を使います。

registerEventListener メソッドで EventListener 実装オブジェクトを登録しておくと、バイナリログの内容をデシリアライズした Event オブジェクトを引数として onEvent メソッドを呼び出してくれます。

BinaryLogClient のソース(listenForEventPackets() メソッドなど)を見てみると、バイナリログを順番にデシリアライズして(特定のイベントタイプをスキップするような処理は無さそう)、登録している EventListener の onEvent を順次呼び出しているだけのようなので、場合によっては処理性能に注意が必要かもしれません。

binlog_sample.groovy
@Grab('com.github.shyiko:mysql-binlog-connector-java:0.13.0')
import com.github.shyiko.mysql.binlog.BinaryLogClient

def host = args[0]
def port = args[1] as int
def user = args[2]
def pass = args[3]

def client = new BinaryLogClient(host, port, user, pass)

client.registerEventListener { ev ->
    // バイナリログの内容を処理
    println ev
}

client.connect()

動作確認

動作確認は Docker で行ってみます。

準備

BinaryLogClient で接続するために MySQL 側でレプリケーション用の設定を行います。

まずは、レプリケーションの設定ファイルを用意します。

/home/vagrant/mysql/conf/repl.cnf (レプリケーションの設定)
[mysqld]
log-bin=mysql-bin
server-id=1

次に、レプリケーション用の接続ユーザーを追加するための SQL ファイルも用意しておきます。

/home/vagrant/mysql/init/repl-user.sqlレプリケーション用のユーザー)
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO repl@'%' IDENTIFIED BY 'pass';

今回はコンテナ間の接続(DB への接続)に Docker のユーザー定義ネットワークを使います。

そのため、まずはブリッジネットワーク(sample1)を作成しておきます。

Docker ユーザー定義ネットワークの作成
$ docker network create --driver bridge sample1

/home/vagrant/groovy/binlog_sample.groovy ファイルを用意した後、sample1 のネットワークへ参加するように Groovy のコンテナを実行します。

Groovy コンテナ実行
$ docker run --rm -it --net=sample1 -v /home/vagrant/groovy:/work groovy bash

・・・
groovy@・・・:~$ cd /work

a. MySQL 5.7 の場合

それでは、MySQL のコンテナを実行して動作確認を行います。

MySQL の Docker 公式イメージでは、/etc/mysql/conf.d 内の設定ファイルを適用し、/docker-entrypoint-initdb.d 内の SQL ファイルを実行するようになっています。

今回はこれを使って、先ほど用意しておいたレプリケーションの設定ファイルとユーザー作成 SQL を適用するように実行します。

a-1. MySQL コンテナ実行
$ docker run --name mysql1 --net=sample1 -e MYSQL_ROOT_PASSWORD=secret -d -v /home/vagrant/mysql/conf:/etc/mysql/conf.d -v /home/vagrant/mysql/init:/docker-entrypoint-initdb.d mysql

事前に実行しておいた Groovy コンテナ上で binlog_sample.groovy を実行します。 ユーザー名とパスワードはレプリケーション用のものを使用します。

a-2. binlog_sample.groovy 実行(Groovy コンテナ)
groovy@・・・:/work$ groovy binlog_sample.groovy mysql1 3306 repl pass

Oct 22, 2017 4:46:02 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql1:3306 at mysql-bin.000003/154 (sid:65535, cid:3)
Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000003', binlogPosition=154}}
・・・

この状態で以下の SQL を実行してみます。

CREATE DATABASE db1;
USE db1;

CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL);

INSERT INTO tbl1 VALUES (1, 'a');
UPDATE tbl1 SET name = 'aa' WHERE id = 1;
DELETE FROM tbl1 WHERE id = 1;

CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL);

START TRANSACTION;

INSERT INTO tbl1 VALUES (1, 'a');
INSERT INTO tbl2 VALUES (2, 'b');

COMMIT;

上記 SQL 実行後の出力結果です。

a-3. binlog_sample.groovy 出力結果
・・・
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=219, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=72, nextPosition=310, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='CREATE DATABASE db1'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=375, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=127, nextPosition=521, flags=0}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=586, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=657, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=706, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=748, flags=0}, data=WriteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[
    [1, a]
]}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=779, flags=0}, data=XidEventData{xid=14}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=844, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=915, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=964, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=32, nextPosition=1015, flags=0}, data=UpdateRowsEventData{tableId=219, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
    {before=[1, a], after=[1, aa]}
]}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1046, flags=0}, data=XidEventData{xid=15}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1111, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=1182, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1231, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=24, nextPosition=1274, flags=0}, data=DeleteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[
    [1, aa]
]}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1305, flags=0}, data=XidEventData{xid=16}}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1370, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690815000, eventType=QUERY, serverId=1, headerLength=19, dataLength=127, nextPosition=1516, flags=0}, data=QueryEventData{threadId=4, executionTime=1, errorCode=0, database='db1', sql='CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}}
Event{header=EventHeaderV4{timestamp=1508690819000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1581, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=QUERY, serverId=1, headerLength=19, dataLength=52, nextPosition=1652, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='db1', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1701, flags=0}, data=TableMapEventData{tableId=219, database='db1', table='tbl1', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1743, flags=0}, data=WriteRowsEventData{tableId=219, includedColumns={0, 1}, rows=[
    [1, a]
]}}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=30, nextPosition=1792, flags=0}, data=TableMapEventData{tableId=220, database='db1', table='tbl2', columnTypes=3, 15, columnMetadata=0, 10, columnNullability={}}}
Event{header=EventHeaderV4{timestamp=1508690816000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1834, flags=0}, data=WriteRowsEventData{tableId=220, includedColumns={0, 1}, rows=[
    [2, b]
]}}
Event{header=EventHeaderV4{timestamp=1508690819000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1865, flags=0}, data=XidEventData{xid=19}}

eventType を簡単にまとめると以下のようになりました。

SQL eventType
CREATE QUERY
INSERT EXT_WRITE_ROWS
UPDATE EXT_UPDATE_ROWS
DELETE EXT_DELETE_ROWS

注意点として、EXT_XXX_ROWS の Event 内容にテーブル名は含まれておらず tableId で判断する必要がありそうです。 tableId とテーブル名のマッピングは直前の TABLE_MAP で実施されています。

また、バイナリログのフォーマット(以下)は RBR(行ベースレプリケーション) となっていました。

mysql> show global variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

b. MariaDB 10.2 の場合

ついでに、MariaDB でも試してみます。

設定は MySQL と同じものが使えるので、ここでは Docker イメージ名を mariadb に変えて実行するだけです。(以下ではコンテナ名も変えています)

b-1. MariaDB コンテナ実行
$ docker run --name mariadb1 --net=sample1 -e MYSQL_ROOT_PASSWORD=secret -d -v /home/vagrant/mysql/conf:/etc/mysql/conf.d -v /home/vagrant/mysql/init:/docker-entrypoint-initdb.d mariadb

接続先を mariadb1 (MariaDB) へ変えてスクリプトを実行します。

b-2. binlog_sample.groovy 実行(Groovy コンテナ)
groovy@・・・:/work$ groovy binlog_sample.groovy mariadb1 3306 repl pass

・・・

MySQL と同じ SQL を実行した後の出力結果です。

b-3. binlog_sample.groovy 出力結果
・・・
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=384, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=66, nextPosition=469, flags=8}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='CREATE DATABASE db1'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=511, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=121, nextPosition=651, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='CREATE TABLE tbl1 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=693, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=791, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl1 VALUES (1, 'a')'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=822, flags=0}, data=XidEventData{xid=12}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=864, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=87, nextPosition=970, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='UPDATE tbl1 SET name = 'aa' WHERE id = 1'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1001, flags=0}, data=XidEventData{xid=13}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1043, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=76, nextPosition=1138, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='DELETE FROM tbl1 WHERE id = 1'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1169, flags=0}, data=XidEventData{xid=14}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1211, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='# Dum'}}
Event{header=EventHeaderV4{timestamp=1508691100000, eventType=QUERY, serverId=1, headerLength=19, dataLength=121, nextPosition=1351, flags=0}, data=QueryEventData{threadId=10, executionTime=1, errorCode=0, database='db1', sql='CREATE TABLE tbl2 (id int NOT NULL PRIMARY KEY, name varchar(10) NOT NULL)'}}
Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=23, nextPosition=1393, flags=8}, data=QueryEventData{threadId=0, executionTime=0, errorCode=0, database='', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=1491, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl1 VALUES (1, 'a')'}}
Event{header=EventHeaderV4{timestamp=1508691101000, eventType=QUERY, serverId=1, headerLength=19, dataLength=79, nextPosition=1589, flags=0}, data=QueryEventData{threadId=10, executionTime=0, errorCode=0, database='db1', sql='INSERT INTO tbl2 VALUES (2, 'b')'}}
Event{header=EventHeaderV4{timestamp=1508691101000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1620, flags=0}, data=XidEventData{xid=17}}

eventType を簡単にまとめると以下のようになりました。

SQL eventType
CREATE QUERY
INSERT QUERY
UPDATE QUERY
DELETE QUERY

バイナリログのフォーマットは MBRミックスベースレプリケーション)となっていました。

MariaDB [(none)]> show global variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | MIXED |
+---------------+-------+

Akka でステートマシンを処理

前回 の有限ステートマシン(FSM)の処理を Akka の JavaAPI を使って実装してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20171016/

ステートマシンの実装

まずは、以下のステートマシンを実装します。

  • 初期状態は Idle 状態
  • Idle 状態で On イベントが発生すると Active 状態へ遷移
  • Active 状態で Off イベントが発生すると Idle 状態へ遷移
現在の状態 Off On
Idle Active
Active Idle

AbstractFSM<状態の型, データの型> を extends する事でステートマシンを実装します。

状態以外にも状態のデータを指定できるので、Active 状態へ変化した回数をカウントするようにしてみました。

指定の状態で特定のイベントを処理するには when(現在の状態, FSMStateFunctionBuilder) を使い、イベントに対する処理は matchEventEquals() 等を使います。

goTo(遷移先の状態) で状態を遷移させ、その際に using(データ) で状態遷移後のデータを指定できます。

処理対象外のイベントを受け取った場合の処理は whenUnhandled(FSMStateFunctionBuilder) で指定でき、状態遷移の状況確認に onTransition が使えます。

sample.groovy
@Grab('com.typesafe.akka:akka-actor_2.12:2.5.6')
import akka.actor.AbstractFSM
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.actor.Props

enum States { Idle, Active }
enum Events { On, Off }

class SampleStateMachine extends AbstractFSM<States, Integer> {
    {
        // 初期状態の設定
        startWith(States.Idle, 0)

        // On イベントで Idle から Active 状態へ遷移
        when(States.Idle, matchEventEquals(Events.On) { event, data ->
            // Active へ遷移
            goTo(States.Active).using(data + 1)
        })

        // Off イベントで Active から Idle 状態へ遷移
        when(States.Active, matchEventEquals(Events.Off) { event, data ->
            // Idle へ遷移
            goTo(States.Idle)
        })

        // 処理されないイベント発生時
        whenUnhandled(
            matchAnyEvent { event, data ->
                println "*** Unhandled event=${event}, data=${data}"
                stay()
            }
        )

        // 状態遷移の発生時
        onTransition { from, to -> 
            println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}"
        }
    }
}

def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

actor.tell(Events.On, ActorRef.noSender())
actor.tell(Events.Off, ActorRef.noSender())

actor.tell(Events.Off, ActorRef.noSender())

sleep 2000

system.terminate()

実行結果は以下の通りです。

実行結果
> groovy sample.groovy

*** stateChanged: Idle -> Active, data=0, next data=1
*** stateChanged: Active -> Idle, data=1, next data=1
*** Unhandled event=Off, data=1

タイムアウト付きステートマシンの実装1

次は、タイムアウト時の遷移を追加してみます。

現在の状態 Off On Timeout (2秒)
Idle Active
Active Idle Idle

when で scala.concurrent.duration.FiniteDuration を指定すると状態のタイムアウト ※ を指定できます。

 ※ ただし、このタイムアウトは
    その状態で何もメッセージを受信しなかった時間に対する
    タイムアウトのようです

    状態の遷移が発生しなくても
    メッセージを受信する度にタイムアウトはリセットされます

タイムアウト発生時のイベント(メッセージ)は StateTimeout() の戻り値と等しくなります。

timeout_sample1.groovy
・・・
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit

enum States { Idle, Active }
enum Events { On, Off }

class SampleStateMachine extends AbstractFSM<States, Integer> {
    {
        startWith(States.Idle, 0)

        when(States.Idle, matchEventEquals(Events.On) { event, data ->
            goTo(States.Active).using(data + 1)
        })

        when(States.Active, Duration.create(2, TimeUnit.SECONDS), 
            matchEventEquals(Events.Off) { event, data ->
                goTo(States.Idle)
            }.eventEquals(StateTimeout()) { event, data ->
                // タイムアウト時の処理
                println "*** timeout: event=${event}, data=${data}"

                goTo(States.Idle)
            /*  以下でも可
                self().tell(Events.Off, self())
                stay()
            */
            }
        )
        ・・・
    }
}

def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

actor.tell(Events.On, ActorRef.noSender())
actor.tell(Events.Off, ActorRef.noSender())

actor.tell(Events.Off, ActorRef.noSender())

actor.tell(Events.On, ActorRef.noSender())

sleep 2500

system.terminate()

実行結果は以下の通りです。

実行結果
> groovy timeout_sample1.groovy

*** stateChanged: Idle -> Active, data=0, next data=1
*** stateChanged: Active -> Idle, data=1, next data=1
*** Unhandled event=Off, data=1
*** stateChanged: Idle -> Active, data=1, next data=2
*** timeout: event=StateTimeout, data=2
*** stateChanged: Active -> Idle, data=2, next data=2

when で指定したタイムアウトはイベント(メッセージ)を受信するとリセットされる事を確認するため、以下のようにタイムアウト発生前に invalid-message という文字列を 2回 tell するようにしてみます。

timeout_sample1b.groovy (タイムアウトの検証)
・・・
def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

・・・

actor.tell(Events.On, ActorRef.noSender())

sleep 1500

// 1回目
actor.tell('invalid-message', ActorRef.noSender())

sleep 1500

// 2回目
actor.tell('invalid-message', ActorRef.noSender())

sleep 2500

system.terminate()

実行結果は、以下のように invalid-message という文字列を 2回受信した後にタイムアウトしました。

つまり、メッセージの受信でタイムアウトがリセットされていると考えられます。

実行結果(タイムアウトの検証)
> groovy timeout_sample1b.groovy

・・・
*** stateChanged: Idle -> Active, data=1, next data=2
*** Unhandled event=invalid-message, data=2
*** Unhandled event=invalid-message, data=2
*** timeout: event=StateTimeout, data=2
*** stateChanged: Active -> Idle, data=2, next data=2

タイムアウト付きステートマシンの実装2 (状態のタイムアウト

メッセージの受信有無に左右されないタイムアウトを実現するには、when のタイムアウトを使わずに setTimer() を使う事で実装できそうです。

timeout_sample2.groovy
・・・
class SampleStateMachine extends AbstractFSM<States, Integer> {
    {
        ・・・
        when(States.Active, 
            matchEventEquals(Events.Off) { event, data ->
                goTo(States.Idle)
            }.eventEquals(StateTimeout()) { event, data ->
                println "*** timeout: event=${event}, data=${data}"

                goTo(States.Idle)
            /* 以下でも可
                self().tell(Events.Off, self())
                stay()
            */
            }
        )
        ・・・
        onTransition { from, to -> 
            println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}"

            if (to == States.Active) {
                // Active 状態のタイムアウト設定
                setTimer(
                    'active-timeout', 
                    StateTimeout(), 
                    Duration.create(2, TimeUnit.SECONDS)
                )
            }
            else {
                // タイムアウトのキャンセル
                cancelTimer('active-timeout')
            }
        }
    }
}

def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

・・・

actor.tell(Events.On, ActorRef.noSender())

sleep 1500

actor.tell("invalid-message", ActorRef.noSender())

sleep 1500

actor.tell("invalid-message", ActorRef.noSender())

sleep 2500

system.terminate()

実行してみると、2回目の invalid-message を受信する前にタイムアウトが発生しており意図通りの動作となりました。

実行結果
> groovy timeout_sample2.groovy

・・・
*** stateChanged: Idle -> Active, data=1, next data=2
*** Unhandled event=invalid-message, data=2
*** timeout: event=StateTimeout, data=2
*** stateChanged: Active -> Idle, data=2, next data=2
*** Unhandled event=invalid-message, data=2

Spring Statemachine でステートマシンを処理

Spring Statemachine を使って単純な有限ステートマシン(FSM)を実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20171002/

はじめに

Spring Boot 2.0.0.M4 を使用して Kotlin で実装するため、以下のような Gradle ビルド定義を使いました。

build.gradle
buildscript {
    ext {
        kotlinVersion = '1.1.51'
        springBootVersion = '2.0.0.M4'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}")
        classpath("org.jetbrains.kotlin:kotlin-allopen:${kotlinVersion}")
    }
}

apply plugin: 'kotlin'
apply plugin: 'kotlin-spring'
apply plugin: 'org.springframework.boot'

compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}

dependencies {
    // JDK 9 でも実行できるようにバージョンを設定
    compile("org.springframework.boot:spring-boot-starter:${springBootVersion}")
    // Spring Statemachine
    compile('org.springframework.statemachine:spring-statemachine-core:2.0.0.BUILD-SNAPSHOT')

    compile("org.jetbrains.kotlin:kotlin-stdlib-jre8:${kotlinVersion}")
    compile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
}

上記は Spring Initializr で生成したものをベースに多少の変更 ※ を加えています。

 ※ 不要な設定を削除し、JDK 9 でも実行できるように
    compile(・・・) で spring-boot-starter のバージョンを設定

      compile('org.springframework.boot:spring-boot-starter') のままでは
      JDK 9 で以下のようなエラーが発生したため(バージョン指定が欠ける)

        Could not find org.springframework.boot:spring-boot-starter:.

a. StateMachineBuilder 使用

Spring Statemachine では、有限ステートマシンを定義するために以下のような方法が用意されているようなので、まずは StateMachineBuilder を使ってみます。

実装するステートマシンは以下の通りです。

  • 初期状態は Idle 状態
  • Idle 状態で On イベントが発生すると Active 状態へ遷移
  • Active 状態で Off イベントが発生すると Idle 状態へ遷移
現在の状態 Off On
Idle Active
Active Idle

Spring Statemachine におけるステートマシンは StateMachine<状態の型, イベントの型> として扱います。

今回は状態の型を States、イベントの型を Events とし、enum で定義しています。

StateMachineBuilder を使用する場合、builder() で取得した StateMachineBuilder.Builder<状態の型, イベントの型> に対してステートマシンの状態(初期状態など)や状態遷移等の設定を行います。

状態の設定は configureStates()StateMachineStateConfigurer を取得し、更に withStates() で取得した StateConfigurer で設定します。

initial で初期の状態を指定し、states で全ての状態を指定します。

状態遷移の設定は configureTransitions() で取得した StateMachineTransitionConfigurer に対して行います。

別の状態へ遷移する場合は withExternal() で取得した ExternalTransitionConfigurer で設定します。

source(状態) で遷移前の状態、target(状態) で遷移後の状態を指定し event(イベント) で遷移のきっかけとなるイベントを指定します。

その際に、何らかの処理を行う場合は action(処理) で指定できます。

複数の状態遷移を繋げて書きたい場合は and() を使います。

状態遷移等の状況確認には StateMachineListener が使えます。

src/main/kotlin/sample/Application.kt
package sample

import org.springframework.boot.CommandLineRunner
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.statemachine.StateMachine
import org.springframework.statemachine.config.StateMachineBuilder
import org.springframework.statemachine.listener.StateMachineListenerAdapter
import org.springframework.statemachine.state.State

// 状態
enum class States { Idle, Active }
// イベント
enum class Events { On, Off }

@SpringBootApplication
class Application : CommandLineRunner {
    override fun run(vararg args: String?) {
        val machine = stateMachine()
        machine.addStateListener(SampleListener())

        // ステートマシンの開始
        machine.start()

        // Idle -> Active (stateChanged)
        machine.sendEvent(Events.On)

        // Active -> Idle (stateChanged)
        machine.sendEvent(Events.Off)

        // Idle 状態で Off しても何も起こらない (eventNotAccepted)
        machine.sendEvent(Events.Off)
    }

    // 有限ステートマシンの定義
    @Bean
    fun stateMachine(): StateMachine<States, Events> {
        val builder = StateMachineBuilder.builder<States, Events>()
        // 状態の設定
        builder.configureStates().withStates()
                .initial(States.Idle).states(States.values().toSet())

        // 遷移の設定
        builder.configureTransitions()
                // On イベントで Idle から Active 状態へ遷移
                .withExternal().source(States.Idle).target(States.Active).event(Events.On)
                .and()
                // Off イベントで Active から Idle 状態へ遷移
                .withExternal().source(States.Active).target(States.Idle).event(Events.Off)

        return builder.build()
    }
}

class SampleListener : StateMachineListenerAdapter<States, Events>() {
    // 状態遷移の発生時
    override fun stateChanged(from: State<States, Events>?, to: State<States, Events>?) {
        println("*** stateChanged: ${from?.id} -> ${to?.id}")
    }
    // 受付不可なイベント発生時
    override fun eventNotAccepted(event: Message<Events>?) {
        println("*** eventNotAccepted: ${event?.payload}")
    }
}

fun main(args: Array<String>) {
    SpringApplication.run(Application::class.java, *args)
}

実行結果は以下の通りです。

実行結果
> gradle -q bootRun

・・・
*** stateChanged: null -> Idle
・・・
*** stateChanged: Idle -> Active
*** stateChanged: Active -> Idle
*** eventNotAccepted: Off
・・・
・・・ o.s.s.support.LifecycleObjectSupport     : destroy called

処理が終わるとプロセスは終了しました。

b. @StateMachineFactory アノテーション使用

次に、@StateMachineFactory アノテーションを使ってステートマシンを定義します。

状態やイベントの型に enum を使っている場合は、EnumStateMachineConfigurerAdapter<状態の型, イベントの型> を extends したクラスへ @StateMachineFactory を付与します。

この場合、@Autowired 対象の変数の型を StateMachineFactory<状態の型, イベントの型> とします。

@StateMachine アノテーションの場合も基本的に同じで、その場合は @Autowired 対象の型を StateMachine<状態の型, イベントの型> とします。

ステートマシンの定義は、該当する configure(xxxConfigurer) をオーバーライドして StateMachineBuilder と同じ様に設定するだけです。

ここでは、StateMachineBuilder のサンプルへ以下の機能を追加してみました。

  • start() メソッドを呼び出さなくても開始するように autoStartup(true) を設定
  • Active 状態のまま 2秒経過すると Idle 状態へ戻る遷移を追加

状態遷移は以下のようになります。

現在の状態 Off On Timeout (2秒)
Idle Active
Active Idle Idle

ここでは、withInternal()timerOnce(ミリ秒)action(処理) を組み合わせて、Active 状態が 2秒続いた(タイムアウトした)際に Off イベントを送信して Idle 状態へ遷移するようにしてみましたが、timerOnce(ミリ秒)withExternal() でも使えます。

src/main/kotlin/sample/SampleStateMachineConfig.kt
package sample

import org.springframework.statemachine.StateContext
import org.springframework.statemachine.config.EnableStateMachineFactory
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter
import org.springframework.statemachine.config.builders.StateMachineConfigurationConfigurer
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer

enum class States { Idle, Active }
enum class Events { On, Off }

// 有限ステートマシンの定義
@EnableStateMachineFactory
class SampleStateMachineConfig : EnumStateMachineConfigurerAdapter<States, Events>() {
    override fun configure(config: StateMachineConfigurationConfigurer<States, Events>?) {
        config!!.withConfiguration()
                // 自動的に開始(start メソッドを呼び出す必要がなくなる)
                .autoStartup(true)
    }

    override fun configure(states: StateMachineStateConfigurer<States, Events>?) {
        states!!.withStates()
                .initial(States.Idle).states(States.values().toSet())
    }

    override fun configure(transitions: StateMachineTransitionConfigurer<States, Events>?) {
        transitions!!
                .withExternal().source(States.Idle).target(States.Active).event(Events.On)
                .and()
                .withExternal().source(States.Active).target(States.Idle).event(Events.Off)
                .and()
                .withInternal().source(States.Active).timerOnce(2000).action(this::timeout)
                // 以下でも可
                //.withExternal().source(States.Active).target(States.Idle).timerOnce(2000)
    }

    private fun timeout(ctx: StateContext<States, Events>) {
        println("*** timeout: ${ctx.source.id}")
        // Off イベント送信(Idle 状態へ戻す)
        ctx.stateMachine.sendEvent(Events.Off)
    }
}

上記を @Autowired して使います。

autoStartup を有効化したので StateMachine の start() を呼び出す必要はありません。

src/main/kotlin/sample/Application.kt
package sample

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.CommandLineRunner
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.messaging.Message
import org.springframework.statemachine.config.StateMachineFactory
import org.springframework.statemachine.listener.StateMachineListenerAdapter
import org.springframework.statemachine.state.State

@SpringBootApplication
class Application : CommandLineRunner {
    @Autowired
    lateinit var stateMachineFactory: StateMachineFactory<States, Events>

    override fun run(vararg args: String?) {

        val machine = stateMachineFactory.stateMachine
        machine.addStateListener(SampleListener())

        machine.sendEvent(Events.On)
        machine.sendEvent(Events.Off)

        machine.sendEvent(Events.Off)

        // Active 状態にして放置
        machine.sendEvent(Events.On)

        // timerOnce を使うとプロセスが終了しなくなるため sleep は不要だった
        // Thread.sleep(2500)
    }
}

class SampleListener : StateMachineListenerAdapter<States, Events>() {
    override fun stateChanged(from: State<States, Events>?, to: State<States, Events>?) {
        println("*** stateChanged: ${from?.id} -> ${to?.id}")
    }

    override fun eventNotAccepted(event: Message<Events>?) {
        println("*** eventNotAccepted: ${event?.payload}")
    }
}

fun main(args: Array<String>) {
    SpringApplication.run(Application::class.java, *args)
}

実行結果は以下の通りです。

実行結果
> gradle -q bootRun

・・・
*** stateChanged: Idle -> Active
*** stateChanged: Active -> Idle
*** eventNotAccepted: Off
*** stateChanged: Idle -> Active
・・・
*** timeout: Active
*** stateChanged: Active -> Idle

timerOnce 等を使うとプロセスが終了しなくなるようなので、Ctrl + c 等でプロセスを停止します。

Akka Streams で MQTT Broker へ接続

ローカルで実行した MQTT Broker(前回 参照)に対して Akka Streams の JavaAPI を使って Groovy で接続してみます。

Akka Streams 用の様々なコネクタを備えた Alpakka に MQTT Broker 用の Source や Sink が用意されているので、今回はこちらを使います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170925/

Publish 処理

まずは publish 処理です。

MqttConnectionSettings から MqttSink を作成し、MqttMessage を渡せば MQTT Broker へメッセージを送信できます。

MqttConnectionSettings は MQTT Broker の接続先と clientId、そして永続化の方法を指定して作成します。

clientId はクライアント毎に一意な値を指定します。 (clientId を null にすると IllegalArgumentException: Null clientId となりました)

MqttQoS ではメッセージ到達の QoS を指定します。 MqttQoS.atLeastOnce() は少なくとも 1回(重複の可能性あり)の到達可能性を指定する事になります。

@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4')
@Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11')
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Source
import akka.stream.alpakka.mqtt.MqttQoS
import akka.stream.alpakka.mqtt.MqttConnectionSettings
import akka.stream.alpakka.mqtt.MqttMessage
import akka.stream.alpakka.mqtt.javadsl.MqttSink
import akka.util.ByteString

def topic = args[0]
def clientId = args[1]
def message = args[2]

def system = ActorSystem.create()
def mat = ActorMaterializer.create(system)

def settings = MqttConnectionSettings.create(
    'tcp://localhost:1883',
    clientId,
    new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
)

def msg = MqttMessage.create(topic, ByteString.fromString(message))

Source.single(msg)
    .runWith(MqttSink.create(settings, MqttQoS.atLeastOnce()), mat)

// メッセージ送信前に terminate しないようスリープで調整
sleep 1000

system.terminate()

Subscribe 処理

次に subscribe 処理です。

MqttConnectionSettings から MqttSourceSettings を作り、MqttSource を作成します。

MqttSourceSettings の withSubscriptions で subscribe するトピック名と QoS を指定します。

mqtt_subscribe.groovy
@Grab('com.typesafe.akka:akka-stream_2.12:2.5.4')
@Grab('com.lightbend.akka:akka-stream-alpakka-mqtt_2.12:0.11')
import akka.actor.ActorSystem
import akka.japi.Pair
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Sink
import akka.stream.alpakka.mqtt.MqttQoS
import akka.stream.alpakka.mqtt.MqttSourceSettings
import akka.stream.alpakka.mqtt.MqttConnectionSettings
import akka.stream.alpakka.mqtt.javadsl.MqttSource

def topic = args[0]
def clientId = args[1]

def system = ActorSystem.create()
def mat = ActorMaterializer.create(system)

def settings = MqttSourceSettings.create(
    MqttConnectionSettings.create(
        'tcp://localhost:1883',
        clientId,
        new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
    )
).withSubscriptions(
    // トピックと QoS の指定
    Pair.create(topic, MqttQoS.atLeastOnce())
)

MqttSource.create(settings, 10).runWith(Sink.foreach { println it }, mat)

println "subscribe : ${topic}"

System.in.read()

system.terminate()

上記の処理では、再接続の際にセッションがクリアされてしまうので、通信が切断している間のメッセージを後で受け取るような事はできません。

再接続の際にセッションが復元される Persistent Session を適用するには、以下のように withCleanSession(false) とする必要があります。

なお、Persistent Session を適用するには同じ clientId を使って再接続する必要があります。

mqtt_subscribe2.groovy (Persistent Session 版)
・・・

def settings = MqttSourceSettings.create(
    MqttConnectionSettings.create(
        'tcp://localhost:1883',
        clientId,
        new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()
    ).withCleanSession(false) // Persistent Session
).withSubscriptions(
    Pair.create(topic, MqttQoS.atLeastOnce())
)

・・・

動作確認

前回 の Moquette 組み込み実行スクリプトで MQTT Broker を起動しておきます。

MQTT Broker 実行
> groovy moquette_run.groovy

・・・
[main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883
・・・
Server started, version 0.10

Subscribe 処理を実行します。

動作確認のため Persistent Session 版も実行していますが、(Publish 処理も含め) clientId へ異なる値をそれぞれ指定します。(ここでは subscribe1・2 と publish1 としています)

また、トピック名は sample とします。

(a) Subscribe1 実行
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
(b) Subscribe2(Persistent Session 版)実行
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample

この状態で “a” “ab” “abc” という 3つのメッセージを publish してみます。

Publish 実行
> groovy mqtt_publish.groovy sample publish1 a
> groovy mqtt_publish.groovy sample publish1 ab
> groovy mqtt_publish.groovy sample publish1 abc

Subscribe の結果は以下のようになりました。

(a) Subscribe1 状況
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
MqttMessage(sample,ByteString(97))
MqttMessage(sample,ByteString(97, 98))
MqttMessage(sample,ByteString(97, 98, 99))
(b) Subscribe2(Persistent Session 版)状況
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample
MqttMessage(sample,ByteString(97))
MqttMessage(sample,ByteString(97, 98))
MqttMessage(sample,ByteString(97, 98, 99))

ここで (a) と (b) の処理を一度終了しておき、その状態で publish してみます。

Publish 実行
> groovy mqtt_publish.groovy sample publish1 abcd
> groovy mqtt_publish.groovy sample publish1 abcde

(a) と (b) を再実行すると、以下のように Persistent Session 版の方は停止中に publish されたメッセージを取得できました。

(a) Subscribe1 再実行
> groovy mqtt_subscribe.groovy sample subscribe1

subscribe: sample
(b) Subscribe2(Persistent Session 版)再実行
> groovy mqtt_subscribe2.groovy sample subscribe2

subscribe: sample
MqttMessage(sample,ByteString(97, 98, 99, 100))
MqttMessage(sample,ByteString(97, 98, 99, 100, 101))

MQTT Broker をローカル実行

以下の MQTT Broker をそれぞれローカルで実行してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20170910/

Mosca

Mosca は Node.js 用の MQTT Broker です。

npm でインストールして mosca コマンドで実行できます。

インストール例
> npm install mosca
実行例
> mosca -v

       +++.+++:   ,+++    +++;   '+++    +++.
      ++.+++.++   ++.++  ++,'+  `+',++  ++,++
      +`  +,  +: .+  .+  +;  +; '+  '+  +`  +`
      +`  +.  +: ,+  `+  ++  +; '+  ;+  +   +.
      +`  +.  +: ,+  `+   +'    '+      +   +.
      +`  +.  +: ,+  `+   :+.   '+      +++++.
      +`  +.  +: ,+  `+    ++   '+      +++++.
      +`  +.  +: ,+  `+     ++  '+      +   +.
      +`  +.  +: ,+  `+  +:  +: '+  ;+  +   +.
      +`  +.  +: .+  .+  +;  +; '+  '+  +   +.
      +`  +.  +:  ++;++  ++'++   ++'+'  +   +.
      +`  +.  +:   +++    +++.   ,++'   +   +.
{"pid":11260,"hostname":"host1","name":"mosca","level":30,"time":1504448625943,"msg":"server started","mqtt":1883,"v":1}

ログが JSON で出力されていますが、pino ※ を使えば以下のようにログを整形して出力してくれます。

 ※ mosca 2.5.2 を npm install すると pino もインストールされました
実行例 - pino 利用
> mosca -v | pino

       +++.+++:   ,+++    +++;   '+++    +++.
      ++.+++.++   ++.++  ++,'+  `+',++  ++,++
      +`  +,  +: .+  .+  +;  +; '+  '+  +`  +`
      +`  +.  +: ,+  `+  ++  +; '+  ;+  +   +.
      +`  +.  +: ,+  `+   +'    '+      +   +.
      +`  +.  +: ,+  `+   :+.   '+      +++++.
      +`  +.  +: ,+  `+    ++   '+      +++++.
      +`  +.  +: ,+  `+     ++  '+      +   +.
      +`  +.  +: ,+  `+  +:  +: '+  ;+  +   +.
      +`  +.  +: .+  .+  +;  +; '+  '+  +   +.
      +`  +.  +:  ++;++  ++'++   ++'+'  +   +.
      +`  +.  +:   +++    +++.   ,++'   +   +.
[2017-09-03T14:24:23.929Z] INFO (mosca/3124 on host1): server started
    mqtt: 1883

サーバー組み込み実行

Mosca を組み込み実行するコードは以下の通りです。

実際は new mosca.Server() だけでサーバーが起動するのですが、そのままだとクライアントからの接続状況が分かり難いのでログ出力しています。

mosca_run.js
const mosca = require('mosca')

const server = new mosca.Server()

server.on('ready', () => console.log('server started'))

server.on('clientConnected', client => 
    console.log(`client connected: ${client.id}`))

server.on('published', (packet) => 
    console.log(`published: ${JSON.stringify(packet)}`))

クライアント処理

MQTT.js をインストールして MQTT のクライアント処理を実装してみます。

MQTT.js インストール例
> npm install mqtt

まずは、指定のトピックへメッセージを publish する処理です。

publish_sample.js
const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://localhost')

const topic = process.argv[2]
const msg = process.argv[3]

client.on('connect', () => {
    client.publish(topic, msg)

    client.end()
})

次に、指定のトピックを subscribe する処理です。

subscribe_sample.js
const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://localhost')

const topic = process.argv[2]

client.on('connect', () => {
    client.subscribe(topic)
})

client.on('message', (topic, msg) => {
    console.log(`topic: ${topic}, msg: ${msg}`)
})

動作確認

まずは Mosca サーバーを実行しておきます。(mosca コマンドで実行しても可)

サーバー組み込み実行
> node mosca_run.js

server started

次に data トピックを subscribe してみます。

subscribe 実行
> node subscribe_sample.js data

data トピックへ sample1 ~ 3 という文字列を publish してみます。

publish 実行
> node publish_sample.js data sample1
> node publish_sample.js data sample2
> node publish_sample.js data sample3

subscribe 側にメッセージが出力されました。

subscribe の結果
topic: data, msg: sample1
topic: data, msg: sample2
topic: data, msg: sample3

Moquette

Moquette は Java 用の MQTT Broker です。

distribution-0.10-bundle-tar.tar.gz をダウンロード・解凍した後、bin/moquette.bat や bin/moquette.sh で実行できるようですが、moquette.bat の内容に問題があって、そのままでは Java 8 で実行できませんでした。(## の行を削除するか rem を付けて、JAVA_OPTS の設定を削る等が必要でした)

サーバー組み込み実行

組み込み実行は io.moquette.server.Servermain メソッドを呼び出すだけです。

ただし、このままでは IllegalArgumentException: Can't locate file "null" となってしまうので config/moquette.conf ファイルを作成しておきます。(デフォルト設定を使うのなら中身は空でよい)

moquette_run.groovy
@GrabResolver(name = 'bintray', root = 'https://jcenter.bintray.com')
@Grab('io.moquette:moquette-broker:0.10')
@Grab('org.slf4j:slf4j-simple:1.7.25')
import io.moquette.server.Server

Server.main(args)

動作確認

Mosca と同様に動作確認を行ってみます。

サーバー組み込み実行
> groovy moquette_run.groovy

・・・
[main] INFO io.moquette.server.netty.NettyAcceptor - Server has been bound. host=0.0.0.0, port=1883
[main] INFO io.moquette.server.netty.NettyAcceptor - Configuring Websocket MQTT transport
[main] INFO io.moquette.server.netty.NettyAcceptor - Property websocket_port has been setted to disabled. Websocket MQTT will be disabled
[main] INFO io.moquette.server.Server - Moquette server has been initialized successfully
Server started, version 0.10

data トピックを subscribe してみます。

subscribe 実行
> node subscribe_sample.js data

data トピックへ sample1 ~ 3 という文字列を publish してみます。

publish 実行
> node publish_sample.js data sample1
> node publish_sample.js data sample2
> node publish_sample.js data sample3

subscribe 側にメッセージが出力されました。

subscribe の結果
topic: data, msg: sample1
topic: data, msg: sample2
topic: data, msg: sample3