辞書ベースの日本語 Tokenizer - Kuromoji, Sudachi, Fugashi, Kagome, Lindera

辞書をベースに処理する日本語 Tokenizer のいくつかをコードを書いて実行してみました。

  • (a) Lucene Kuromoji
  • (b) atilika Kuromoji
  • (c) Sudachi
  • (d) Kuromoji.js
  • (e) Fugashi
  • (f) Kagome
  • (g) Lindera

今回は以下の文を処理して分割された単語と品詞を出力します。

処理対象文
 WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。

システム辞書だけを使用し、分割モードを指定する場合は固有名詞などをそのままにする(細かく分割しない)モードを選ぶ事にします。

ソースコードhttps://github.com/fits/try_samples/tree/master/blog/20220106/

(a) Lucene Kuromoji

Lucene に組み込まれた Kuromoji で Elasticsearch や Solr で使われます。

kuromoji.gradle を見ると、システム辞書は以下のどちらかを使うようになっているようです。

a1

lucene-analyzers-kuromoji の JapaneseTokenizer を使います。 辞書は IPADIC のようです。

lucene/a1.groovy
@Grab('org.apache.lucene:lucene-analyzers-kuromoji:8.11.1')
import org.apache.lucene.analysis.ja.JapaneseTokenizer;
import org.apache.lucene.analysis.ja.JapaneseTokenizer.Mode
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute

def text = args[0]

new JapaneseTokenizer(null, false, Mode.NORMAL).withCloseable { tokenizer ->
    def term = tokenizer.addAttribute(CharTermAttribute)
    def pos = tokenizer.addAttribute(PartOfSpeechAttribute)

    tokenizer.reader = new StringReader(text)
    tokenizer.reset()

    while(tokenizer.incrementToken()) {
        println "term=${term}, partOfSpeech=${pos.partOfSpeech}"
    }
}
a1 結果
> groovy a1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=名詞-固有名詞-組織
term=が, partOfSpeech=助詞-格助詞-一般
term=サーバー, partOfSpeech=名詞-一般
term=レス, partOfSpeech=名詞-サ変接続
term=分野, partOfSpeech=名詞-一般
term=へ, partOfSpeech=助詞-格助詞-一般
term=大きな, partOfSpeech=連体詞
term=影響, partOfSpeech=名詞-サ変接続
term=を, partOfSpeech=助詞-格助詞-一般
term=与える, partOfSpeech=動詞-自立
term=だろ, partOfSpeech=助動詞
term=う, partOfSpeech=助動詞
term=と, partOfSpeech=助詞-格助詞-引用
term=答え, partOfSpeech=動詞-自立
term=た, partOfSpeech=助動詞
term=回答, partOfSpeech=名詞-サ変接続
term=者, partOfSpeech=名詞-接尾-一般
term=は, partOfSpeech=助詞-係助詞
term=全体, partOfSpeech=名詞-副詞可能
term=の, partOfSpeech=助詞-連体化
term=5, partOfSpeech=名詞-数
term=6, partOfSpeech=名詞-数
term=%, partOfSpeech=名詞-接尾-助数詞
term=だっ, partOfSpeech=助動詞
term=た, partOfSpeech=助動詞
term=。, partOfSpeech=記号-句点

a2

org.codelibs が上記の ipadic-neologd 版を提供していたので、ついでに試してみました。

処理内容はそのままで、モジュールとパッケージ名を変えるだけです。

lucene/a2.groovy
@GrabResolver('https://maven.codelibs.org/')
@Grab('org.codelibs:lucene-analyzers-kuromoji-ipadic-neologd:8.2.0-20200120')
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer.Mode
import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute

def text = args[0]

new JapaneseTokenizer(null, false, Mode.NORMAL).withCloseable { tokenizer ->
    ・・・
}
a2 結果
> groovy a2.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=名詞-固有名詞-組織
term=が, partOfSpeech=助詞-格助詞-一般
term=サーバーレス, partOfSpeech=名詞-固有名詞-一般
term=分野, partOfSpeech=名詞-一般
term=へ, partOfSpeech=助詞-格助詞-一般
term=大きな, partOfSpeech=連体詞
term=影響, partOfSpeech=名詞-サ変接続
term=を, partOfSpeech=助詞-格助詞-一般
term=与える, partOfSpeech=動詞-自立
term=だろ, partOfSpeech=助動詞
term=う, partOfSpeech=助動詞
term=と, partOfSpeech=助詞-格助詞-引用
term=答え, partOfSpeech=動詞-自立
term=た, partOfSpeech=助動詞
term=回答者, partOfSpeech=名詞-固有名詞-一般
term=は, partOfSpeech=助詞-係助詞
term=全体, partOfSpeech=名詞-副詞可能
term=の, partOfSpeech=助詞-連体化
term=5, partOfSpeech=名詞-数
term=6, partOfSpeech=名詞-数
term=%, partOfSpeech=名詞-接尾-助数詞
term=だっ, partOfSpeech=助動詞
term=た, partOfSpeech=助動詞
term=。, partOfSpeech=記号-句点

a1 と違って "サーバーレス" や "回答者" となりました。

(b) atilika Kuromoji

https://github.com/atilika/kuromoji

Lucene Kuromoji のベースとなった Kuromoji。 更新は途絶えているようですが、色々な辞書に対応しています。

ここでは以下の 2種類の辞書を試してみました。

  • UniDic(2.1.2)
  • JUMAN(7.0-20130310)

b1

まずは、UniDic 版です。

kuromoji/b1.groovy
@Grab('com.atilika.kuromoji:kuromoji-unidic:0.9.0')
import com.atilika.kuromoji.unidic.Tokenizer

def text = args[0]
def tokenizer = new Tokenizer()

tokenizer.tokenize(args[0]).each {
    def pos = [
        it.partOfSpeechLevel1,
        it.partOfSpeechLevel2,
        it.partOfSpeechLevel3,
        it.partOfSpeechLevel4
    ]

    println "term=${it.surface}, partOfSpeech=${pos}"
}
b1 結果
> groovy b1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だ った。"

term=WebAssembly, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=が, partOfSpeech=[助詞, 格助詞, *, *]
term=サーバー, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=レス, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=分野, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=へ, partOfSpeech=[助詞, 格助詞, *, *]
term=大きな, partOfSpeech=[連体詞, *, *, *]
term=影響, partOfSpeech=[名詞, 普通名詞, サ変可能, *]
term=を, partOfSpeech=[助詞, 格助詞, *, *]
term=与える, partOfSpeech=[動詞, 一般, *, *]
term=だろう, partOfSpeech=[助動詞, *, *, *]
term=と, partOfSpeech=[助詞, 格助詞, *, *]
term=答え, partOfSpeech=[動詞, 一般, *, *]
term=た, partOfSpeech=[助動詞, *, *, *]
term=回答, partOfSpeech=[名詞, 普通名詞, サ変可能, *]
term=者, partOfSpeech=[接尾辞, 名詞的, 一般, *]
term=は, partOfSpeech=[助詞, 係助詞, *, *]
term=全体, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=の, partOfSpeech=[助詞, 格助詞, *, *]
term=5, partOfSpeech=[名詞, 数詞, *, *]
term=6, partOfSpeech=[名詞, 数詞, *, *]
term=%, partOfSpeech=[名詞, 普通名詞, 助数詞可能, *]
term=だっ, partOfSpeech=[助動詞, *, *, *]
term=た, partOfSpeech=[助動詞, *, *, *]
term=。, partOfSpeech=[補助記号, 句点, *, *]

"だろう" が分割されていないのが特徴。

b2

JUMAN 辞書版です。

kuromoji/b2.groovy
@Grab('com.atilika.kuromoji:kuromoji-jumandic:0.9.0')
import com.atilika.kuromoji.jumandic.Tokenizer

def text = args[0]
def tokenizer = new Tokenizer()

・・・
b2 結果
> groovy b2.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だ った。"

term=WebAssembly, partOfSpeech=[名詞, 組織名, *, *]
term=が, partOfSpeech=[助詞, 格助詞, *, *]
term=サーバーレス, partOfSpeech=[名詞, 人名, *, *]
term=分野, partOfSpeech=[名詞, 普通名詞, *, *]
term=へ, partOfSpeech=[助詞, 格助詞, *, *]
term=大きな, partOfSpeech=[連体詞, *, *, *]
term=影響, partOfSpeech=[名詞, サ変名詞, *, *]
term=を, partOfSpeech=[助詞, 格助詞, *, *]
term=与える, partOfSpeech=[動詞, *, 母音動詞, 基本形]
term=だろう, partOfSpeech=[助動詞, *, 助動詞だろう型, 基本形]
term=と, partOfSpeech=[助詞, 格助詞, *, *]
term=答えた, partOfSpeech=[動詞, *, 母音動詞, タ形]
term=回答, partOfSpeech=[名詞, サ変名詞, *, *]
term=者, partOfSpeech=[接尾辞, 名詞性名詞接尾辞, *, *]
term=は, partOfSpeech=[助詞, 副助詞, *, *]
term=全体, partOfSpeech=[名詞, 普通名詞, *, *]
term=の, partOfSpeech=[助詞, 接続助詞, *, *]
term=56, partOfSpeech=[名詞, 数詞, *, *]
term=%, partOfSpeech=[接尾辞, 名詞性名詞助数辞, *, *]
term=だった, partOfSpeech=[判定詞, *, 判定詞, ダ列タ形]
term=。, partOfSpeech=[特殊, 句点, *, *]

"サーバーレス"、"だろう"、"答えた"、"56"、"だった" が分割されていないのが特徴。 "サーバーレス" が人名となっているのは不思議。

(c) Sudachi

https://github.com/WorksApplications/Sudachi

Rust 版 もありますが、Java 版を使いました。

辞書は UniDic と NEologd をベースに調整したものらしく、3種類(Small, Core, Full)用意されています。

辞書が継続的にメンテナンスされており最新のものを使えるのが魅力だと思います。

ここではデフォルトの Core 辞書を使いました。(system_core.dic ファイルをカレントディレクトリに配置して実行)

  • Core 辞書(20211220 版)

また、Elasticsearch 用のプラグイン analysis-sudachi も用意されています。

sudachi/c1.groovy
@Grab('com.worksap.nlp:sudachi:0.5.3')
import com.worksap.nlp.sudachi.DictionaryFactory
import com.worksap.nlp.sudachi.Tokenizer

def text = args[0]

new DictionaryFactory().create().withCloseable { dic ->
    def tokenizer = dic.create()
    def ts = tokenizer.tokenize(Tokenizer.SplitMode.C, text)

    ts.each { t ->
        def pos = dic.getPartOfSpeechString(t.partOfSpeechId())

        println "term=${t.surface()}, partOfSpeech=${pos}"
    }
}
c1 結果
> groovy c1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だっ た。"

term=WebAssembly, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=が, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=サーバーレス, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=分野, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=へ, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=大きな, partOfSpeech=[連体詞, *, *, *, *, *]
term=影響, partOfSpeech=[名詞, 普通名詞, サ変可能, *, *, *]
term=を, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=与える, partOfSpeech=[動詞, 一般, *, *, 下一段-ア行, 終止形-一般]
term=だろう, partOfSpeech=[助動詞, *, *, *, 助動詞-ダ, 意志推量形]
term=と, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=答え, partOfSpeech=[動詞, 一般, *, *, 下一段-ア行, 連用形-一般]
term=た, partOfSpeech=[助動詞, *, *, *, 助動詞-タ, 連体形-一般]
term=回答者, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=は, partOfSpeech=[助詞, 係助詞, *, *, *, *]
term=全体, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=の, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=56, partOfSpeech=[名詞, 数詞, *, *, *, *]
term=%, partOfSpeech=[名詞, 普通名詞, 助数詞可能, *, *, *]
term=だっ, partOfSpeech=[助動詞, *, *, *, 助動詞-ダ, 連用形-促音便]
term=た, partOfSpeech=[助動詞, *, *, *, 助動詞-タ, 終止形-一般]
term=。, partOfSpeech=[補助記号, 句点, *, *, *, *]

"サーバーレス"、"だろう"、"回答者"、"56" となっているのが特徴。

(d) Kuromoji.js

https://github.com/takuyaa/kuromoji.js/

Kuromoji の JavaScript 実装。

kuromoji.js/d1.mjs
import kuromoji from 'kuromoji'

const dicPath = 'node_modules/kuromoji/dict'
const text = process.argv[2]

kuromoji.builder({ dicPath }).build((err, tokenizer) => {
    if (err) {
        console.error(err)
        return
    }

    const ts = tokenizer.tokenize(text)

    for (const t of ts) {
        const pos = [t.pos, t.pos_detail_1, t.pos_detail_2, t.pos_detail_3]

        console.log(`term=${t.surface_form}, partOfSpeech=${pos}`)
    }
})
d1 結果
> node d1.mjs "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=名詞,固有名詞,組織,*
term=が, partOfSpeech=助詞,格助詞,一般,*
term=サーバー, partOfSpeech=名詞,一般,*,*
term=レス, partOfSpeech=名詞,サ変接続,*,*
term=分野, partOfSpeech=名詞,一般,*,*
term=へ, partOfSpeech=助詞,格助詞,一般,*
term=大きな, partOfSpeech=連体詞,*,*,*
term=影響, partOfSpeech=名詞,サ変接続,*,*
term=を, partOfSpeech=助詞,格助詞,一般,*
term=与える, partOfSpeech=動詞,自立,*,*
term=だろ, partOfSpeech=助動詞,*,*,*
term=う, partOfSpeech=助動詞,*,*,*
term=と, partOfSpeech=助詞,格助詞,引用,*
term=答え, partOfSpeech=動詞,自立,*,*
term=た, partOfSpeech=助動詞,*,*,*
term=回答, partOfSpeech=名詞,サ変接続,*,*
term=者, partOfSpeech=名詞,接尾,一般,*
term=は, partOfSpeech=助詞,係助詞,*,*
term=全体, partOfSpeech=名詞,副詞可能,*,*
term=の, partOfSpeech=助詞,連体化,*,*
term=5, partOfSpeech=名詞,数,*,*
term=6, partOfSpeech=名詞,数,*,*
term=%, partOfSpeech=名詞,接尾,助数詞,*
term=だっ, partOfSpeech=助動詞,*,*,*
term=た, partOfSpeech=助動詞,*,*,*
term=。, partOfSpeech=記号,句点,*,*

a1 と同じ結果になりました。

(e) Fugashi

https://github.com/polm/fugashi

MeCabPython 用ラッパー。

辞書として unidic-lite と unidic のパッケージが用意されていましたが、 ここでは JUMAN 辞書を使いました。

  • JUMAN
fugashi/e1.py
from fugashi import GenericTagger
import sys

text = sys.argv[1]

tagger = GenericTagger()

for t in tagger(text):
    pos = t.feature[0:4]
    print(f"term={t.surface}, partOfSpeech={pos}")
e1 結果
> python e1.py "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=('名詞', '組織名', '*', '*')
term=が, partOfSpeech=('助詞', '格助詞', '*', '*')
term=サーバーレス, partOfSpeech=('名詞', '人名', '*', '*')
term=分野, partOfSpeech=('名詞', '普通名詞', '*', '*')
term=へ, partOfSpeech=('助詞', '格助詞', '*', '*')
term=大きな, partOfSpeech=('連体詞', '*', '*', '*')
term=影響, partOfSpeech=('名詞', 'サ変名詞', '*', '*')
term=を, partOfSpeech=('助詞', '格助詞', '*', '*')
term=与える, partOfSpeech=('動詞', '*', '母音動詞', '基本形')
term=だろう, partOfSpeech=('助動詞', '*', '助動詞だろう型', '基本形')
term=と, partOfSpeech=('助詞', '格助詞', '*', '*')
term=答えた, partOfSpeech=('動詞', '*', '母音動詞', 'タ形')
term=回答, partOfSpeech=('名詞', 'サ変名詞', '*', '*')
term=者, partOfSpeech=('接尾辞', '名詞性名詞接尾辞', '*', '*')
term=は, partOfSpeech=('助詞', '副助詞', '*', '*')
term=全体, partOfSpeech=('名詞', '普通名詞', '*', '*')
term=の, partOfSpeech=('助詞', '接続助詞', '*', '*')
term=56, partOfSpeech=('名詞', '数詞', '*', '*')
term=%, partOfSpeech=('接尾辞', '名詞性名詞助数辞', '*', '*')
term=だった, partOfSpeech=('判定詞', '*', '判定詞', 'ダ列タ形')
term=。, partOfSpeech=('特殊', '句点', '*', '*')

同じ辞書を使っている b2 と同じ結果になりました。("サーバーレス" が人名なのも同じ)。

(f) Kagome

https://github.com/ikawaha/kagome

ここでは以下の辞書を使用します。

  • IPADIC(mecab-ipadic-2.7.0-20070801)
  • UniDic(2.1.2)

なお、Tokenize を呼び出した場合は、分割モードとして Normal が適用されるようです。

f1

IPADIC 版

kagome/f1.go
package main

import (
    "fmt"
    "os"

    "github.com/ikawaha/kagome-dict/ipa"
    "github.com/ikawaha/kagome/v2/tokenizer"
)

func main() {
    text := os.Args[1]

    t, err := tokenizer.New(ipa.Dict(), tokenizer.OmitBosEos())

    if err != nil {
        panic(err)
    }
    // 分割モード Normal
    ts := t.Tokenize(text)

    for _, t := range ts {
        fmt.Printf("term=%s, partOfSpeech=%v\n", t.Surface, t.POS())
    }
}
f1 結果
> go run f1.go "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=[名詞 固有名詞 組織 *]
term=が, partOfSpeech=[助詞 格助詞 一般 *]
term=サーバー, partOfSpeech=[名詞 一般 * *]
term=レス, partOfSpeech=[名詞 サ変接続 * *]
term=分野, partOfSpeech=[名詞 一般 * *]
term=へ, partOfSpeech=[助詞 格助詞 一般 *]
term=大きな, partOfSpeech=[連体詞 * * *]
term=影響, partOfSpeech=[名詞 サ変接続 * *]
term=を, partOfSpeech=[助詞 格助詞 一般 *]
term=与える, partOfSpeech=[動詞 自立 * *]
term=だろ, partOfSpeech=[助動詞 * * *]
term=う, partOfSpeech=[助動詞 * * *]
term=と, partOfSpeech=[助詞 格助詞 引用 *]
term=答え, partOfSpeech=[動詞 自立 * *]
term=た, partOfSpeech=[助動詞 * * *]
term=回答, partOfSpeech=[名詞 サ変接続 * *]
term=者, partOfSpeech=[名詞 接尾 一般 *]
term=は, partOfSpeech=[助詞 係助詞 * *]
term=全体, partOfSpeech=[名詞 副詞可能 * *]
term=の, partOfSpeech=[助詞 連体化 * *]
term=5, partOfSpeech=[名詞 数 * *]
term=6, partOfSpeech=[名詞 数 * *]
term=%, partOfSpeech=[名詞 接尾 助数詞 *]
term=だっ, partOfSpeech=[助動詞 * * *]
term=た, partOfSpeech=[助動詞 * * *]
term=。, partOfSpeech=[記号 句点 * *]

同じ辞書を使っている a1 と同じ結果になりました。

f2

UniDic 版

kagome/f2.go
package main

import (
    "fmt"
    "os"

    "github.com/ikawaha/kagome-dict/uni"
    "github.com/ikawaha/kagome/v2/tokenizer"
)

func main() {
    text := os.Args[1]

    t, err := tokenizer.New(uni.Dict(), tokenizer.OmitBosEos())

    ・・・
}
f2 結果
> go run f2.go "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=[名詞 普通名詞 一般 *]
term=が, partOfSpeech=[助詞 格助詞 * *]
term=サーバー, partOfSpeech=[名詞 普通名詞 一般 *]
term=レス, partOfSpeech=[名詞 普通名詞 一般 *]
term=分野, partOfSpeech=[名詞 普通名詞 一般 *]
term=へ, partOfSpeech=[助詞 格助詞 * *]
term=大きな, partOfSpeech=[連体詞 * * *]
term=影響, partOfSpeech=[名詞 普通名詞 サ変可能 *]
term=を, partOfSpeech=[助詞 格助詞 * *]
term=与える, partOfSpeech=[動詞 一般 * *]
term=だろう, partOfSpeech=[助動詞 * * *]
term=と, partOfSpeech=[助詞 格助詞 * *]
term=答え, partOfSpeech=[動詞 一般 * *]
term=た, partOfSpeech=[助動詞 * * *]
term=回答, partOfSpeech=[名詞 普通名詞 サ変可能 *]
term=者, partOfSpeech=[接尾辞 名詞的 一般 *]
term=は, partOfSpeech=[助詞 係助詞 * *]
term=全体, partOfSpeech=[名詞 普通名詞 一般 *]
term=の, partOfSpeech=[助詞 格助詞 * *]
term=5, partOfSpeech=[名詞 数詞 * *]
term=6, partOfSpeech=[名詞 数詞 * *]
term=%, partOfSpeech=[名詞 普通名詞 助数詞可能 *]
term=だっ, partOfSpeech=[助動詞 * * *]
term=た, partOfSpeech=[助動詞 * * *]
term=。, partOfSpeech=[補助記号 句点 * *]

同じ辞書を使っている b1 と同じ結果になりました。

(g) Lindera

https://github.com/lindera-morphology/lindera

kuromoji-rs のフォークのようで、辞書は IPADIC です。

  • IPADIC(mecab-ipadic-2.7.0-20070801)
lindera/src/main.rs
use lindera::tokenizer::Tokenizer;
use lindera_core::LinderaResult;

use std::env;

fn main() -> LinderaResult<()> {
    let text = env::args().nth(1).unwrap_or("".to_string());

    let mut tokenizer = Tokenizer::new()?;
    let ts = tokenizer.tokenize(&text)?;

    for t in ts {
        let pos = t.detail.get(0..4).unwrap_or(&t.detail);

        println!("text={}, partOfSpeech={:?}", t.text, pos);
    }

    Ok(())
}
結果
> cargo run "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

・・・
text=WebAssembly, partOfSpeech=["UNK"]
text=が, partOfSpeech=["助詞", "格助詞", "一般", "*"]
text=サーバー, partOfSpeech=["名詞", "一般", "*", "*"]
text=レス, partOfSpeech=["名詞", "サ変接続", "*", "*"]
text=分野, partOfSpeech=["名詞", "一般", "*", "*"]
text=へ, partOfSpeech=["助詞", "格助詞", "一般", "*"]
text=大きな, partOfSpeech=["連体詞", "*", "*", "*"]
text=影響, partOfSpeech=["名詞", "サ変接続", "*", "*"]
text=を, partOfSpeech=["助詞", "格助詞", "一般", "*"]
text=与える, partOfSpeech=["動詞", "自立", "*", "*"]
text=だろ, partOfSpeech=["助動詞", "*", "*", "*"]
text=う, partOfSpeech=["助動詞", "*", "*", "*"]
text=と, partOfSpeech=["助詞", "格助詞", "引用", "*"]
text=答え, partOfSpeech=["動詞", "自立", "*", "*"]
text=た, partOfSpeech=["助動詞", "*", "*", "*"]
text=回答, partOfSpeech=["名詞", "サ変接続", "*", "*"]
text=者, partOfSpeech=["名詞", "接尾", "一般", "*"]
text=は, partOfSpeech=["助詞", "係助詞", "*", "*"]
text=全体, partOfSpeech=["名詞", "副詞可能", "*", "*"]
text=の, partOfSpeech=["助詞", "連体化", "*", "*"]
text=5, partOfSpeech=["名詞", "数", "*", "*"]
text=6, partOfSpeech=["名詞", "数", "*", "*"]
text=%, partOfSpeech=["名詞", "接尾", "助数詞", "*"]
text=だっ, partOfSpeech=["助動詞", "*", "*", "*"]
text=た, partOfSpeech=["助動詞", "*", "*", "*"]
text=。, partOfSpeech=["記号", "句点", "*", "*"]

a1 と概ね同じ結果となりましたが、"WebAssembly" が名詞になっていないのが特徴。

Node.js で GraphQL over gRPC 的な事をやってみる

gRPC 上で GraphQL を扱う GraphQL over gRPC 的な処理を Node.js で試しに実装してみました。

今回のコードは http://github.com/fits/try_samples/tree/master/blog/20201124/

はじめに

GraphQL はクエリ言語なので基本的に通信プロトコルには依存していません。

Web フロントエンドの用途では Apollo GraphQL が公開している GraphQL over WebSocket Protocol が有力そうですが、マイクロサービス等の用途で GraphQL を利用する事を考えると WebSocket よりも gRPC の方が適しているように思います。

  • GraphQL の Query や Mutation は gRPC の Unary RPC で実現可能
  • GraphQL の Subscription は gRPC の Server streaming RPC で実現可能 ※

そこで、とりあえず実装し確認してみたというのが本件の趣旨となっています。

 ※ GraphQL の Subscription を使わずに
    gRPC の streaming RPC で代用する事も考えられる

なお、GraphQL の処理に関しては「Deno で GraphQL」の内容をベースに、gRPC は「Node.js で gRPC を試す」の静的コード生成を用いて実装しています。

Query と Mutation - sample1

まずは Subscription を除いた Query と Mutation について実装してみます。

gRPC サービス定義

gRPC のサービス定義を下記のように定義してみました。

GraphQL のクエリは文字列で扱うので型は string、結果は実質的に JSON となるので型は google.protobuf.Struct としました。

ついでに、クエリの変数も渡せるようにして型は google.protobuf.Value としています。

ここで、google.protobuf.StructJSON Object を Protocol Buffers で表現するための型として定義されたもので、google.protobuf.ValueJSON Value(null、文字列、数値、配列、JSON Object 等)を表現するための型です。(参照 google/protobuf/struct.proto

proto/graphql.proto
syntax = "proto3";

import "google/protobuf/struct.proto";

package gql;

message QueryRequest {
    string query = 1;
    google.protobuf.Value variables = 2;
}

service GraphQL {
    rpc Query(QueryRequest) returns (google.protobuf.Struct);
}

この proto/graphql.proto から gRPC のコードを生成しておきます。

静的コード生成
> mkdir generated

> npm run gen-grpc
・・・
grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto

package.json の内容は以下の通りです。

package.json
{
  "name": "sample1",
  "version": "1.0.0",
  "description": "",
  "scripts": {
    "gen-grpc": "grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto"
  },
  "dependencies": {
    "@grpc/grpc-js": "^1.2.1",
    "google-protobuf": "^3.14.0",
    "graphql": "^15.4.0",
    "uuid": "^8.3.1"
  },
  "devDependencies": {
    "grpc-tools": "^1.10.0"
  }
}

サーバー実装

GraphQL を扱う gRPC サーバーを実装します。

gRPC のリクエストから GraphQL のクエリやその変数の内容を取得し、graphql 関数で処理した結果をレスポンスとして返すような処理となります。

google.protobuf.Struct や Value に該当する型は google-protobuf/google/protobuf/struct_pb.js にて StructValue として定義されており、プレーンな JavaScript オブジェクトと相互変換するための fromJavaScripttoJavaScript メソッドが用意されています。

下記コードでは QueryRequest の variables の内容を JavaScript オブジェクトとして取得するために toJavaScript を、graphql の処理結果を Struct で返すために fromJavaScript をそれぞれ使用しています。

server.js
const grpc = require('@grpc/grpc-js')
const { GraphQLService } = require('./generated/proto/graphql_grpc_pb')
const { Struct } = require('google-protobuf/google/protobuf/struct_pb')

const { graphql, buildSchema } = require('graphql')

const { v4: uuidv4 } = require('uuid')

// GraphQL スキーマ定義
const schema = buildSchema(`
    enum Category {
        Standard
        Extra
    }

    input CreateItem {
        category: Category!
        value: Int!
    }

    type Item {
        id: ID!
        category: Category!
        value: Int!
    }

    type Mutation {
        create(input: CreateItem!): Item
    }

    type Query {
        find(id: ID!): Item
    }
`)

const store = {}
// スキーマ定義に応じた GraphQL 処理の実装
const root = {
    create: ({ input: { category, value } }) => {
        console.log(`*** call create: category = ${category}, value = ${value}`)

        const id = `item-${uuidv4()}`
        const item = { id, category, value }

        store[id] = item

        return item
    },
    find: ({ id }) => {
        console.log(`*** call find: ${id}`)
        return store[id]
    }
}

const server = new grpc.Server()

server.addService(GraphQLService, {
    async query(call, callback) {
        try {
            const query = call.request.getQuery()
            const variables = call.request.getVariables().toJavaScript()
            // GraphQL の処理
            const r = await graphql(schema, query, root, {}, variables)

            callback(null, Struct.fromJavaScript(r))

        } catch(e) {
            console.error(e)
            callback(e)
        }
    }
})

server.bindAsync(
    '127.0.0.1:50051',
    grpc.ServerCredentials.createInsecure(),
    (err, port) => {
        if (err) {
            console.error(err)
            return
        }

        console.log(`start server: ${port}`)

        server.start()
    }
)

クライアント実装

クライアント側は以下のようになります。

client.js
const grpc = require('@grpc/grpc-js')
const { QueryRequest } = require('./generated/proto/graphql_pb')
const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb')
const { Value } = require('google-protobuf/google/protobuf/struct_pb')

const client = new GraphQLClient(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const promisify = (obj, methodName) => args => 
    new Promise((resolve, reject) => {
        obj[methodName](args, (err, res) => {
            if (err) {
                reject(err)
            }
            else {
                resolve(res)
            }
        })
    })

const query = promisify(client, 'query')

const createRequest = (q, v = null) => {
    const req = new QueryRequest()

    req.setQuery(q)
    req.setVariables(Value.fromJavaScript(v))

    return req
}

const run = async () => {
    // Item の作成
    const r1 = await query(createRequest(`
        mutation {
            create(input: { category: Extra, value: 123 }) {
                id
            }
        }
    `))

    console.log(r1.toJavaScript())
    // 存在しない Item の find
    const r2 = await query(createRequest(`
        {
            find(id: "a1") {
                id
                value
            }
        }
    `))

    console.log(r2.toJavaScript())

    const id = r1.toJavaScript().data.create.id
    // 作成した Item の find (クエリ変数の使用)
    const r3 = await query(createRequest(
        `
            query findItem($id: ID!) {
                find(id: $id) {
                    id
                    category
                    value
                }
            }
        `,
        { id }
    ))

    console.log(r3.toJavaScript())
}

run().catch(err => console.error(err))

動作確認

server.js を実行しておきます。

server.js 実行
> node server.js
start server: 50051

client.js を実行した結果は以下の通りで、特に問題無く動作しているようです。

client.js 実行
> node client.js
{
  data: { create: { id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d' } }
}
{ data: { find: null } }
{
  data: {
    find: {
      category: 'Extra',
      id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d',
      value: 123
    }
  }
}

Subscription - sample2

次は Subscription の機能を追加します。

gRPC サービス定義

gRPC のサービス定義に Subscription 用のメソッドを追加し、sample1 と同様にコードを生成しておきます。

proto/graphql.proto
syntax = "proto3";

import "google/protobuf/struct.proto";

package gql;

message QueryRequest {
    string query = 1;
    google.protobuf.Value variables = 2;
}

service GraphQL {
    rpc Query(QueryRequest) returns (google.protobuf.Struct);
    rpc Subscription(QueryRequest) returns (stream google.protobuf.Struct);
}

サーバー実装

Deno で GraphQL」では単一の Subscription を処理するだけの実装だったので、複数クライアントからの Subscription を処理するために PubSub というクラスを追加し、subscription の呼び出し毎に MessageBox を作成、(クライアントが接続中の)有効な全ての MessageBox へメッセージを配信するようにしています。

server.js
const grpc = require('@grpc/grpc-js')
const { GraphQLService } = require('./generated/proto/graphql_grpc_pb')
const { Struct } = require('google-protobuf/google/protobuf/struct_pb')

const { graphql, buildSchema, subscribe, parse } = require('graphql')

const { v4: uuidv4 } = require('uuid')
// GraphQL スキーマ定義
const schema = buildSchema(`
    enum Category {
        Standard
        Extra
    }

    input CreateItem {
        category: Category!
        value: Int!
    }

    type Item {
        id: ID!
        category: Category!
        value: Int!
    }

    type Mutation {
        create(input: CreateItem!): Item
    }

    type Query {
        find(id: ID!): Item
    }

    type Subscription {
        created: Item
    }
`)

class MessageBox {
    #promises = []
    #resolves = []

    #appendPromise = () => this.#promises.push(
        new Promise(res => this.#resolves.push(res))
    )

    publish(msg) {
        if (this.#resolves.length == 0) {
            this.#appendPromise()
        }

        this.#resolves.shift()(msg)
    }

    [Symbol.asyncIterator]() {
        return {
            next: async () => {
                console.log('*** asyncIterator next')

                if (this.#promises.length == 0) {
                    this.#appendPromise()
                }

                const value = await this.#promises.shift()
                return { value, done: false }
            }
        }
    }
}
// クライアント毎の MessageBox を管理
class PubSub {
    #subscribes = []

    publish(msg) {
        this.#subscribes.forEach(s => s.publish(msg))
    }

    subscribe() {
        const sub = new MessageBox()
        this.#subscribes.push(sub)

        return sub
    }

    unsubscribe(sub) {
        this.#subscribes = this.#subscribes.filter(s => s != sub)
    }
}

const store = {}
const pubsub = new PubSub()

const root = {
    create: ({ input: { category, value } }) => {
        ・・・
    },
    find: ({ id }) => {
        ・・・
    }
}

const server = new grpc.Server()

server.addService(GraphQLService, {
   async query(call, callback) {
       ・・・
    },
    async subscription(call) {
        console.log('*** subscribed')

        try {
            const query = call.request.getQuery()
            const variables = call.request.getVariables().toJavaScript()

            const sub = pubsub.subscribe()

            call.on('cancelled', () => {
                console.log('*** unsubscribed')
                pubsub.unsubscribe(sub)
            })

            const subRoot = {
                created: () => sub
            }
            // GraphQL の Subscription 処理
            const aiter = await subscribe(schema, parse(query), subRoot, {}, variables)

            for await (const r of aiter) {
                // メッセージの配信
                call.write(Struct.fromJavaScript(r))
            }
        } catch(e) {
            console.error(e)
            call.destroy(e)
        }
    }
})

server.bindAsync(
    ・・・
)

Subscription 用クライアント実装

Subscription を呼び出すクライアントは以下のようになります。

client_subscribe.js
const grpc = require('@grpc/grpc-js')
const { QueryRequest } = require('./generated/proto/graphql_pb')
const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb')
const { Value } = require('google-protobuf/google/protobuf/struct_pb')

const client = new GraphQLClient(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const req = new QueryRequest()

req.setQuery(`
    subscription {
        created {
            id
            category
        }
    }
`)

req.setVariables(Value.fromJavaScript(null))

const stream = client.subscription(req)

stream.on('data', msg => {
    const event = msg.toJavaScript()
    console.log(`*** received event = ${JSON.stringify(event)}`)
})

stream.on('end', () => console.log('*** stream end'))
stream.on('error', err => console.log(`*** stream error: ${err}`))

動作確認

server.js を実行した後、client_subscribe.js を 2つ実行して sample1 で作成した client.js を実行すると以下のようになりました。

server.js の出力結果
> node server.js
start server: 50051
*** subscribed
*** asyncIterator next
*** subscribed
*** asyncIterator next
*** call create: category = Extra, value = 123
*** asyncIterator next
*** asyncIterator next
*** call find: a1
*** call find: item-5e3f81ed-774a-4f7f-afc5-000a2db34859
client_subscribe.js の出力結果
> node client_subscribe.js
*** received event = {"data":{"created":{"category":"Extra","id":"item-5e3f81ed-774a-4f7f-afc5-000a2db34859"}}}

Node.js で gRPC を試す

gRPC Server Reflection のクライアント処理」では Node.js で gRPC クライアントを実装しましたが、今回はサーバー側も実装してみます。

サンプルコードは http://github.com/fits/try_samples/tree/master/blog/20201115/

はじめに

gRPC on Node.js では、以下の 2通りの手法が用意されており、それぞれ使用するパッケージが異なります。

  • (a) 動的コード生成 (@grpc/proto-loader パッケージを使用)
  • (b) 静的コード生成 (grpc-tools パッケージを使用)

更に、gRPC の実装ライブラリとして以下の 2種類が用意されており、どちらかを使う事になります。

  • C-based Client and Server (grpc パッケージを使用)
  • Pure JavaScript Client (@grpc/grpc-js パッケージを使用)

@grpc/grpc-js は現時点で Pure JavaScript Client と表現されていますが、クライアントだけではなくサーバーの実装にも使えます。

ここでは、Pure JavaScript 実装の @grpc/grpc-js を使って、(a) と (b) の両方を試してみます。

サービス定義(proto ファイル)

gRPC のサービス定義として下記ファイルを使用します。

Unary RPC(1リクエスト / 1レスポンス)と Server streaming RPC(1リクエスト / 多レスポンス)、message の oneofgoogle.protobuf.Empty の扱い等を確認するような内容にしてみました。

proto/item.proto
syntax = "proto3";

import "google/protobuf/empty.proto";

package item;

message AddItemRequest {
    string item_id = 1;
    uint64 price = 2;
}

message ItemRequest {
    string item_id = 1;
}

message Item {
    string item_id = 1;
    uint64 price = 2;
}

message ItemSubscribeRequest {
}

message AddedItem {
    string item_id = 1;
    uint64 price = 2;
}

message RemovedItem {
    string item_id = 1;
}

message ItemEvent {
    oneof event {
        AddedItem added = 1;
        RemovedItem removed = 2;
    }
}

service ItemManage {
    rpc AddItem(AddItemRequest) returns (google.protobuf.Empty);
    rpc RemoveItem(ItemRequest) returns (google.protobuf.Empty);
    rpc GetItem(ItemRequest) returns (Item);

    rpc Subscribe(ItemSubscribeRequest) returns (stream ItemEvent);
}

(a) 動的コード生成(@grpc/proto-loader)

まずは、@grpc/proto-loader を使った動的コード生成を試します。

インストール

@grpc/proto-loader@grpc/grpc-js をインストールしておきます。

> npm install --save @grpc/proto-loader @grpc/grpc-js

サーバー実装

proto-loader の loadSync 関数 ※ で proto ファイルをロードした結果を grpc-js の loadPackageDefinition で処理する事で型定義などを動的に生成します。

 ※ 非同期版の load 関数も用意されています

addService で gRPC のサービス定義と処理をマッピングし、bindAsync 後に start を呼び出す事でサーバー処理を開始します。

proto ファイルで定義したメッセージ型と同じフィールドを持つ JavaScript オブジェクトを gRPC のリクエストやレスポンスで使う事ができるようです。

Unary RPC の場合は、第二引数の callback へ失敗時の値と成功時の値をそれぞれ渡す事で処理結果を返します。

任意のエラーを返したい場合は、code で gRPC のステータスコードを、details でエラー内容を指定します。

google.protobuf.Empty の箇所は null もしくは undefined で代用できるようです。

Server streaming RPC の場合は、第一引数(下記コードでは call)の write を呼び出す事で処理結果を返す事ができます。

クライアントが途中で切断したりすると cancelled が発生するようになっており、cancelled 発生後に write を呼び出してもエラー等は発生しないようになっていました。

server.js
const protoLoader = require('@grpc/proto-loader')
const grpc = require('@grpc/grpc-js')

const protoFile = './proto/item.proto'
// proto ファイルのロード
const pd = protoLoader.loadSync(protoFile)
// gPRC 用の動的な型定義生成
const proto = grpc.loadPackageDefinition(pd)

let store = []
let subscribeList = []

const findItem = itemId => store.find(i => i.itemId == itemId)

const addItem = (itemId, price) => {
    if (findItem(itemId)) {
        return undefined
    }

    const item = { itemId, price }

    store.push(item)

    return item
}

const removeItem = itemId => {
    const item = findItem(itemId)

    if (item) {
        store = store.filter(i => i.itemId != item.itemId)
    }

    return item
}
// ItemEvent の配信
const publishEvent = event => {
    console.log(`*** publish event: ${JSON.stringify(event)}`)
    subscribeList.forEach(s => s.write(event))
}

const server = new grpc.Server()
// サービス定義と処理のマッピング
server.addService(proto.item.ItemManage.service, {
    AddItem(call, callback) {
        const itemId = call.request.itemId
        const price = call.request.price

        const item = addItem(itemId, price)

        if (item) {
            callback()
            publishEvent({ added: { itemId, price }})
        }
        else {
            const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' }
            callback(err)
        }
    },
    RemoveItem(call, callback) {
        const itemId = call.request.itemId

        if (removeItem(itemId)) {
            callback()
            publishEvent({ removed: { itemId }})
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    GetItem(call, callback) {
        const itemId = call.request.itemId
        const item = findItem(itemId)

        if (item) {
            callback(null, item)
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    Subscribe(call) {
        console.log('*** subscribed')
        subscribeList.push(call)
        // クライアント切断時の処理
        call.on('cancelled', () => {
            console.log('*** unsubscribed')
            subscribeList = subscribeList.filter(s => s != call)
        })
    }
})

server.bindAsync(
    '127.0.0.1:50051',
    grpc.ServerCredentials.createInsecure(),
    (err, port) => {
        if (err) {
            console.error(err)
            return
        }

        console.log(`start server: ${port}`)
        // 開始
        server.start()
    }
)

クライアント実装1

まずは、Unary RPC の API のみ(Subscribe 以外)を呼び出すクライアントを実装してみます。

loadPackageDefinition を実施するところまではサーバーと同じです。

Unary RPC はコールバック関数を伴ったメソッドとして用意されますが、このメソッドに Node.js の util.promisify を直接適用すると不都合が生じたため、Promise 化は自前の関数(下記の promisify)で実施するようにしました。

client.js
const protoLoader = require('@grpc/proto-loader')
const grpc = require('@grpc/grpc-js')

const protoFile = './proto/item.proto'

const pd = protoLoader.loadSync(protoFile)
const proto = grpc.loadPackageDefinition(pd)

const id = process.argv[2]

const client = new proto.item.ItemManage(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)
// Unary RPC の Promise 化
const promisify = (obj, methodName) => args => 
    new Promise((resolve, reject) => {
        obj[methodName](args, (err, res) => {
            if (err) {
                reject(err)
            }
            else {
                resolve(res)
            }
        })
    })

const addItem = promisify(client, 'AddItem')
const removeItem = promisify(client, 'RemoveItem')
const getItem = promisify(client, 'GetItem')

const printItem = item => {
    console.log(`id = ${item.itemId}, price = ${item.price}`)
}

const run = async () => {
    await addItem({ itemId: `${id}_item-1`, price: 100 })

    const item1 = await getItem({ itemId: `${id}_item-1` })
    printItem(item1)

    await addItem({ itemId: `${id}_item-2`, price: 20 })

    const item2 = await getItem({ itemId: `${id}_item-2` })
    printItem(item2)

    await addItem({ itemId: `${id}_item-1`, price: 50 })
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem({ itemId: `${id}_item-1` })

    await getItem({ itemId: `${id}_item-1` })
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem({ itemId: `${id}_item-2` })
}

run().catch(err => console.error(err))

クライアント実装2

次は Server streaming RPC のクライアント実装です。

client_subscribe.js
const protoLoader = require('@grpc/proto-loader')
const grpc = require('@grpc/grpc-js')

const protoFile = './proto/item.proto'

const pd = protoLoader.loadSync(protoFile)
const proto = grpc.loadPackageDefinition(pd)

const client = new proto.item.ItemManage(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const stream = client.Subscribe({})
// メッセージ受信時
stream.on('data', event => {
    console.log(`*** received event = ${JSON.stringify(event)}`)
})

// サーバー終了時
stream.on('end', () => console.log('*** stream end'))
stream.on('error', err => console.log(`*** stream error: ${err}`))

動作確認

server.js の実行後、client_subscribe.js を 2つ起動した後に client.js を実行してみます。

Server 実行
> node server.js
start server: 50051
Client2-1 実行
> node client_subscribe.js
Client2-2 実行
> node client_subscribe.js
Client1 実行
> node client.js a1
id = a1_item-1, price = 100
id = a1_item-2, price = 20
*** ERROR = 6 ALREADY_EXISTS: exists item
*** ERROR = 5 NOT_FOUND: item not found

この時点で出力内容は以下のようになりました。

Server 出力内容
> node server.js
start server: 50051
*** subscribed
*** subscribed
*** publish event: {"added":{"itemId":"a1_item-1","price":100}}
*** publish event: {"added":{"itemId":"a1_item-2","price":20}}
*** publish event: {"removed":{"itemId":"a1_item-1"}}
*** publish event: {"removed":{"itemId":"a1_item-2"}}
Client2-1、Client2-2 出力内容
> node client_subscribe.js
*** received event = {"added":{"itemId":"a1_item-1","price":{"low":100,"high":0,"unsigned":true}}}
*** received event = {"added":{"itemId":"a1_item-2","price":{"low":20,"high":0,"unsigned":true}}}
*** received event = {"removed":{"itemId":"a1_item-1"}}
*** received event = {"removed":{"itemId":"a1_item-2"}}

Client2-2 を Ctrl + c で終了後に、Server も Ctrl + c で終了すると以下のようになり、Client2-1 のプロセスは終了しました。

Server 出力内容
> node server.js
start server: 50051
・・・
*** publish event: {"removed":{"itemId":"a1_item-2"}}
*** unsubscribed
^C
Client2-1 出力内容
> node client_subscribe.js
・・・
*** received event = {"removed":{"itemId":"a1_item-2"}}
*** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error)
*** stream end

特に問題はなく、正常に動作しているようです。

(b) 静的コード生成(grpc-tools)

grpc-tools を使った静的コード生成を試します。

インストールとコード生成

grpc-tools@grpc/grpc-jsgoogle-protobuf をインストールしておきます。

> npm install --save-dev grpc-tools
・・・
> npm install --save @grpc/grpc-js google-protobuf
・・・

grpc-tools をインストールする事で使えるようになる grpc_tools_node_protoc コマンドで proto ファイルからコードを生成します。

grpc_tools_node_protoc コマンドは内部的に protoc コマンドを grpc_node_plugin プラグインを伴って呼び出すようになっています。

--grpc_out でサービス定義用のファイル xxx_grpc_pb.js が生成され、--js_out でメッセージ定義用のファイルが生成されます。

サービス定義 xxx_grpc_pb.js は --js_out で import_style=commonjs オプションを指定する事を前提としたコードになっています。※

 ※ import_style=commonjs オプションを指定した際に生成される
    xxx_pb.js を参照するようになっている

また、--grpc_out はデフォルトで grpc パッケージ用のコードを生成するため、ここでは grpc_js オプションを指定して @grpc/grpc-js 用のコードを生成するようにしています。

静的コード生成例(grpc_tools_node_protoc コマンド)
> mkdir generated

> grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto
・・・

サーバー実装

(a) の場合と処理内容に大きな違いはありませんが、リクエストやレスポンスでは生成された型を使います。

アクセサメソッド(getter、setter)で値の取得や設定ができるようになっており、new 時に配列として全フィールドの値を指定する事もできるようです。 JavaScript オブジェクトへ変換したい場合は toObject メソッドを使用します。

addService でマッピングする際のメソッド名の一文字目が小文字になっています。

proto ファイルで定義したサービス名の後に Service を付けた型(ここでは ItemManageService)がサーバー処理用、Client を付けた型がクライアント処理用の型定義となるようです。

server.js
const grpc = require('@grpc/grpc-js')

const { Item, AddedItem, RemovedItem, ItemEvent } = require('./generated/proto/item_pb')
const { ItemManageService } = require('./generated/proto/item_grpc_pb')
const { Empty } = require('google-protobuf/google/protobuf/empty_pb')

let store = []
let subscribeList = []

const findItem = itemId => store.find(i => i.getItemId() == itemId)

const addItem = (itemId, price) => {
    if (findItem(itemId)) {
        return undefined
    }

    const item = new Item([itemId, price])

    store.push(item)

    return item
}

const removeItem = itemId => {
    const item = findItem(itemId)

    if (item) {
        store = store.filter(i => i.getItemId() != item.getItemId())
    }

    return item
}

const createAddedEvent = (itemId, price) => {
    const event = new ItemEvent()
    event.setAdded(new AddedItem([itemId, price]))

    return event
}

const createRemovedEvent = itemId => {
    const event = new ItemEvent()
    event.setRemoved(new RemovedItem([itemId]))

    return event
}

const publishEvent = event => {
    // toObject で JavaScript オブジェクトへ変換
    console.log(`*** publish event: ${JSON.stringify(event.toObject())}`)
    subscribeList.forEach(s => s.write(event))
}

const server = new grpc.Server()

server.addService(ItemManageService, {
    addItem(call, callback) {
        const itemId = call.request.getItemId()
        const price = call.request.getPrice()

        const item = addItem(itemId, price)

        if (item) {
            callback(null, new Empty())
            publishEvent(createAddedEvent(itemId, price))
        }
        else {
            const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' }
            callback(err)
        }
    },
    removeItem(call, callback) {
        const itemId = call.request.getItemId()

        if (removeItem(itemId)) {
            callback(null, new Empty())
            publishEvent(createRemovedEvent(itemId))
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    getItem(call, callback) {
        const itemId = call.request.getItemId()
        const item = findItem(itemId)

        if (item) {
            callback(null, item)
        }
        else {
            const err = { code: grpc.status.NOT_FOUND, details: 'item not found' }
            callback(err)
        }
    },
    subscribe(call) {
        console.log('*** subscribed')
        subscribeList.push(call)

        call.on('cancelled', () => {
            console.log('*** unsubscribed')
            subscribeList = subscribeList.filter(s => s != call)
        })
    }
})

server.bindAsync(
    ・・・
)

クライアント実装1

生成された型を使う点とメソッド名の先頭が小文字になっている点を除くと、基本的に (a) と同じです。

client.js
const grpc = require('@grpc/grpc-js')

const { AddItemRequest, ItemRequest } = require('./generated/proto/item_pb')
const { ItemManageClient } = require('./generated/proto/item_grpc_pb')

const id = process.argv[2]

const client = new ItemManageClient(
    '127.0.0.1:50051',
    grpc.credentials.createInsecure()
)

const promisify = (obj, methodName) => args => 
    new Promise((resolve, reject) => {
        ・・・
    })

const addItem = promisify(client, 'addItem')
const removeItem = promisify(client, 'removeItem')
const getItem = promisify(client, 'getItem')

const printItem = item => {
    console.log(`id = ${item.getItemId()}, price = ${item.getPrice()}`)
}

const run = async () => {
    await addItem(new AddItemRequest([`${id}_item-1`, 100]))

    const item1 = await getItem(new ItemRequest([`${id}_item-1`]))
    printItem(item1)

    await addItem(new AddItemRequest([`${id}_item-2`, 20]))

    const item2 = await getItem(new ItemRequest([`${id}_item-2`]))
    printItem(item2)

    await addItem(new AddItemRequest([`${id}_item-1`, 50]))
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem(new ItemRequest([`${id}_item-1`]))

    await getItem(new ItemRequest([`${id}_item-1`]))
        .catch(err => console.error(`*** ERROR = ${err.message}`))

    await removeItem(new ItemRequest([`${id}_item-2`]))
}

run().catch(err => console.error(err))

クライアント実装2

こちらも同様です。

client_subscribe.js
const grpc = require('@grpc/grpc-js')

const { ItemSubscribeRequest } = require('./generated/proto/item_pb')
const { ItemManageClient } = require('./generated/proto/item_grpc_pb')

const client = new ItemManageClient(
    ・・・
)

const stream = client.subscribe(new ItemSubscribeRequest())

stream.on('data', event => {
    // toObject で JavaScript オブジェクトへ変換
    console.log(`*** received event = ${JSON.stringify(event.toObject())}`)
})

・・・

動作確認

(a) と同じ操作を行った結果は以下のようになりました。

Server 出力内容
> node server.js
start server: 50051
*** subscribed
*** subscribed
*** publish event: {"added":{"itemId":"a1_item-1","price":100}}
*** publish event: {"added":{"itemId":"a1_item-2","price":20}}
*** publish event: {"removed":{"itemId":"a1_item-1"}}
*** publish event: {"removed":{"itemId":"a1_item-2"}}
*** unsubscribed
^C
Client1 出力内容
> node client.js a1
id = a1_item-1, price = 100
id = a1_item-2, price = 20
*** ERROR = 6 ALREADY_EXISTS: exists item
*** ERROR = 5 NOT_FOUND: item not found
Client2-1 出力内容
> node client_subscribe.js
*** received event = {"added":{"itemId":"a1_item-1","price":100}}
*** received event = {"added":{"itemId":"a1_item-2","price":20}}
*** received event = {"removed":{"itemId":"a1_item-1"}}
*** received event = {"removed":{"itemId":"a1_item-2"}}
*** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error)
*** stream end
Client2-2 出力内容
> node client_subscribe.js
*** received event = {"added":{"itemId":"a1_item-1","price":100}}
*** received event = {"added":{"itemId":"a1_item-2","price":20}}
*** received event = {"removed":{"itemId":"a1_item-1"}}
*** received event = {"removed":{"itemId":"a1_item-2"}}
^C

(a) と (b) は同一の gRPC サービス(proto ファイル)を実装したものなので当然ですが、(a) と (b) を相互接続しても特に問題はありませんでした。

Deno で GraphQL

GraphQL を Deno で試してみました。

https://deno.land/x に Deno 用の GraphQL モジュールがいくつかありましたが(基本的には GraphQL.js のポーティング)、ここでは GraphQL.js を直接使う事にします。

今回のサンプルコードは http://github.com/fits/try_samples/tree/master/blog/20200817/

1. GraphQL.js の使用

GraphQL.js を Deno から使うには、以下のように SkypackPika CDN の後継)という CDN から import するのが簡単そうです。

import { ・・・ } from 'https://cdn.skypack.dev/graphql'

2. 単独実行

GraphQL.js を使って GraphQL の Query・Mutation・Subscription を一通り実行してみます。

まずは、buildSchema 関数に GraphQL の型定義を渡して GraphQLSchema を取得します。

これをクエリや GraphQL 処理の実装と共に graphql 関数や subscribe 関数へ渡す事で処理結果を得ます。

graphql 関数の方は第二引数に文字列を使えましたが(型は Source | string)、subscribe 関数の方は使えなかったので(型は DocumentNode)、こちらは parse した結果を渡すようにしています。

GraphQL 処理の実装(下記サンプルの root 変数の箇所)では、Query・Mutation・Subscription の処理に応じた関数を用意し、型定義に応じた値を返すように実装します。

Subscription の場合は、([Symbol.asyncIterator] プロパティで AsyncIterator を返す)AsyncIterable を戻り値としなければならないようなので、下記では MessageBox クラスとして実装しています。

sample1.js
import {
    graphql, buildSchema, subscribe, parse
} from 'https://cdn.skypack.dev/graphql'

import { v4 } from 'https://deno.land/std/uuid/mod.ts'

// GraphQL 型定義
const schema = buildSchema(`
    enum Category {
        Standard
        Extra
    }

    input CreateItem {
        category: Category!
        value: Int!
    }

    type Item {
        id: ID!
        category: Category!
        value: Int!
    }

    type Mutation {
        create(input: CreateItem!): Item
    }

    type Query {
        find(id: ID!): Item
    }

    type Subscription {
        created: Item
    }
`)

// Subscription 用の AsyncIterable
class MessageBox {
    #messages = []
    #resolves = []

    publish(value) {
        const resolve = this.#resolves.shift()

        if (resolve) {
            resolve({ value })
            // 以下でも可
            // resolve({ value, done: false })
        }
        else {
            this.#messages.push(value)
        }
    }

    [Symbol.asyncIterator]() {
        return {
            next: () => {
                console.log('** asyncIterator next')

                return new Promise(resolve => {
                    const value = this.#messages.shift()

                    if (value) {
                        resolve({ value })
                        // 以下でも可
                        // resolve({ value, done: false })
                    }
                    else {
                        this.#resolves.push(resolve)
                    }
                })
            }
        }
    }
}

const store = {}
const box = new MessageBox()

// GraphQL 処理の実装
const root = {
    create: ({ input: { category, value } }) => {
        console.log(`* call create: category = ${category}, value = ${value}`)

        const id = `item-${v4.generate()}`
        const item = { id, category, value }

        store[id] = item
        box.publish({ created: item })

        return item
    },
    find: ({ id }) => {
        console.log(`* call find: ${id}`)
        return store[id]
    },
    // Subscription の実装では戻り値を AsyncIterable とする
    created: () => box
}

const run = async () => {
    const m1 = `
        mutation {
            create(input: { category: Standard, value: 10 }) {
                id
            }
        }
    `
    // Mutation の実行1
    const mr1 = await graphql(schema, m1, root)
    console.log(mr1)

    const m2 = `
        mutation Create($p: CreateItem!) {
            create(input: $p) {
                id
            }
        }
    `

    const vars = {
        p: {
            category: 'Extra',
            value: 123
        }
    }
    // Mutation の実行2(変数利用)
    const mr2 = await graphql(schema, m2, root, null, vars)
    console.log(mr2)

    const q = `
        {
            find(id: "${mr2.data.create.id}") {
                id
                category
                value
            }
        }
    `
    // Query の実行
    const qr = await graphql(schema, q, root)
    console.log(qr)

    const s = parse(`
        subscription {
            created {
                id
                category
            }
        }
    `)
    // Subscription 処理
    const subsc = await subscribe(schema, s, root)

    for await (const r of subsc) {
        console.log('*** subscribe')
        console.log(r)
    }
}

run().catch(err => console.error(err))

実行結果は以下の通りです。

実行結果
> deno run sample1.js

・・・
* call create: category = Standard, value = 10
{ data: { create: { id: "item-11ca8326-832b-4e13-9b47-d2c70c1e95e9" } } }
* call create: category = Extra, value = 123
{ data: { create: { id: "item-29f1851b-4cbb-4f23-872f-11856f1c0bf7" } } }
* call find: item-29f1851b-4cbb-4f23-872f-11856f1c0bf7
{
  data: {
    find: { id: "item-29f1851b-4cbb-4f23-872f-11856f1c0bf7", category: "Extra", value: 123 }
  }
}
** asyncIterator next
*** subscribe
{
  data: {
    created: { id: "item-11ca8326-832b-4e13-9b47-d2c70c1e95e9", category: "Standard" }
  }
}
** asyncIterator next
*** subscribe
{
  data: {
    created: { id: "item-29f1851b-4cbb-4f23-872f-11856f1c0bf7", category: "Extra" }
  }
}
** asyncIterator next

3. Web サーバー化

Deno の http を使って、上記処理を Web サーバー化してみました。

sample2.js
import {
    graphql, buildSchema, subscribe, parse
} from 'https://cdn.skypack.dev/graphql'

import { v4 } from 'https://deno.land/std/uuid/mod.ts'
import { serve } from 'https://deno.land/std@0.65.0/http/server.ts'

const schema = buildSchema(`
    enum Category {
        Standard
        Extra
    }

    ・・・
`)

class MessageBox {
    ・・・
}

const store = {}
const box = new MessageBox()

const root = {
    create: ({ input: { category, value } }) => {
        const id = `item-${v4.generate()}`
        const item = { id, category, value }

        store[id] = item
        box.publish({ created: item })

        return item
    },
    find: ({ id }) => {
        return store[id]
    },
    created: () => box
}

// Web サーバー処理
const run = async () => {
    const server = serve({ port: 8080 })

    // リクエスト毎の処理
    for await (const req of server) {
        const buf = await Deno.readAll(req.body)
        const query = new TextDecoder().decode(buf)

        console.log(`* query: ${query}`)

        const res = await graphql(schema, query, root)

        req.respond({ body: JSON.stringify(res) })
    }
}

const runSubscribe = async () => {
    const s = parse(`
        subscription {
            created {
                id
                category
                value
            }
        }
    `)

    const subsc = await subscribe(schema, s, root)

    for await (const r of subsc) {
        console.log(`*** subscribe: ${JSON.stringify(r)}`)
    }
}

Promise.all([
    run(),
    runSubscribe()
]).catch(err => console.error(err))

実行

deno run で Web サーバーを起動します。 --allow-net でネットワーク処理を許可する必要があります。

Web サーバー起動
> deno run --allow-net sample2.js

create を実行した結果です。

Mutation の実行
$ curl -s http://localhost:8080 -d 'mutation { create(input: { category: Extra, value: 5 }) { id } }'

{"data":{"create":{"id":"item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451"}}}

create で返された id を指定して find した結果です。

Query の実行1
$ curl -s http://localhost:8080 -d '{ find(id: "item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451") { id category value } }'

{"data":{"find":{"id":"item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451","category":"Extra","value":5}}}

存在しない id を指定して find した結果です。

Query の実行2
$ curl -s http://localhost:8080 -d '{ find(id: "item-invalid") { id category value } }'

{"data":{"find":null}}

サーバー側の出力結果は以下の通りです。

Web サーバー出力結果
> deno run --allow-net sample2.js

・・・
** asyncIterator next
* query: mutation { create(input: { category: Extra, value: 5 }) { id } }
*** subscribe: {"data":{"created":{"id":"item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451","category":"Extra","value":5}}}
** asyncIterator next
* query: { find(id: "item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451") { id category value } }
* query: { find(id: "item-invalid") { id category value } }

rusty_v8 を使って Rust から JavaScript を実行

Node.js の製作者が新たに作り直した Deno という JavaScript/TypeScript 実行環境があります。

Deno の内部では、V8 JavaScript エンジンの呼び出しに rusty_v8 という Rust 用バインディングを使っていたので、今回はこの rusty_v8 を使って Rust コード内で JavaScript コードを実行してみました。

今回のサンプルコードは http://github.com/fits/try_samples/tree/master/blog/20200705/

設定

rusty_v8 を使うための Cargo 用の dependencies 設定は以下のようになります。

Cargo.toml
・・・
[dependencies]
rusty_v8 = "0.6"

JavaScript コード実行

以下の JavaScript コードを実行し結果(1 ~ 5 の合計値)を出力する処理を Rust で実装してみます。

実行する JavaScript コード
const vs = [1, 2, 3, 4, 5]
console.log(vs)
vs.reduce((acc, v) => acc + v, 0)

基本的に、下記 V8 API による手順と同様の処理を rusty_v8 の API で実装すればよさそうです。

V8 エンジンのインスタンスである Isolate(独自のヒープを持ち他のインスタンスとは隔離される)、GC で管理するオブジェクトへの参照をまとめて管理する HandleScopeサンドボックス化された実行コンテキストの Context(組み込みのオブジェクト・関数を管理)をそれぞれ作成していきます。

そして、JavaScript のコードを Script::compileコンパイルして、run で実行します。

run の戻り値は Option<Local<Value>> となっているので、ここでは to_string を使って JavaScript の String(Option<Local<rusty_v8::String>>)として取得し、rusty_v8::String を to_rust_string_lossy を使って Rust の String へ変換して出力しています。

src/sample1.rs
use rusty_v8 as v8;

fn main() {
    let platform = v8::new_default_platform().unwrap();
    v8::V8::initialize_platform(platform);
    v8::V8::initialize();

    let isolate = &mut v8::Isolate::new(Default::default());

    let scope = &mut v8::HandleScope::new(isolate);
    let context = v8::Context::new(scope);
    let scope = &mut v8::ContextScope::new(scope, context);

    // JavaScript コード
    let src = r#"
        const vs = [1, 2, 3, 4, 5]
        console.log(vs)
        vs.reduce((acc, v) => acc + v, 0)
    "#;

    v8::String::new(scope, src)
        .map(|code| {
            println!("code: {}", code.to_rust_string_lossy(scope));
            code
        })
        .and_then(|code| v8::Script::compile(scope, code, None)) //コンパイル
        .and_then(|script| script.run(scope)) //実行
        .and_then(|value| value.to_string(scope)) // rusty_v8::Value を rusty_v8::String へ
        .iter()
        .for_each(|s| println!("result: {}", s.to_rust_string_lossy(scope)));
}

実行

複数の実行ファイルに対応した下記 Cargo.toml を使って実行します。

Cargo.toml
[package]
・・・
default-run = "sample1"

[dependencies]
rusty_v8 = "0.6"

[[bin]]
name = "sample1"
path = "src/sample1.rs"

[[bin]]
name = "sample2"
path = "src/sample2.rs"

[[bin]]
name = "sample3"
path = "src/sample3.rs"

sample1(src/sample1.rs)の実行結果は以下の通りです。

console.log に関しては何も処理されていませんが、JavaScript の実行結果は取得できています。

sample1 実行結果
> cargo run

・・・
     Running `target\debug\sample1.exe`
code:
        const vs = [1, 2, 3, 4, 5]
        console.log(vs)
        vs.reduce((acc, v) => acc + v, 0)

result: 15

Inspector 機能

次に、console.log されたログメッセージを Rust から出力するようにしてみます。

V8 で console.log (デバッグコンソールへのログ出力)のようなデバッグ機能を処理するには Inspector という機能を使うようです。

rusty_v8 では、console.log 等の呼び出し時に V8InspectorClientImpl トレイトの console_api_message が呼びされるようになっているため、これを実装した構造体のインスタンスから V8Inspector を作成して context_created を実行する事で実現できます。

context_created のシグネチャfn context_created(&mut self, context: Local<Context>, context_group_id: i32, human_readable_name: StringView) となっており、context_group_id 引数へ指定した値が、console_api_message の context_group_id 引数の値となります。(human_readable_name の用途に関してはよく分からなかった)

また、console_api_message の level 引数の値はログレベル(V8 API の MessageErrorLevel) のようです。

ちなみに、V8InspectorClientImpl トレイトは basebase_mut の実装が必須でした。(console_api_message は空実装されている)

src/sample2.rs
use rusty_v8 as v8;
use rusty_v8::inspector::*;

struct InspectorClient(V8InspectorClientBase);

impl InspectorClient {
    fn new() -> Self {
        Self(V8InspectorClientBase::new::<Self>())
    }
}

impl V8InspectorClientImpl for InspectorClient {
    fn base(&self) -> &V8InspectorClientBase {
        &self.0
    }

    fn base_mut(&mut self) -> &mut V8InspectorClientBase {
        &mut self.0
    }

    fn console_api_message(&mut self, _context_group_id: i32, 
        _level: i32, message: &StringView, _url: &StringView, 
        _line_number: u32, _column_number: u32, _stack_trace: &mut V8StackTrace) {
        // ログメッセージの出力
        println!("{}", message);
    }
}

fn main() {
    ・・・

    let isolate = &mut v8::Isolate::new(Default::default());

    // V8Inspector の作成
    let mut client = InspectorClient::new();
    let mut inspector = V8Inspector::create(isolate, &mut client);

    let scope = &mut v8::HandleScope::new(isolate);
    let context = v8::Context::new(scope);
    let scope = &mut v8::ContextScope::new(scope, context);

    // context_created の実行
    inspector.context_created(context, 1, StringView::empty());

    let src = r#"
        const vs = [1, 2, 3, 4, 5]
        console.log(vs)
        vs.reduce((acc, v) => acc + v, 0)
    "#;

    ・・・
}

実行結果は以下の通り。

console.log(vs) を処理して 1,2,3,4,5 が出力されるようになりました。

sample2 実行結果
> cargo run --bin sample2

・・・
     Running `target\debug\sample2.exe`
code:
        const vs = [1, 2, 3, 4, 5]
        console.log(vs)
        vs.reduce((acc, v) => acc + v, 0)

1,2,3,4,5
result: 15

最後に、context_group_idlevel の値も出力するようにしてみます。

src/sample3.rs
・・・

impl V8InspectorClientImpl for InspectorClient {
    ・・・

    fn console_api_message(&mut self, context_group_id: i32, 
        level: i32, message: &StringView, _url: &StringView, 
        _line_number: u32, _column_number: u32, _stack_trace: &mut V8StackTrace) {

            println!(
                "*** context_group_id={}, level={}, message={}", 
                context_group_id, 
                level, 
                message
            );
    }
}

fn main() {
    ・・・

    inspector.context_created(context, 123, StringView::empty());

    let src = r#"
        console.log('log')
        console.debug('debug')
        console.info('info')
        console.error('error')
        console.warn('warn')
    "#;

    v8::String::new(scope, src)
        .and_then(|code| v8::Script::compile(scope, code, None))
        .and_then(|script| script.run(scope))
        .and_then(|value| value.to_string(scope))
        .iter()
        .for_each(|s| println!("result: {}", s.to_rust_string_lossy(scope)));
}

実行結果は以下の通りです。

sample3 実行結果
> cargo run --bin sample3

・・・
     Running `target\debug\sample3.exe`
*** context_group_id=123, level=4, message=log
*** context_group_id=123, level=2, message=debug
*** context_group_id=123, level=4, message=info
*** context_group_id=123, level=8, message=error
*** context_group_id=123, level=16, message=warn
result: undefined

Rust で WASI 対応の WebAssembly を作成して実行

Rust で WASI 対応の WebAssembly を作って、スタンドアロン実行や Web ブラウザ上での実行を試してみました。

WASI(WebAssembly System Interface) は WebAssembly のコードを様々なプラットフォームで実行するためのインターフェースで、これに対応した WebAssembly であれば Web ブラウザ外で実行できます。

Rust で WASI 対応の WebAssembly を作るのは簡単で、ビルドターゲットに wasm32-wasi を追加しておいて、rustccargo build によるビルド時に --target wasm32-wasi を指定するだけでした。

wasm32-wasi の追加
> rustup target add wasm32-wasi

標準出力へ文字列を出力するだけの下記サンプルコードを --target wasm32-wasi でビルドすると sample1.wasm ファイルが作られました。

sample1.rs
fn main() {
    for i in 1..=3 {
        println!("count-{}", i);
    }

    print!("aaa");
    print!("bbb");
}
WASI 対応の WebAssembly として sample1.rs をビルド
> rustc --target wasm32-wasi sample1.rs

なお、今回のビルドに使用した Rust のバージョンは以下の通りです。

  • Rust 1.43.0

また、使用したソースコードhttp://github.com/fits/try_samples/tree/master/blog/20200429/ に置いてあります。

(1) スタンドアロン用ランタイムで実行

sample1.wasm を WebAssembly のランタイム wasmtimewasmer でそれぞれ実行してみます。

(1-a) wasmtime で実行

wasmtime v0.15.0 による sample1.wasm 実行結果
> wasmtime sample1.wasm
count-1
count-2
count-3
aaabbb

(1-b) wasmer で実行

wasmer v0.16.2 による sample1.wasm 実行結果
> wasmer sample1.wasm
count-1
count-2
count-3
aaabbb

どちらのランタイムでも問題なく実行できました。

(2) Web ブラウザ上で実行

次は、sample1.wasm を外部ライブラリ等を使わずに Web ブラウザ上で実行してみます。

主要な Web ブラウザや Node.js は JavaScript 用の WebAssembly API に対応済みのため、WebAssembly を実行可能です。

WASI 対応 WebAssembly の場合、実行対象の WebAssembly がインポートしている WASI の関数(の実装)を WebAssembly インスタンス化関数(WebAssembly.instantiate()WebAssembly.instantiateStreaming())の第二引数(引数名 importObject)として渡す必要があるようです。

(2-a) WebAssembly のインポート内容を確認

WebAssembly.compile() 関数で取得した WebAssembly.Module オブジェクトを WebAssembly.Module.imports() 関数へ渡す事で、その WebAssembly がインポートしている内容を取得できます。

ここでは、以下の Node.js スクリプトを使って WebAssembly のインポート内容を確認してみました。

wasm_listup_imports.js (WebAssembly のインポート内容を出力)
const fs = require('fs')

const wasmFile = process.argv[2]

const run = async () => {
    const module = await WebAssembly.compile(fs.readFileSync(wasmFile))

    const imports = WebAssembly.Module.imports(module)

    console.log(imports)
}

run().catch(err => console.error(err))

sample1.wasm へ適用してみると以下のような結果となりました。

インポート内容の出力結果(Node.js v12.16.2 で実行)
> node wasm_listup_imports.js sample1.wasm
[
  {
    module: 'wasi_snapshot_preview1',
    name: 'proc_exit',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'fd_write',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'fd_prestat_get',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'fd_prestat_dir_name',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'environ_sizes_get',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'environ_get',
    kind: 'function'
  }
]

この結果から、sample1.wasm は以下のようにしてインスタンス化できる事になります。

WebAssembly インスタンス化の例
const importObject = {
    wasi_snapshot_preview1: {
        proc_exit: () => {・・・},
        fd_write: () => {・・・},
        fd_prestat_get: () => {・・・},
        fd_prestat_dir_name: () => {・・・},
        environ_sizes_get: () => {・・・},
        environ_get: () => {・・・}
    }
}

WebAssembly.instantiate(・・・, importObject)
    ・・・

(2-b) fd_write 関数の実装

Rust の println! で呼び出される WASI の関数は fd_write なので、これを実装してみます。

fd_write の引数は 4つで、第一引数 fd は出力先のファイルディスクリプタで標準出力の場合は 1、第二引数 iovs は出力内容へのポインタ、第三引数 iovsLen は出力内容の数、第四引数 nwritten は出力済みのバイト数を設定するポインタとなっています。

なお、ポインタの対象は WebAssembly.instantiate() で取得した WebAssembly のインスタンスに含まれている WebAssembly.Memory です。

出力内容は iovs ポインタの位置から 4バイト毎に以下のような並びで情報が格納されているようなので、これを基に出力対象の文字列を取得して出力する事になります。

  • 1個目の出力内容の格納先ポインタ(4バイト)
  • 1個目の出力内容のバイトサイズ(4バイト)
  • ・・・
  • iovsLen 個目の出力内容の格納先ポインタ(4バイト)
  • iovsLen 個目の出力内容のバイトサイズ(4バイト)

何処まで処理を行ったか(出力したか)を返すために、nwritten ポインタの位置へ出力の完了したバイトサイズを設定します。

fd_write の実装例(wasmInstance には WebAssembly のインスタンスを設定)
・・・
fd_write: (fd, iovs, iovsLen, nwritten) => {
    const memory = wasmInstance.exports.memory.buffer
    const view = new DataView(memory)

    const sizeList = Array.from(Array(iovsLen), (v, i) => {
        const ptr = iovs + i * 8

        // 出力内容の格納先のポインタ取得
        const bufStart = view.getUint32(ptr, true)
        // 出力内容のバイトサイズを取得
        const bufLen = view.getUint32(ptr + 4, true)

        const buf = new Uint8Array(memory, bufStart, bufLen)

        // 出力内容の String 化
        const msg = String.fromCharCode(...buf)

        // 出力
        console.log(msg)

        return buf.byteLength
    })

    // 出力済みのバイトサイズ合計
    const totalSize = sizeList.reduce((acc, v) => acc + v)

    // 出力済みのバイトサイズを設定
    view.setUint32(nwritten, totalSize, true)

    return 0
},
・・・

最終的な HTML は下記のようになりました。

fd_write 以外の WASI 関数を空実装にして main 関数を呼び出して実行するようにしていますが、WASI の仕様としては _start 関数を呼び出すのが正しいようです ※。(WASI Application ABI 参照)

 ※ _start 関数を使う場合、fd_prestat_get 等の実装も必要となります

WebAssembly がインポートしている WASI 関数の実装をインスタンス化時(WebAssembly.instantiateStreaming)に渡す事になりますが、WASI の関数(fd_write 等)はインスタンス化の結果を使って処理する点に注意が必要です。

index.html(main 関数版)
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
</head>
<body>
  <h1>WASI WebAssembly Sample</h1>
  <div id="res"></div>

  <script>
    const WASM_URL = './sample1.wasm'

    const wasiObj = {
      wasmInstance: null,
      importObject: {
        wasi_snapshot_preview1: {
          fd_write: (fd, iovs, iovsLen, nwritten) => {
            console.log(`*** call fd_write: fd=${fd}, iovs=${iovs}, iovsLen=${iovsLen}, nwritten=${nwritten}`)

            const memory = wasiObj.wasmInstance.exports.memory.buffer
            const view = new DataView(memory)

            const sizeList = Array.from(Array(iovsLen), (v, i) => {
              const ptr = iovs + i * 8

              const bufStart = view.getUint32(ptr, true)
              const bufLen = view.getUint32(ptr + 4, true)

              const buf = new Uint8Array(memory, bufStart, bufLen)

              const msg = String.fromCharCode(...buf)

              // 出力
              console.log(msg)
              document.getElementById('res').innerHTML += `<p>${msg}</p>`

              return buf.byteLength
            })

            const totalSize = sizeList.reduce((acc, v) => acc + v)

            view.setUint32(nwritten, totalSize, true)

            return 0
          },
          proc_exit: () => {},
          fd_prestat_get: () => {},
          fd_prestat_dir_name: () => {},
          environ_sizes_get: () => {},
          environ_get: () => {}
        }
      }
    }

    WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject)
      .then(res => {
        console.log(res)

        // fd_write で参照できるようにインスタンスを wasmInstance へ設定
        wasiObj.wasmInstance = res.instance

        // main 関数の実行
        wasiObj.wasmInstance.exports.main()
      })
      .catch(err => console.error(err))
  </script>
</body>
</html>

main 関数の代わりに _start 関数を呼び出す場合は下記のようになりました。

_start 関数版の場合、fd_prestat_get の実装が重要となります ※。

 ※ fd_prestat_get を正しく実装していないと、
    fd_prestat_get の呼び出しが延々と繰り返されてしまいました

今回はファイル等を使っていないので(file descriptor 3 以降を開いていない)、fd_prestat_get は単に 8(WASI_EBADF, Bad file descriptor)を返すだけで良さそうです。

index2.html(_start 関数版)
・・・
  <script>
    const WASM_URL = './sample1.wasm'

    const wasiObj = {
      wasmInstance: null,
      importObject: {
        wasi_snapshot_preview1: {
          ・・・
          fd_prestat_get: () => 8,
          ・・・
        }
      }
    }

    WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject)
      .then(res => {
        console.log(res)

        wasiObj.wasmInstance = res.instance
        // _start 関数の実行
        wasiObj.wasmInstance.exports._start()
      })
      .catch(err => console.error(err))
  </script>
・・・

(2-c) 実行

上記の .html ファイルを Web ブラウザで直接開いても WebAssembly を実行できないため、HTTP サーバーを使う事になります。

更に、Web ブラウザ上で WebAssembly を実行するには、.wasm ファイルを MIME Type application/wasm で取得する必要があるようです。

Python の http.server は application/wasm に対応していたため(Python 3.8.2 と 3.7.6 で確認)、以下のスクリプトで HTTP サーバーを立ち上げる事にしました。

web_server.py
import http.server
import socketserver

PORT = 8080

Handler = http.server.SimpleHTTPRequestHandler

with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print(f"start server port:{PORT}")
    httpd.serve_forever()
HTTP サーバー起動(Python 3.8.2 で実行)
> python web_server.py
start server port:8080

Web ブラウザ(Chrome)で http://localhost:8080/index.html へアクセスしたところ(index2.html でも同様)、sample1.wasm の実行を確認できました。

Chrome の実行結果

f:id:fits:20200429200738p:plain

(3) Node.js で組み込み実行

次は、Node.js で WebAssembly を組み込み実行してみます。

(3-a) fd_write 実装

上記 index2.html の処理をベースにローカルの .wasm ファイルを読み込んで実行するようにしました。

sample1.wasm のインポート内容に合わせたものなので、インポート内容の異なる WebAssembly の実行には使えません。

wasm_run_sample.js
const fs = require('fs')

const WASI_ESUCCESS = 0;
const WASI_EBADF = 8; // Bad file descriptor

const wasmFile = process.argv[2]

const wasiObj = {
    wasmInstance: null,
    importObject: {
        wasi_snapshot_preview1: {
            fd_write: (fd, iovs, iovsLen, nwritten) => {
                ・・・
                
                const sizeList = Array.from(Array(iovsLen), (v, i) => {
                    ・・・
                    
                    process.stdout.write(msg)
                    
                    return buf.byteLength
                })
                
                ・・・
                
                return WASI_ESUCCESS
            },
            ・・・
            fd_prestat_get: (fd, bufPtr) => { 
                console.log(`*** call fd_prestat_get: fd=${fd}, bufPtr=${bufPtr}`)
                return WASI_EBADF
            },
            ・・・
        }
    }
}

const buf = fs.readFileSync(wasmFile)

WebAssembly.instantiate(buf, wasiObj.importObject)
    .then(res => {
        wasiObj.wasmInstance = res.instance
        wasiObj.wasmInstance.exports._start()
    })
    .catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node wasm_run_sample.js sample1.wasm
*** call fd_prestat_get : fd=3, bufPtr=1048568
*** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948
count-1
*** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948
count-2
*** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948
count-3
*** call fd_write: fd=1, iovs=1048432, iovsLen=1, nwritten=1048412
aaabbb

(3-b) Wasmer-JS 使用

Wasmer-JS@wasmer/wasi モジュールを使って、もっと汎用的に組み込み実行できるようにしてみます。

@wasmer/wasi インストール例
> npm install @wasmer/wasi

@wasmer/wasi の WASI を使う事で、インポート内容に合わせた WASI 関数の取得や _start 関数の呼び出しを任せる事ができます。

run_wasmer_js/index.js
const fs = require('fs')
const { WASI } = require('@wasmer/wasi')

const wasmFile = process.argv[2]

const wasi = new WASI()

const run = async () => {
    const module = await WebAssembly.compile(fs.readFileSync(wasmFile))
    // インポート内容に合わせた WASI 関数の実装を取得
    const importObject = wasi.getImports(module)

    const instance = await WebAssembly.instantiate(module, importObject)

    // 実行
    wasi.start(instance)
}

run().catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node index.js ../sample1.wasm
count-1
count-2
count-3
aaabbb

(4) 標準出力以外の機能

最後に、現時点でどんな機能を使えるのか気になったので、いくつか試してみました。

まず、TcpStream を使ったコードの wasm32-wasi ビルドは一応成功しました。

sample2.rs
use std::net::TcpStream;

fn main() {
    let res = TcpStream::connect("127.0.0.1:8080");
    println!("{:?}", res);
}
sample2.rs ビルド
> rustc --target wasm32-wasi sample2.rs

ただし、実行してみると以下のような結果となりました。(wasmtime 以外で実行しても同じ)

wasmtime による sample2.wasm 実行結果
> wasmtime sample2.wasm

Err(Custom { kind: Other, error: "operation not supported on wasm yet" })

Rust のソースコードで該当(すると思われる)箇所を確認してみると、unsupported() を返すようになっていました。

https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/net.rs(2020/4/26 時点)
・・・
impl TcpStream {
    pub fn connect(_: io::Result<&SocketAddr>) -> io::Result<TcpStream> {
        unsupported()
    }
    ・・・
}
・・・

https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/ のソースを確認してみると、(スレッド系の処理等)他にも未対応の機能がいくつもありました。

一方で、環境変数・システム時間・スリープ処理は使えそうだったので、以下のコードで確認してみました。

sample3.rs
use std::env;
use std::thread::sleep;
use std::time::{ Duration, SystemTime };

fn main() {
    // 環境変数 SLEEP_TIME からスリープする秒数を取得(デフォルトは 1)
    let sleep_sec = env::var("SLEEP_TIME").ok()
                         .and_then(|v| v.parse::<u64>().ok())
                         .unwrap_or(1);

    // システム時間の取得
    let time = SystemTime::now();

    println!("start: sleep {}s", sleep_sec);

    // スリープの実施
    sleep(Duration::from_secs(sleep_sec));

    // 経過時間の出力
    match time.elapsed() {
        Ok(s) => println!("end: elapsed {}s", s.as_secs()),
        Err(e) => println!("error: {:?}", e),
    }
}
sample3.rs のビルド
> rustc --target wasm32-wasi sample3.rs

wasmtime では正常に実行できましたが、wasmer は今のところスリープ処理に対応していないようでエラーとなりました。

ちなみに、環境変数はどちらのコマンドも --env で指定できました。

wasmtime v0.15.0 による sample3.wasm 実行結果
> wasmtime --env SLEEP_TIME=5 sample3.wasm
start: sleep 5s
end: elapsed 5s
wasmer v0.16.2 による sample3.wasm 実行結果
> wasmer sample3.wasm --env SLEEP_TIME=5
start: sleep 5s
thread 'main' panicked at 'not yet implemented: Polling not implemented for clocks yet', lib\wasi\src\syscalls\mod.rs:2373:21
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
Error: error: "unhandled trap at 7fffd474a799 - code #e06d7363: unknown exception code"

Node.js のマイクロサービスフレームワーク Moleculer

Moleculer という名のなかなか期待できそうな Node.js 用マイクロサービスフレームワークを見つけたのでご紹介します。

ロードバランサー、サーキットブレーカー、メトリクス収集、トレーシング、キャッシュ機能等、多彩な機能を備えていますが。

個人的に、イベント駆動アーキテクチャを採用し Kafka や NATS 等のメッセージブローカーと容易に連携できる事から、コレオグラフィ型のマイクロサービスを手軽に作れそうな点に注目しています。

なんとなく Akka と似ている気もしますが、Akka を使ったマイクロサービスフレームワークLagom などと比べても、簡単かつ手軽に使えるのが利点かと思います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20200224/

(1) ローカルアプリケーション

とりあえず簡単なローカルアプリケーションを作成してみます。

以下のモジュールを使うのでインストールしておきます。

  • moleculer 0.14.2
moleculer インストール例
> npm install --save moleculer

Moleculer では、メソッド・アクション・イベントハンドリング等を定義した ServiceServiceBroker へ登録する事で処理を組み立てます。

ServiceBroker はノード毎にインスタンスを作る事になり、アクションの呼び出しやイベントの通知、別ノードとの連携等を担います。

アクションは call('<サービス名>.<アクション名>', <パラメータ>) で呼び出す事ができ、アクションで return した値が call の戻り値となります。

イベントの発火(通知)は emit('<イベント名>', <イベント内容>)broadcast('<イベント名>', <イベント内容>) ※ で行います。

 ※ emit と broadcast の違いは、
    broadcast は全ノードの全サービスへ通知されますが、
    emit は同じサービスを複数のノードで実行していると
    その中の 1つのノードへ通知されます
order1.js
const { ServiceBroker } = require('moleculer')

const broker = new ServiceBroker()

broker.createService({
    name: 'order-service',
    actions: {
        order(ctx) {
            console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`)
            // order.ordered イベントの発火
            ctx.emit('order.ordered', ctx.params)

            return ctx.params.id
        }
    }
})

broker.createService({
    name: 'order-check-service',
    events: {
        // order.ordered イベントのハンドリング
        'order.ordered'(ctx) {
            console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`)
        }
    }
})

const run = async () => {
    await broker.start()

    const order1 = {id: 'order1', item: 'item1', qty: 2}
    // order アクションの呼び出し
    const res1 = await broker.call('order-service.order', order1)

    console.log(`order result: ${res1}`)
}

run().catch(err => console.error(err))
実行結果
> node order1.js

[2020-02-18T14:08:35.336Z] INFO  ・・・: Moleculer v0.14.2 is starting...
・・・
# order-service.order: {"id":"order1","item":"item1","qty":2}
## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2}
order result: order1
・・・
[2020-02-18T14:08:35.440Z] INFO  ・・・: ServiceBroker is stopped. Good bye.

上記の処理へサービスを 1つ追加してみます。

イベントのハンドリングではワイルドカード * も使えるようになっていますので使ってみます。

order2.js
const { ServiceBroker } = require('moleculer')

const broker = new ServiceBroker()

broker.createService({
    name: 'order-service',
    actions: {
        order(ctx) {
            console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`)
            ctx.emit('order.ordered', ctx.params)
            return ctx.params.id
        }
    },
    events: {
        // order.validated と order.invalidated イベントをハンドリング
        'order.*validated'(ctx) {
            console.log(`# order-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`)
        }
    }
})

broker.createService({
    name: 'order-check-service',
    events: {
        async 'order.ordered'(ctx) {
            console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`)

            // item-service の check アクション呼び出し
            const isValid = await ctx.call('item-service.check', ctx.params)

            if (isValid) {
                ctx.emit('order.validated', ctx.params)
            }
            else {
                ctx.emit('order.invalidated', ctx.params)
            }
        }
    }
})

broker.createService({
    name: 'item-service',
    actions: {
        check(ctx) {
            console.log(`### item-service.check: ${JSON.stringify(ctx.params)}`)
            return ['item1', 'item2'].includes(ctx.params.item)
        }
    }
})

const run = async () => {
    await broker.start()

    const order1 = {id: 'order1', item: 'item1', qty: 2}
    const order2 = {id: 'order2', item: 'item3', qty: 1}

    const res1 = await broker.call('order-service.order', order1)

    console.log(`order result: ${res1}`)

    const res2 = await broker.call('order-service.order', order2)

    console.log(`order result: ${res2}`)
}

run().catch(err => console.error(err))
実行結果
> node order2.js

・・・
# order-service.order: {"id":"order1","item":"item1","qty":2}
## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2}
### item-service.check: {"id":"order1","item":"item1","qty":2}
order result: order1
# order-service.order: {"id":"order2","item":"item3","qty":1}
## order-check-service order.ordered: {"id":"order2","item":"item3","qty":1}
### item-service.check: {"id":"order2","item":"item3","qty":1}
# order-service order.validated: {"id":"order1","item":"item1","qty":2}
order result: order2
# order-service order.invalidated: {"id":"order2","item":"item3","qty":1}
・・・

(2) メッセージブローカー(NATS Streaming)利用

次に、別ノードのサービスとメッセージブローカー経由で連携してみます。

処理内容としては Web API を通して受け取ったメッセージを別ノードのサービスに call・emit・broadcast します。

ここでは、メッセージブローカーとして Go言語で実装されており手軽に実行できる NATS Streaming を使用します。

まずは、以下のモジュールを使うのでインストールしておきます。

  • moleculer 0.14.2
  • moleculer-web 0.9.0
  • node-nats-streaming 0.2.6
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming

Web API としてリクエストを受け付ける処理を実装します。

moleculer-web を require した結果を mixins して routes で HTTP リクエストとそれに応じて実行するアクションをマッピングします。

NATS Streaming を使用するため transporterSTAN を設定します。

nodeID はデフォルトでホスト名とプロセスIDから生成されるようになっていますが、他のノードとの接続に使うので、ここでは明示的に設定しています。

pub.js
const { ServiceBroker } = require('moleculer')

const HTTPServer = require('moleculer-web')

const broker = new ServiceBroker({
    nodeID: 'pub',
    transporter: 'STAN'
})

broker.createService({
    name: 'web',
    mixins: [HTTPServer],
    settings: {
        routes: [
            { aliases: {
                'PUT /pub': 'pub.publish'
            }}
        ]
    }
})

broker.createService({
    name: 'pub',
    actions: {
        async publish(ctx) {
            const params = ctx.params

            // アクション呼び出し
            const res = await ctx.call('sub.direct', params)

            // イベントの emit
            ctx.emit('event.emit', params)
            // イベントの broadcast
            ctx.broadcast('event.broadcast', params)

            return res
        }
    }
})

broker.start().catch(err => console.error(err))

次に、NATS Streaming 経由でアクションの呼び出しやイベント通知を受け取る処理を実装します。

こちらは複数プロセスで実行できるように nodeID をコマンド引数として渡すようにしています。

sub.js
const { ServiceBroker } = require('moleculer')

const id = process.argv[2]

const broker = new ServiceBroker({
    nodeID: id,
    transporter: 'STAN'
})

broker.createService({
    name: 'sub',
    actions: {
        direct(ctx) {
            console.log(`* sub.direct: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`)
            return broker.nodeID
        }
    },
    events: {
        'event.*'(ctx) {
            console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`)
        }
    }
})

broker.start().catch(err => console.error(err))

動作確認

まずは NATS Streaming を実行しておきます。

NATS Streaming 実行
> nats-streaming-server

・・・
[2668] 2020/02/23 19:27:33.566759 [[32mINF[0m] Server is ready
[2668] 2020/02/23 19:27:33.591774 [[32mINF[0m] STREAM: Recovering the state...
[2668] 2020/02/23 19:27:33.592779 [[32mINF[0m] STREAM: No recovered state
[2668] 2020/02/23 19:27:33.843799 [[32mINF[0m] STREAM: Message store is MEMORY
[2668] 2020/02/23 19:27:33.844764 [[32mINF[0m] STREAM: ---------- Store Limits ----------
[2668] 2020/02/23 19:27:33.846740 [[32mINF[0m] STREAM: Channels:                  100 *
[2668] 2020/02/23 19:27:33.848741 [[32mINF[0m] STREAM: --------- Channels Limits --------
[2668] 2020/02/23 19:27:33.849804 [[32mINF[0m] STREAM:   Subscriptions:          1000 *
[2668] 2020/02/23 19:27:33.850745 [[32mINF[0m] STREAM:   Messages     :       1000000 *
[2668] 2020/02/23 19:27:33.852756 [[32mINF[0m] STREAM:   Bytes        :     976.56 MB *
[2668] 2020/02/23 19:27:33.853740 [[32mINF[0m] STREAM:   Age          :     unlimited *
[2668] 2020/02/23 19:27:33.854745 [[32mINF[0m] STREAM:   Inactivity   :     unlimited *
[2668] 2020/02/23 19:27:33.857744 [[32mINF[0m] STREAM: ----------------------------------
[2668] 2020/02/23 19:27:33.858745 [[32mINF[0m] STREAM: Streaming Server is ready

pub.js を起動し、2つ sub.js を実行しておきます。

これによって、NATS Streaming へ MOL.DISCOVER.<nodeID>MOL.EVENT.<nodeID>MOL.REQ.<nodeID> 等のような様々なチャネルが登録され、ノード間の接続が確立します。

pub.js 実行
> node pub.js

・・・
[2020-02-23T10:29:58.028Z] INFO  pub/REGISTRY: Strategy: RoundRobinStrategy
・・・
[2020-02-23T10:29:58.102Z] INFO  pub/WEB: Register route to '/'
[2020-02-23T10:29:58.158Z] INFO  pub/WEB:      PUT /pub => pub.publish
・・・
[2020-02-23T10:30:29.936Z] INFO  pub/REGISTRY: Node 'sub1' connected.
[2020-02-23T10:30:36.918Z] INFO  pub/REGISTRY: Node 'sub2' connected.
sub.js 実行1(sub1)
> node sub.js sub1

・・・
[2020-02-23T10:30:34.026Z] INFO  sub1/REGISTRY: Node 'pub' connected.
[2020-02-23T10:30:36.919Z] INFO  sub1/REGISTRY: Node 'sub2' connected.
sub.js 実行2(sub2)
> node sub.js sub2

・・・
[2020-02-23T10:30:39.027Z] INFO  sub2/REGISTRY: Node 'pub' connected.
[2020-02-23T10:30:40.111Z] INFO  sub2/REGISTRY: Node 'sub1' connected.

この状態で 2回 HTTP リクエストを実施してみます。

HTTP リクエスト 1回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 1}"
"sub1"
HTTP リクエスト 2回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 2}"
"sub2"

1回目の call と emit は sub1 側で、2回目は sub2 側で実行しておりロードバランスされています(デフォルトではラウンドロビンとなる)

broadcast は sub1 と sub2 のどちらにも通知されています。

sub.js sub1 の結果
・・・
* sub.direct: nodeID=sub1, params={"no":1}
* event.emit: nodeID=sub1, params={"no":1}
* event.broadcast: nodeID=sub1, params={"no":1}
* event.broadcast: nodeID=sub1, params={"no":2}
sub.js sub2 の結果
・・・
* event.broadcast: nodeID=sub2, params={"no":1}
* sub.direct: nodeID=sub2, params={"no":2}
* event.emit: nodeID=sub2, params={"no":2}
* event.broadcast: nodeID=sub2, params={"no":2}

(3) MongoDB への CRUDREST API

最後に、MongoDB へデータを CRUD する処理を REST API 化してみます。

以下のモジュールを使うのでインストールしておきます。

  • moleculer 0.14.2
  • moleculer-web 0.9.0
  • node-nats-streaming 0.2.6
  • moleculer-db 0.8.5
  • moleculer-db-adapter-mongo 0.4.7
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming moleculer-db moleculer-db-adapter-mongo

これまでは createService でサービスを定義していましたが、ここでは別ファイルでサービスを定義する方法を試してみます。

ServiceBroker の loadServices を使うと、デフォルトで services/xxx.service.js (xxx は任意) ファイルをロードして登録してくれます。

server.js
const { ServiceBroker } = require('moleculer')

const id = process.argv[2]

const broker = new ServiceBroker({
    nodeID: id,
    transporter: 'STAN'
})

broker.loadServices()

broker.start().catch(err => console.error(err))

REST API のサービスを実装します。

aliases で REST とするとアクション側で "POST /" のような HTTP メソッドとパスを(rest 項目で)設定する事になります。

services/api.service.js
const HTTPServer = require('moleculer-web')

module.exports = {
    name: 'api',
    mixins: [HTTPServer],
    settings: {
        routes: [
            { aliases: { 'REST items': 'item' } }
        ]
    }
}

moleculer-db がデフォルトで REST API に対応した CRUD 処理をアクションとして定義してくれているので、今回はこれをそのまま使う事にします。

ここでは entityCreated 等を使って更新時のイベント通知のみを実装しています。

なお、MongoDB の接続文字列やコレクション名は環境変数で設定するようにしています。

services/item.service.js
const DbService = require('moleculer-db')
const MongoDBAdapter = require('moleculer-db-adapter-mongo')

const mongoUri = process.env['MONGO_URI']
const colName = process.env['MONGO_COLLECTION_NAME']

module.exports = {
    name: 'item',
    mixins: [DbService],
    adapter: new MongoDBAdapter(mongoUri, {
         useUnifiedTopology: true
    }),
    collection: colName,
    entityCreated(entity, ctx) {
        console.log(`entityCreated: ${JSON.stringify(entity)}`)
        ctx.emit('item.created', entity)
    },
    entityUpdated(entity, ctx) {
        console.log(`entityUpdated: ${JSON.stringify(entity)}`)
        ctx.emit('item.updated', entity)
    },
    entityRemoved(entity, ctx) {
        console.log(`entityRemoved: ${JSON.stringify(entity)}`)
        ctx.emit('item.removed', entity)
    }
}

動作確認用にイベントをハンドリングする処理も用意しました。

handler.js
const { ServiceBroker } = require('moleculer')

const id = process.argv[2]

const broker = new ServiceBroker({
    nodeID: id,
    transporter: 'STAN'
})

broker.createService({
    name: 'handler',
    events: {
        'item.*'(ctx) {
            console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`)
        }
    }
})

broker.start().catch(err => console.error(err))

動作確認

MongoDB と NATS Streaming を起動しておきます。

MongoDB 実行
> mongod --dbpath itemdb
NATS Streaming 実行
> nats-streaming-server

接続先の MongoDB やコレクション名を環境変数で設定してから、server.js を実行します。

server.js 実行
> set MONGO_URI=mongodb://localhost/sample
> set MONGO_COLLECTION_NAME=items

> node server.js server1

・・・
[2020-02-23T11:41:06.978Z] INFO  server1/API: Register route to '/'
[2020-02-23T11:41:07.031Z] INFO  server1/API:      GET /items => item.list
[2020-02-23T11:41:07.034Z] INFO  server1/API:      GET /items/:id => item.get
[2020-02-23T11:41:07.035Z] INFO  server1/API:     POST /items => item.create
[2020-02-23T11:41:07.036Z] INFO  server1/API:      PUT /items/:id => item.update
[2020-02-23T11:41:07.037Z] INFO  server1/API:    PATCH /items/:id => item.patch
[2020-02-23T11:41:07.038Z] INFO  server1/API:   DELETE /items/:id => item.remove
・・・
[2020-02-23T11:41:47.389Z] INFO  server1/REGISTRY: Node 'handler1' connected.

イベントハンドラも別途実行しておきます。

handler.js 実行
> node handler.js handler1

・・・
[2020-02-23T11:41:48.322Z] INFO  handler1/REGISTRY: Node 'server1' connected.

準備が整ったので REST API を呼び出して CRUD 処理を順番に呼び出してみます。

create 実施
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:3000/items -d "{\"name\": \"item1\", \"qty\": 1}"

{"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af

{"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":1}
update 実施
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/items/5e5266a735ebff2c9083a0af -d "{\"qty\": 2}"

{"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
delete 実施
$ curl -s -XDELETE http://localhost:3000/items/5e5266a735ebff2c9083a0af

{"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af

{
  "name": "EntityNotFoundError",
  "message": "Entity not found",
  "code": 404,
  "type": null,
  "data": {
    "id": "5e5266a735ebff2c9083a0af"
  }
}

問題なく動作しているようです。 handler.js の出力結果も以下のようになりました。

handler.js の出力結果
・・・
* item.created: nodeID=handler1, params={"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"}
* item.updated: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
* item.removed: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}