辞書ベースの日本語 Tokenizer - Kuromoji, Sudachi, Fugashi, Kagome, Lindera
辞書をベースに処理する日本語 Tokenizer のいくつかをコードを書いて実行してみました。
- (a) Lucene Kuromoji
- (b) atilika Kuromoji
- (c) Sudachi
- (d) Kuromoji.js
- (e) Fugashi
- (f) Kagome
- (g) Lindera
今回は以下の文を処理して分割された単語と品詞を出力します。
処理対象文
WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。
システム辞書だけを使用し、分割モードを指定する場合は固有名詞などをそのままにする(細かく分割しない)モードを選ぶ事にします。
ソースコードは https://github.com/fits/try_samples/tree/master/blog/20220106/
(a) Lucene Kuromoji
Lucene に組み込まれた Kuromoji で Elasticsearch や Solr で使われます。
kuromoji.gradle を見ると、システム辞書は以下のどちらかを使うようになっているようです。
a1
lucene-analyzers-kuromoji の JapaneseTokenizer を使います。 辞書は IPADIC のようです。
lucene/a1.groovy
@Grab('org.apache.lucene:lucene-analyzers-kuromoji:8.11.1') import org.apache.lucene.analysis.ja.JapaneseTokenizer; import org.apache.lucene.analysis.ja.JapaneseTokenizer.Mode import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute def text = args[0] new JapaneseTokenizer(null, false, Mode.NORMAL).withCloseable { tokenizer -> def term = tokenizer.addAttribute(CharTermAttribute) def pos = tokenizer.addAttribute(PartOfSpeechAttribute) tokenizer.reader = new StringReader(text) tokenizer.reset() while(tokenizer.incrementToken()) { println "term=${term}, partOfSpeech=${pos.partOfSpeech}" } }
a1 結果
> groovy a1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。" term=WebAssembly, partOfSpeech=名詞-固有名詞-組織 term=が, partOfSpeech=助詞-格助詞-一般 term=サーバー, partOfSpeech=名詞-一般 term=レス, partOfSpeech=名詞-サ変接続 term=分野, partOfSpeech=名詞-一般 term=へ, partOfSpeech=助詞-格助詞-一般 term=大きな, partOfSpeech=連体詞 term=影響, partOfSpeech=名詞-サ変接続 term=を, partOfSpeech=助詞-格助詞-一般 term=与える, partOfSpeech=動詞-自立 term=だろ, partOfSpeech=助動詞 term=う, partOfSpeech=助動詞 term=と, partOfSpeech=助詞-格助詞-引用 term=答え, partOfSpeech=動詞-自立 term=た, partOfSpeech=助動詞 term=回答, partOfSpeech=名詞-サ変接続 term=者, partOfSpeech=名詞-接尾-一般 term=は, partOfSpeech=助詞-係助詞 term=全体, partOfSpeech=名詞-副詞可能 term=の, partOfSpeech=助詞-連体化 term=5, partOfSpeech=名詞-数 term=6, partOfSpeech=名詞-数 term=%, partOfSpeech=名詞-接尾-助数詞 term=だっ, partOfSpeech=助動詞 term=た, partOfSpeech=助動詞 term=。, partOfSpeech=記号-句点
a2
org.codelibs が上記の ipadic-neologd 版を提供していたので、ついでに試してみました。
処理内容はそのままで、モジュールとパッケージ名を変えるだけです。
lucene/a2.groovy
@GrabResolver('https://maven.codelibs.org/') @Grab('org.codelibs:lucene-analyzers-kuromoji-ipadic-neologd:8.2.0-20200120') import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer.Mode import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute def text = args[0] new JapaneseTokenizer(null, false, Mode.NORMAL).withCloseable { tokenizer -> ・・・ }
a2 結果
> groovy a2.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。" term=WebAssembly, partOfSpeech=名詞-固有名詞-組織 term=が, partOfSpeech=助詞-格助詞-一般 term=サーバーレス, partOfSpeech=名詞-固有名詞-一般 term=分野, partOfSpeech=名詞-一般 term=へ, partOfSpeech=助詞-格助詞-一般 term=大きな, partOfSpeech=連体詞 term=影響, partOfSpeech=名詞-サ変接続 term=を, partOfSpeech=助詞-格助詞-一般 term=与える, partOfSpeech=動詞-自立 term=だろ, partOfSpeech=助動詞 term=う, partOfSpeech=助動詞 term=と, partOfSpeech=助詞-格助詞-引用 term=答え, partOfSpeech=動詞-自立 term=た, partOfSpeech=助動詞 term=回答者, partOfSpeech=名詞-固有名詞-一般 term=は, partOfSpeech=助詞-係助詞 term=全体, partOfSpeech=名詞-副詞可能 term=の, partOfSpeech=助詞-連体化 term=5, partOfSpeech=名詞-数 term=6, partOfSpeech=名詞-数 term=%, partOfSpeech=名詞-接尾-助数詞 term=だっ, partOfSpeech=助動詞 term=た, partOfSpeech=助動詞 term=。, partOfSpeech=記号-句点
a1 と違って "サーバーレス" や "回答者" となりました。
(b) atilika Kuromoji
https://github.com/atilika/kuromoji
Lucene Kuromoji のベースとなった Kuromoji。 更新は途絶えているようですが、色々な辞書に対応しています。
ここでは以下の 2種類の辞書を試してみました。
- UniDic(2.1.2)
- JUMAN(7.0-20130310)
b1
まずは、UniDic 版です。
kuromoji/b1.groovy
@Grab('com.atilika.kuromoji:kuromoji-unidic:0.9.0') import com.atilika.kuromoji.unidic.Tokenizer def text = args[0] def tokenizer = new Tokenizer() tokenizer.tokenize(args[0]).each { def pos = [ it.partOfSpeechLevel1, it.partOfSpeechLevel2, it.partOfSpeechLevel3, it.partOfSpeechLevel4 ] println "term=${it.surface}, partOfSpeech=${pos}" }
b1 結果
> groovy b1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だ った。" term=WebAssembly, partOfSpeech=[名詞, 普通名詞, 一般, *] term=が, partOfSpeech=[助詞, 格助詞, *, *] term=サーバー, partOfSpeech=[名詞, 普通名詞, 一般, *] term=レス, partOfSpeech=[名詞, 普通名詞, 一般, *] term=分野, partOfSpeech=[名詞, 普通名詞, 一般, *] term=へ, partOfSpeech=[助詞, 格助詞, *, *] term=大きな, partOfSpeech=[連体詞, *, *, *] term=影響, partOfSpeech=[名詞, 普通名詞, サ変可能, *] term=を, partOfSpeech=[助詞, 格助詞, *, *] term=与える, partOfSpeech=[動詞, 一般, *, *] term=だろう, partOfSpeech=[助動詞, *, *, *] term=と, partOfSpeech=[助詞, 格助詞, *, *] term=答え, partOfSpeech=[動詞, 一般, *, *] term=た, partOfSpeech=[助動詞, *, *, *] term=回答, partOfSpeech=[名詞, 普通名詞, サ変可能, *] term=者, partOfSpeech=[接尾辞, 名詞的, 一般, *] term=は, partOfSpeech=[助詞, 係助詞, *, *] term=全体, partOfSpeech=[名詞, 普通名詞, 一般, *] term=の, partOfSpeech=[助詞, 格助詞, *, *] term=5, partOfSpeech=[名詞, 数詞, *, *] term=6, partOfSpeech=[名詞, 数詞, *, *] term=%, partOfSpeech=[名詞, 普通名詞, 助数詞可能, *] term=だっ, partOfSpeech=[助動詞, *, *, *] term=た, partOfSpeech=[助動詞, *, *, *] term=。, partOfSpeech=[補助記号, 句点, *, *]
"だろう" が分割されていないのが特徴。
b2
JUMAN 辞書版です。
kuromoji/b2.groovy
@Grab('com.atilika.kuromoji:kuromoji-jumandic:0.9.0') import com.atilika.kuromoji.jumandic.Tokenizer def text = args[0] def tokenizer = new Tokenizer() ・・・
b2 結果
> groovy b2.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だ った。" term=WebAssembly, partOfSpeech=[名詞, 組織名, *, *] term=が, partOfSpeech=[助詞, 格助詞, *, *] term=サーバーレス, partOfSpeech=[名詞, 人名, *, *] term=分野, partOfSpeech=[名詞, 普通名詞, *, *] term=へ, partOfSpeech=[助詞, 格助詞, *, *] term=大きな, partOfSpeech=[連体詞, *, *, *] term=影響, partOfSpeech=[名詞, サ変名詞, *, *] term=を, partOfSpeech=[助詞, 格助詞, *, *] term=与える, partOfSpeech=[動詞, *, 母音動詞, 基本形] term=だろう, partOfSpeech=[助動詞, *, 助動詞だろう型, 基本形] term=と, partOfSpeech=[助詞, 格助詞, *, *] term=答えた, partOfSpeech=[動詞, *, 母音動詞, タ形] term=回答, partOfSpeech=[名詞, サ変名詞, *, *] term=者, partOfSpeech=[接尾辞, 名詞性名詞接尾辞, *, *] term=は, partOfSpeech=[助詞, 副助詞, *, *] term=全体, partOfSpeech=[名詞, 普通名詞, *, *] term=の, partOfSpeech=[助詞, 接続助詞, *, *] term=56, partOfSpeech=[名詞, 数詞, *, *] term=%, partOfSpeech=[接尾辞, 名詞性名詞助数辞, *, *] term=だった, partOfSpeech=[判定詞, *, 判定詞, ダ列タ形] term=。, partOfSpeech=[特殊, 句点, *, *]
"サーバーレス"、"だろう"、"答えた"、"56"、"だった" が分割されていないのが特徴。 "サーバーレス" が人名となっているのは不思議。
(c) Sudachi
https://github.com/WorksApplications/Sudachi
辞書は UniDic と NEologd をベースに調整したものらしく、3種類(Small, Core, Full)用意されています。
辞書が継続的にメンテナンスされており最新のものを使えるのが魅力だと思います。
ここではデフォルトの Core 辞書を使いました。(system_core.dic ファイルをカレントディレクトリに配置して実行)
- Core 辞書(20211220 版)
また、Elasticsearch 用のプラグイン analysis-sudachi も用意されています。
sudachi/c1.groovy
@Grab('com.worksap.nlp:sudachi:0.5.3') import com.worksap.nlp.sudachi.DictionaryFactory import com.worksap.nlp.sudachi.Tokenizer def text = args[0] new DictionaryFactory().create().withCloseable { dic -> def tokenizer = dic.create() def ts = tokenizer.tokenize(Tokenizer.SplitMode.C, text) ts.each { t -> def pos = dic.getPartOfSpeechString(t.partOfSpeechId()) println "term=${t.surface()}, partOfSpeech=${pos}" } }
c1 結果
> groovy c1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だっ た。" term=WebAssembly, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *] term=が, partOfSpeech=[助詞, 格助詞, *, *, *, *] term=サーバーレス, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *] term=分野, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *] term=へ, partOfSpeech=[助詞, 格助詞, *, *, *, *] term=大きな, partOfSpeech=[連体詞, *, *, *, *, *] term=影響, partOfSpeech=[名詞, 普通名詞, サ変可能, *, *, *] term=を, partOfSpeech=[助詞, 格助詞, *, *, *, *] term=与える, partOfSpeech=[動詞, 一般, *, *, 下一段-ア行, 終止形-一般] term=だろう, partOfSpeech=[助動詞, *, *, *, 助動詞-ダ, 意志推量形] term=と, partOfSpeech=[助詞, 格助詞, *, *, *, *] term=答え, partOfSpeech=[動詞, 一般, *, *, 下一段-ア行, 連用形-一般] term=た, partOfSpeech=[助動詞, *, *, *, 助動詞-タ, 連体形-一般] term=回答者, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *] term=は, partOfSpeech=[助詞, 係助詞, *, *, *, *] term=全体, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *] term=の, partOfSpeech=[助詞, 格助詞, *, *, *, *] term=56, partOfSpeech=[名詞, 数詞, *, *, *, *] term=%, partOfSpeech=[名詞, 普通名詞, 助数詞可能, *, *, *] term=だっ, partOfSpeech=[助動詞, *, *, *, 助動詞-ダ, 連用形-促音便] term=た, partOfSpeech=[助動詞, *, *, *, 助動詞-タ, 終止形-一般] term=。, partOfSpeech=[補助記号, 句点, *, *, *, *]
"サーバーレス"、"だろう"、"回答者"、"56" となっているのが特徴。
(d) Kuromoji.js
https://github.com/takuyaa/kuromoji.js/
Kuromoji の JavaScript 実装。
kuromoji.js/d1.mjs
import kuromoji from 'kuromoji' const dicPath = 'node_modules/kuromoji/dict' const text = process.argv[2] kuromoji.builder({ dicPath }).build((err, tokenizer) => { if (err) { console.error(err) return } const ts = tokenizer.tokenize(text) for (const t of ts) { const pos = [t.pos, t.pos_detail_1, t.pos_detail_2, t.pos_detail_3] console.log(`term=${t.surface_form}, partOfSpeech=${pos}`) } })
d1 結果
> node d1.mjs "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。" term=WebAssembly, partOfSpeech=名詞,固有名詞,組織,* term=が, partOfSpeech=助詞,格助詞,一般,* term=サーバー, partOfSpeech=名詞,一般,*,* term=レス, partOfSpeech=名詞,サ変接続,*,* term=分野, partOfSpeech=名詞,一般,*,* term=へ, partOfSpeech=助詞,格助詞,一般,* term=大きな, partOfSpeech=連体詞,*,*,* term=影響, partOfSpeech=名詞,サ変接続,*,* term=を, partOfSpeech=助詞,格助詞,一般,* term=与える, partOfSpeech=動詞,自立,*,* term=だろ, partOfSpeech=助動詞,*,*,* term=う, partOfSpeech=助動詞,*,*,* term=と, partOfSpeech=助詞,格助詞,引用,* term=答え, partOfSpeech=動詞,自立,*,* term=た, partOfSpeech=助動詞,*,*,* term=回答, partOfSpeech=名詞,サ変接続,*,* term=者, partOfSpeech=名詞,接尾,一般,* term=は, partOfSpeech=助詞,係助詞,*,* term=全体, partOfSpeech=名詞,副詞可能,*,* term=の, partOfSpeech=助詞,連体化,*,* term=5, partOfSpeech=名詞,数,*,* term=6, partOfSpeech=名詞,数,*,* term=%, partOfSpeech=名詞,接尾,助数詞,* term=だっ, partOfSpeech=助動詞,*,*,* term=た, partOfSpeech=助動詞,*,*,* term=。, partOfSpeech=記号,句点,*,*
a1 と同じ結果になりました。
(e) Fugashi
https://github.com/polm/fugashi
辞書として unidic-lite と unidic のパッケージが用意されていましたが、 ここでは JUMAN 辞書を使いました。
- JUMAN
fugashi/e1.py
from fugashi import GenericTagger import sys text = sys.argv[1] tagger = GenericTagger() for t in tagger(text): pos = t.feature[0:4] print(f"term={t.surface}, partOfSpeech={pos}")
e1 結果
> python e1.py "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。" term=WebAssembly, partOfSpeech=('名詞', '組織名', '*', '*') term=が, partOfSpeech=('助詞', '格助詞', '*', '*') term=サーバーレス, partOfSpeech=('名詞', '人名', '*', '*') term=分野, partOfSpeech=('名詞', '普通名詞', '*', '*') term=へ, partOfSpeech=('助詞', '格助詞', '*', '*') term=大きな, partOfSpeech=('連体詞', '*', '*', '*') term=影響, partOfSpeech=('名詞', 'サ変名詞', '*', '*') term=を, partOfSpeech=('助詞', '格助詞', '*', '*') term=与える, partOfSpeech=('動詞', '*', '母音動詞', '基本形') term=だろう, partOfSpeech=('助動詞', '*', '助動詞だろう型', '基本形') term=と, partOfSpeech=('助詞', '格助詞', '*', '*') term=答えた, partOfSpeech=('動詞', '*', '母音動詞', 'タ形') term=回答, partOfSpeech=('名詞', 'サ変名詞', '*', '*') term=者, partOfSpeech=('接尾辞', '名詞性名詞接尾辞', '*', '*') term=は, partOfSpeech=('助詞', '副助詞', '*', '*') term=全体, partOfSpeech=('名詞', '普通名詞', '*', '*') term=の, partOfSpeech=('助詞', '接続助詞', '*', '*') term=56, partOfSpeech=('名詞', '数詞', '*', '*') term=%, partOfSpeech=('接尾辞', '名詞性名詞助数辞', '*', '*') term=だった, partOfSpeech=('判定詞', '*', '判定詞', 'ダ列タ形') term=。, partOfSpeech=('特殊', '句点', '*', '*')
同じ辞書を使っている b2 と同じ結果になりました。("サーバーレス" が人名なのも同じ)。
(f) Kagome
https://github.com/ikawaha/kagome
ここでは以下の辞書を使用します。
- IPADIC(mecab-ipadic-2.7.0-20070801)
- UniDic(2.1.2)
なお、Tokenize
を呼び出した場合は、分割モードとして Normal
が適用されるようです。
f1
IPADIC 版
kagome/f1.go
package main import ( "fmt" "os" "github.com/ikawaha/kagome-dict/ipa" "github.com/ikawaha/kagome/v2/tokenizer" ) func main() { text := os.Args[1] t, err := tokenizer.New(ipa.Dict(), tokenizer.OmitBosEos()) if err != nil { panic(err) } // 分割モード Normal ts := t.Tokenize(text) for _, t := range ts { fmt.Printf("term=%s, partOfSpeech=%v\n", t.Surface, t.POS()) } }
f1 結果
> go run f1.go "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。" term=WebAssembly, partOfSpeech=[名詞 固有名詞 組織 *] term=が, partOfSpeech=[助詞 格助詞 一般 *] term=サーバー, partOfSpeech=[名詞 一般 * *] term=レス, partOfSpeech=[名詞 サ変接続 * *] term=分野, partOfSpeech=[名詞 一般 * *] term=へ, partOfSpeech=[助詞 格助詞 一般 *] term=大きな, partOfSpeech=[連体詞 * * *] term=影響, partOfSpeech=[名詞 サ変接続 * *] term=を, partOfSpeech=[助詞 格助詞 一般 *] term=与える, partOfSpeech=[動詞 自立 * *] term=だろ, partOfSpeech=[助動詞 * * *] term=う, partOfSpeech=[助動詞 * * *] term=と, partOfSpeech=[助詞 格助詞 引用 *] term=答え, partOfSpeech=[動詞 自立 * *] term=た, partOfSpeech=[助動詞 * * *] term=回答, partOfSpeech=[名詞 サ変接続 * *] term=者, partOfSpeech=[名詞 接尾 一般 *] term=は, partOfSpeech=[助詞 係助詞 * *] term=全体, partOfSpeech=[名詞 副詞可能 * *] term=の, partOfSpeech=[助詞 連体化 * *] term=5, partOfSpeech=[名詞 数 * *] term=6, partOfSpeech=[名詞 数 * *] term=%, partOfSpeech=[名詞 接尾 助数詞 *] term=だっ, partOfSpeech=[助動詞 * * *] term=た, partOfSpeech=[助動詞 * * *] term=。, partOfSpeech=[記号 句点 * *]
同じ辞書を使っている a1 と同じ結果になりました。
f2
UniDic 版
kagome/f2.go
package main import ( "fmt" "os" "github.com/ikawaha/kagome-dict/uni" "github.com/ikawaha/kagome/v2/tokenizer" ) func main() { text := os.Args[1] t, err := tokenizer.New(uni.Dict(), tokenizer.OmitBosEos()) ・・・ }
f2 結果
> go run f2.go "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。" term=WebAssembly, partOfSpeech=[名詞 普通名詞 一般 *] term=が, partOfSpeech=[助詞 格助詞 * *] term=サーバー, partOfSpeech=[名詞 普通名詞 一般 *] term=レス, partOfSpeech=[名詞 普通名詞 一般 *] term=分野, partOfSpeech=[名詞 普通名詞 一般 *] term=へ, partOfSpeech=[助詞 格助詞 * *] term=大きな, partOfSpeech=[連体詞 * * *] term=影響, partOfSpeech=[名詞 普通名詞 サ変可能 *] term=を, partOfSpeech=[助詞 格助詞 * *] term=与える, partOfSpeech=[動詞 一般 * *] term=だろう, partOfSpeech=[助動詞 * * *] term=と, partOfSpeech=[助詞 格助詞 * *] term=答え, partOfSpeech=[動詞 一般 * *] term=た, partOfSpeech=[助動詞 * * *] term=回答, partOfSpeech=[名詞 普通名詞 サ変可能 *] term=者, partOfSpeech=[接尾辞 名詞的 一般 *] term=は, partOfSpeech=[助詞 係助詞 * *] term=全体, partOfSpeech=[名詞 普通名詞 一般 *] term=の, partOfSpeech=[助詞 格助詞 * *] term=5, partOfSpeech=[名詞 数詞 * *] term=6, partOfSpeech=[名詞 数詞 * *] term=%, partOfSpeech=[名詞 普通名詞 助数詞可能 *] term=だっ, partOfSpeech=[助動詞 * * *] term=た, partOfSpeech=[助動詞 * * *] term=。, partOfSpeech=[補助記号 句点 * *]
同じ辞書を使っている b1 と同じ結果になりました。
(g) Lindera
https://github.com/lindera-morphology/lindera
kuromoji-rs のフォークのようで、辞書は IPADIC です。
- IPADIC(mecab-ipadic-2.7.0-20070801)
lindera/src/main.rs
use lindera::tokenizer::Tokenizer; use lindera_core::LinderaResult; use std::env; fn main() -> LinderaResult<()> { let text = env::args().nth(1).unwrap_or("".to_string()); let mut tokenizer = Tokenizer::new()?; let ts = tokenizer.tokenize(&text)?; for t in ts { let pos = t.detail.get(0..4).unwrap_or(&t.detail); println!("text={}, partOfSpeech={:?}", t.text, pos); } Ok(()) }
結果
> cargo run "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。" ・・・ text=WebAssembly, partOfSpeech=["UNK"] text=が, partOfSpeech=["助詞", "格助詞", "一般", "*"] text=サーバー, partOfSpeech=["名詞", "一般", "*", "*"] text=レス, partOfSpeech=["名詞", "サ変接続", "*", "*"] text=分野, partOfSpeech=["名詞", "一般", "*", "*"] text=へ, partOfSpeech=["助詞", "格助詞", "一般", "*"] text=大きな, partOfSpeech=["連体詞", "*", "*", "*"] text=影響, partOfSpeech=["名詞", "サ変接続", "*", "*"] text=を, partOfSpeech=["助詞", "格助詞", "一般", "*"] text=与える, partOfSpeech=["動詞", "自立", "*", "*"] text=だろ, partOfSpeech=["助動詞", "*", "*", "*"] text=う, partOfSpeech=["助動詞", "*", "*", "*"] text=と, partOfSpeech=["助詞", "格助詞", "引用", "*"] text=答え, partOfSpeech=["動詞", "自立", "*", "*"] text=た, partOfSpeech=["助動詞", "*", "*", "*"] text=回答, partOfSpeech=["名詞", "サ変接続", "*", "*"] text=者, partOfSpeech=["名詞", "接尾", "一般", "*"] text=は, partOfSpeech=["助詞", "係助詞", "*", "*"] text=全体, partOfSpeech=["名詞", "副詞可能", "*", "*"] text=の, partOfSpeech=["助詞", "連体化", "*", "*"] text=5, partOfSpeech=["名詞", "数", "*", "*"] text=6, partOfSpeech=["名詞", "数", "*", "*"] text=%, partOfSpeech=["名詞", "接尾", "助数詞", "*"] text=だっ, partOfSpeech=["助動詞", "*", "*", "*"] text=た, partOfSpeech=["助動詞", "*", "*", "*"] text=。, partOfSpeech=["記号", "句点", "*", "*"]
a1 と概ね同じ結果となりましたが、"WebAssembly" が名詞になっていないのが特徴。
Node.js で GraphQL over gRPC 的な事をやってみる
gRPC 上で GraphQL を扱う GraphQL over gRPC 的な処理を Node.js で試しに実装してみました。
今回のコードは http://github.com/fits/try_samples/tree/master/blog/20201124/
はじめに
GraphQL はクエリ言語なので基本的に通信プロトコルには依存していません。
Web フロントエンドの用途では Apollo GraphQL が公開している GraphQL over WebSocket Protocol が有力そうですが、マイクロサービス等の用途で GraphQL を利用する事を考えると WebSocket よりも gRPC の方が適しているように思います。
- GraphQL の Query や Mutation は gRPC の Unary RPC で実現可能
- GraphQL の Subscription は gRPC の Server streaming RPC で実現可能 ※
そこで、とりあえず実装し確認してみたというのが本件の趣旨となっています。
※ GraphQL の Subscription を使わずに gRPC の streaming RPC で代用する事も考えられる
なお、GraphQL の処理に関しては「Deno で GraphQL」の内容をベースに、gRPC は「Node.js で gRPC を試す」の静的コード生成を用いて実装しています。
Query と Mutation - sample1
まずは Subscription を除いた Query と Mutation について実装してみます。
gRPC サービス定義
gRPC のサービス定義を下記のように定義してみました。
GraphQL のクエリは文字列で扱うので型は string
、結果は実質的に JSON となるので型は google.protobuf.Struct
としました。
ついでに、クエリの変数も渡せるようにして型は google.protobuf.Value
としています。
ここで、google.protobuf.Struct
は JSON Object を Protocol Buffers で表現するための型として定義されたもので、google.protobuf.Value
は JSON Value(null、文字列、数値、配列、JSON Object 等)を表現するための型です。(参照 google/protobuf/struct.proto)
proto/graphql.proto
syntax = "proto3"; import "google/protobuf/struct.proto"; package gql; message QueryRequest { string query = 1; google.protobuf.Value variables = 2; } service GraphQL { rpc Query(QueryRequest) returns (google.protobuf.Struct); }
この proto/graphql.proto から gRPC のコードを生成しておきます。
静的コード生成
> mkdir generated > npm run gen-grpc ・・・ grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto
package.json の内容は以下の通りです。
package.json
{ "name": "sample1", "version": "1.0.0", "description": "", "scripts": { "gen-grpc": "grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto" }, "dependencies": { "@grpc/grpc-js": "^1.2.1", "google-protobuf": "^3.14.0", "graphql": "^15.4.0", "uuid": "^8.3.1" }, "devDependencies": { "grpc-tools": "^1.10.0" } }
サーバー実装
GraphQL を扱う gRPC サーバーを実装します。
gRPC のリクエストから GraphQL のクエリやその変数の内容を取得し、graphql
関数で処理した結果をレスポンスとして返すような処理となります。
google.protobuf.Struct や Value に該当する型は google-protobuf/google/protobuf/struct_pb.js
にて Struct
や Value
として定義されており、プレーンな JavaScript オブジェクトと相互変換するための fromJavaScript
や toJavaScript
メソッドが用意されています。
下記コードでは QueryRequest の variables の内容を JavaScript オブジェクトとして取得するために toJavaScript
を、graphql の処理結果を Struct で返すために fromJavaScript
をそれぞれ使用しています。
server.js
const grpc = require('@grpc/grpc-js') const { GraphQLService } = require('./generated/proto/graphql_grpc_pb') const { Struct } = require('google-protobuf/google/protobuf/struct_pb') const { graphql, buildSchema } = require('graphql') const { v4: uuidv4 } = require('uuid') // GraphQL スキーマ定義 const schema = buildSchema(` enum Category { Standard Extra } input CreateItem { category: Category! value: Int! } type Item { id: ID! category: Category! value: Int! } type Mutation { create(input: CreateItem!): Item } type Query { find(id: ID!): Item } `) const store = {} // スキーマ定義に応じた GraphQL 処理の実装 const root = { create: ({ input: { category, value } }) => { console.log(`*** call create: category = ${category}, value = ${value}`) const id = `item-${uuidv4()}` const item = { id, category, value } store[id] = item return item }, find: ({ id }) => { console.log(`*** call find: ${id}`) return store[id] } } const server = new grpc.Server() server.addService(GraphQLService, { async query(call, callback) { try { const query = call.request.getQuery() const variables = call.request.getVariables().toJavaScript() // GraphQL の処理 const r = await graphql(schema, query, root, {}, variables) callback(null, Struct.fromJavaScript(r)) } catch(e) { console.error(e) callback(e) } } }) server.bindAsync( '127.0.0.1:50051', grpc.ServerCredentials.createInsecure(), (err, port) => { if (err) { console.error(err) return } console.log(`start server: ${port}`) server.start() } )
クライアント実装
クライアント側は以下のようになります。
client.js
const grpc = require('@grpc/grpc-js') const { QueryRequest } = require('./generated/proto/graphql_pb') const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb') const { Value } = require('google-protobuf/google/protobuf/struct_pb') const client = new GraphQLClient( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const promisify = (obj, methodName) => args => new Promise((resolve, reject) => { obj[methodName](args, (err, res) => { if (err) { reject(err) } else { resolve(res) } }) }) const query = promisify(client, 'query') const createRequest = (q, v = null) => { const req = new QueryRequest() req.setQuery(q) req.setVariables(Value.fromJavaScript(v)) return req } const run = async () => { // Item の作成 const r1 = await query(createRequest(` mutation { create(input: { category: Extra, value: 123 }) { id } } `)) console.log(r1.toJavaScript()) // 存在しない Item の find const r2 = await query(createRequest(` { find(id: "a1") { id value } } `)) console.log(r2.toJavaScript()) const id = r1.toJavaScript().data.create.id // 作成した Item の find (クエリ変数の使用) const r3 = await query(createRequest( ` query findItem($id: ID!) { find(id: $id) { id category value } } `, { id } )) console.log(r3.toJavaScript()) } run().catch(err => console.error(err))
動作確認
server.js を実行しておきます。
server.js 実行
> node server.js start server: 50051
client.js を実行した結果は以下の通りで、特に問題無く動作しているようです。
client.js 実行
> node client.js { data: { create: { id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d' } } } { data: { find: null } } { data: { find: { category: 'Extra', id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d', value: 123 } } }
Subscription - sample2
次は Subscription の機能を追加します。
gRPC サービス定義
gRPC のサービス定義に Subscription 用のメソッドを追加し、sample1 と同様にコードを生成しておきます。
proto/graphql.proto
syntax = "proto3"; import "google/protobuf/struct.proto"; package gql; message QueryRequest { string query = 1; google.protobuf.Value variables = 2; } service GraphQL { rpc Query(QueryRequest) returns (google.protobuf.Struct); rpc Subscription(QueryRequest) returns (stream google.protobuf.Struct); }
サーバー実装
「Deno で GraphQL」では単一の Subscription を処理するだけの実装だったので、複数クライアントからの Subscription を処理するために PubSub
というクラスを追加し、subscription の呼び出し毎に MessageBox を作成、(クライアントが接続中の)有効な全ての MessageBox へメッセージを配信するようにしています。
server.js
const grpc = require('@grpc/grpc-js') const { GraphQLService } = require('./generated/proto/graphql_grpc_pb') const { Struct } = require('google-protobuf/google/protobuf/struct_pb') const { graphql, buildSchema, subscribe, parse } = require('graphql') const { v4: uuidv4 } = require('uuid') // GraphQL スキーマ定義 const schema = buildSchema(` enum Category { Standard Extra } input CreateItem { category: Category! value: Int! } type Item { id: ID! category: Category! value: Int! } type Mutation { create(input: CreateItem!): Item } type Query { find(id: ID!): Item } type Subscription { created: Item } `) class MessageBox { #promises = [] #resolves = [] #appendPromise = () => this.#promises.push( new Promise(res => this.#resolves.push(res)) ) publish(msg) { if (this.#resolves.length == 0) { this.#appendPromise() } this.#resolves.shift()(msg) } [Symbol.asyncIterator]() { return { next: async () => { console.log('*** asyncIterator next') if (this.#promises.length == 0) { this.#appendPromise() } const value = await this.#promises.shift() return { value, done: false } } } } } // クライアント毎の MessageBox を管理 class PubSub { #subscribes = [] publish(msg) { this.#subscribes.forEach(s => s.publish(msg)) } subscribe() { const sub = new MessageBox() this.#subscribes.push(sub) return sub } unsubscribe(sub) { this.#subscribes = this.#subscribes.filter(s => s != sub) } } const store = {} const pubsub = new PubSub() const root = { create: ({ input: { category, value } }) => { ・・・ }, find: ({ id }) => { ・・・ } } const server = new grpc.Server() server.addService(GraphQLService, { async query(call, callback) { ・・・ }, async subscription(call) { console.log('*** subscribed') try { const query = call.request.getQuery() const variables = call.request.getVariables().toJavaScript() const sub = pubsub.subscribe() call.on('cancelled', () => { console.log('*** unsubscribed') pubsub.unsubscribe(sub) }) const subRoot = { created: () => sub } // GraphQL の Subscription 処理 const aiter = await subscribe(schema, parse(query), subRoot, {}, variables) for await (const r of aiter) { // メッセージの配信 call.write(Struct.fromJavaScript(r)) } } catch(e) { console.error(e) call.destroy(e) } } }) server.bindAsync( ・・・ )
Subscription 用クライアント実装
Subscription を呼び出すクライアントは以下のようになります。
client_subscribe.js
const grpc = require('@grpc/grpc-js') const { QueryRequest } = require('./generated/proto/graphql_pb') const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb') const { Value } = require('google-protobuf/google/protobuf/struct_pb') const client = new GraphQLClient( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const req = new QueryRequest() req.setQuery(` subscription { created { id category } } `) req.setVariables(Value.fromJavaScript(null)) const stream = client.subscription(req) stream.on('data', msg => { const event = msg.toJavaScript() console.log(`*** received event = ${JSON.stringify(event)}`) }) stream.on('end', () => console.log('*** stream end')) stream.on('error', err => console.log(`*** stream error: ${err}`))
動作確認
server.js を実行した後、client_subscribe.js を 2つ実行して sample1 で作成した client.js を実行すると以下のようになりました。
server.js の出力結果
> node server.js start server: 50051 *** subscribed *** asyncIterator next *** subscribed *** asyncIterator next *** call create: category = Extra, value = 123 *** asyncIterator next *** asyncIterator next *** call find: a1 *** call find: item-5e3f81ed-774a-4f7f-afc5-000a2db34859
client_subscribe.js の出力結果
> node client_subscribe.js *** received event = {"data":{"created":{"category":"Extra","id":"item-5e3f81ed-774a-4f7f-afc5-000a2db34859"}}}
Node.js で gRPC を試す
「gRPC Server Reflection のクライアント処理」では Node.js で gRPC クライアントを実装しましたが、今回はサーバー側も実装してみます。
サンプルコードは http://github.com/fits/try_samples/tree/master/blog/20201115/
はじめに
gRPC on Node.js では、以下の 2通りの手法が用意されており、それぞれ使用するパッケージが異なります。
- (a) 動的コード生成 (
@grpc/proto-loader
パッケージを使用) - (b) 静的コード生成 (
grpc-tools
パッケージを使用)
更に、gRPC の実装ライブラリとして以下の 2種類が用意されており、どちらかを使う事になります。
- C-based Client and Server (
grpc
パッケージを使用) - Pure JavaScript Client (
@grpc/grpc-js
パッケージを使用)
@grpc/grpc-js
は現時点で Pure JavaScript Client と表現されていますが、クライアントだけではなくサーバーの実装にも使えます。
ここでは、Pure JavaScript 実装の @grpc/grpc-js
を使って、(a) と (b) の両方を試してみます。
サービス定義(proto ファイル)
gRPC のサービス定義として下記ファイルを使用します。
Unary RPC(1リクエスト / 1レスポンス)と Server streaming RPC(1リクエスト / 多レスポンス)、message の oneof
、google.protobuf.Empty
の扱い等を確認するような内容にしてみました。
proto/item.proto
syntax = "proto3"; import "google/protobuf/empty.proto"; package item; message AddItemRequest { string item_id = 1; uint64 price = 2; } message ItemRequest { string item_id = 1; } message Item { string item_id = 1; uint64 price = 2; } message ItemSubscribeRequest { } message AddedItem { string item_id = 1; uint64 price = 2; } message RemovedItem { string item_id = 1; } message ItemEvent { oneof event { AddedItem added = 1; RemovedItem removed = 2; } } service ItemManage { rpc AddItem(AddItemRequest) returns (google.protobuf.Empty); rpc RemoveItem(ItemRequest) returns (google.protobuf.Empty); rpc GetItem(ItemRequest) returns (Item); rpc Subscribe(ItemSubscribeRequest) returns (stream ItemEvent); }
(a) 動的コード生成(@grpc/proto-loader)
まずは、@grpc/proto-loader
を使った動的コード生成を試します。
インストール
@grpc/proto-loader
と @grpc/grpc-js
をインストールしておきます。
> npm install --save @grpc/proto-loader @grpc/grpc-js
サーバー実装
proto-loader の loadSync
関数 ※ で proto ファイルをロードした結果を grpc-js の loadPackageDefinition
で処理する事で型定義などを動的に生成します。
※ 非同期版の load 関数も用意されています
addService
で gRPC のサービス定義と処理をマッピングし、bindAsync
後に start
を呼び出す事でサーバー処理を開始します。
proto ファイルで定義したメッセージ型と同じフィールドを持つ JavaScript オブジェクトを gRPC のリクエストやレスポンスで使う事ができるようです。
Unary RPC の場合は、第二引数の callback
へ失敗時の値と成功時の値をそれぞれ渡す事で処理結果を返します。
任意のエラーを返したい場合は、code
で gRPC のステータスコードを、details
でエラー内容を指定します。
google.protobuf.Empty の箇所は null
もしくは undefined
で代用できるようです。
Server streaming RPC の場合は、第一引数(下記コードでは call
)の write
を呼び出す事で処理結果を返す事ができます。
クライアントが途中で切断したりすると cancelled
が発生するようになっており、cancelled 発生後に write
を呼び出してもエラー等は発生しないようになっていました。
server.js
const protoLoader = require('@grpc/proto-loader') const grpc = require('@grpc/grpc-js') const protoFile = './proto/item.proto' // proto ファイルのロード const pd = protoLoader.loadSync(protoFile) // gPRC 用の動的な型定義生成 const proto = grpc.loadPackageDefinition(pd) let store = [] let subscribeList = [] const findItem = itemId => store.find(i => i.itemId == itemId) const addItem = (itemId, price) => { if (findItem(itemId)) { return undefined } const item = { itemId, price } store.push(item) return item } const removeItem = itemId => { const item = findItem(itemId) if (item) { store = store.filter(i => i.itemId != item.itemId) } return item } // ItemEvent の配信 const publishEvent = event => { console.log(`*** publish event: ${JSON.stringify(event)}`) subscribeList.forEach(s => s.write(event)) } const server = new grpc.Server() // サービス定義と処理のマッピング server.addService(proto.item.ItemManage.service, { AddItem(call, callback) { const itemId = call.request.itemId const price = call.request.price const item = addItem(itemId, price) if (item) { callback() publishEvent({ added: { itemId, price }}) } else { const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' } callback(err) } }, RemoveItem(call, callback) { const itemId = call.request.itemId if (removeItem(itemId)) { callback() publishEvent({ removed: { itemId }}) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, GetItem(call, callback) { const itemId = call.request.itemId const item = findItem(itemId) if (item) { callback(null, item) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, Subscribe(call) { console.log('*** subscribed') subscribeList.push(call) // クライアント切断時の処理 call.on('cancelled', () => { console.log('*** unsubscribed') subscribeList = subscribeList.filter(s => s != call) }) } }) server.bindAsync( '127.0.0.1:50051', grpc.ServerCredentials.createInsecure(), (err, port) => { if (err) { console.error(err) return } console.log(`start server: ${port}`) // 開始 server.start() } )
クライアント実装1
まずは、Unary RPC の API のみ(Subscribe 以外)を呼び出すクライアントを実装してみます。
loadPackageDefinition を実施するところまではサーバーと同じです。
Unary RPC はコールバック関数を伴ったメソッドとして用意されますが、このメソッドに Node.js の util.promisify
を直接適用すると不都合が生じたため、Promise 化は自前の関数(下記の promisify
)で実施するようにしました。
client.js
const protoLoader = require('@grpc/proto-loader') const grpc = require('@grpc/grpc-js') const protoFile = './proto/item.proto' const pd = protoLoader.loadSync(protoFile) const proto = grpc.loadPackageDefinition(pd) const id = process.argv[2] const client = new proto.item.ItemManage( '127.0.0.1:50051', grpc.credentials.createInsecure() ) // Unary RPC の Promise 化 const promisify = (obj, methodName) => args => new Promise((resolve, reject) => { obj[methodName](args, (err, res) => { if (err) { reject(err) } else { resolve(res) } }) }) const addItem = promisify(client, 'AddItem') const removeItem = promisify(client, 'RemoveItem') const getItem = promisify(client, 'GetItem') const printItem = item => { console.log(`id = ${item.itemId}, price = ${item.price}`) } const run = async () => { await addItem({ itemId: `${id}_item-1`, price: 100 }) const item1 = await getItem({ itemId: `${id}_item-1` }) printItem(item1) await addItem({ itemId: `${id}_item-2`, price: 20 }) const item2 = await getItem({ itemId: `${id}_item-2` }) printItem(item2) await addItem({ itemId: `${id}_item-1`, price: 50 }) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem({ itemId: `${id}_item-1` }) await getItem({ itemId: `${id}_item-1` }) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem({ itemId: `${id}_item-2` }) } run().catch(err => console.error(err))
クライアント実装2
次は Server streaming RPC のクライアント実装です。
client_subscribe.js
const protoLoader = require('@grpc/proto-loader') const grpc = require('@grpc/grpc-js') const protoFile = './proto/item.proto' const pd = protoLoader.loadSync(protoFile) const proto = grpc.loadPackageDefinition(pd) const client = new proto.item.ItemManage( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const stream = client.Subscribe({}) // メッセージ受信時 stream.on('data', event => { console.log(`*** received event = ${JSON.stringify(event)}`) }) // サーバー終了時 stream.on('end', () => console.log('*** stream end')) stream.on('error', err => console.log(`*** stream error: ${err}`))
動作確認
server.js の実行後、client_subscribe.js を 2つ起動した後に client.js を実行してみます。
Server 実行
> node server.js start server: 50051
Client2-1 実行
> node client_subscribe.js
Client2-2 実行
> node client_subscribe.js
Client1 実行
> node client.js a1 id = a1_item-1, price = 100 id = a1_item-2, price = 20 *** ERROR = 6 ALREADY_EXISTS: exists item *** ERROR = 5 NOT_FOUND: item not found
この時点で出力内容は以下のようになりました。
Server 出力内容
> node server.js start server: 50051 *** subscribed *** subscribed *** publish event: {"added":{"itemId":"a1_item-1","price":100}} *** publish event: {"added":{"itemId":"a1_item-2","price":20}} *** publish event: {"removed":{"itemId":"a1_item-1"}} *** publish event: {"removed":{"itemId":"a1_item-2"}}
Client2-1、Client2-2 出力内容
> node client_subscribe.js *** received event = {"added":{"itemId":"a1_item-1","price":{"low":100,"high":0,"unsigned":true}}} *** received event = {"added":{"itemId":"a1_item-2","price":{"low":20,"high":0,"unsigned":true}}} *** received event = {"removed":{"itemId":"a1_item-1"}} *** received event = {"removed":{"itemId":"a1_item-2"}}
Client2-2 を Ctrl + c で終了後に、Server も Ctrl + c で終了すると以下のようになり、Client2-1 のプロセスは終了しました。
Server 出力内容
> node server.js start server: 50051 ・・・ *** publish event: {"removed":{"itemId":"a1_item-2"}} *** unsubscribed ^C
Client2-1 出力内容
> node client_subscribe.js ・・・ *** received event = {"removed":{"itemId":"a1_item-2"}} *** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error) *** stream end
特に問題はなく、正常に動作しているようです。
(b) 静的コード生成(grpc-tools)
grpc-tools
を使った静的コード生成を試します。
インストールとコード生成
grpc-tools
、@grpc/grpc-js
、google-protobuf
をインストールしておきます。
> npm install --save-dev grpc-tools ・・・ > npm install --save @grpc/grpc-js google-protobuf ・・・
grpc-tools をインストールする事で使えるようになる grpc_tools_node_protoc
コマンドで proto ファイルからコードを生成します。
grpc_tools_node_protoc コマンドは内部的に protoc
コマンドを grpc_node_plugin
プラグインを伴って呼び出すようになっています。
--grpc_out
でサービス定義用のファイル xxx_grpc_pb.js が生成され、--js_out
でメッセージ定義用のファイルが生成されます。
サービス定義 xxx_grpc_pb.js は --js_out で import_style=commonjs
オプションを指定する事を前提としたコードになっています。※
※ import_style=commonjs オプションを指定した際に生成される xxx_pb.js を参照するようになっている
また、--grpc_out はデフォルトで grpc
パッケージ用のコードを生成するため、ここでは grpc_js
オプションを指定して @grpc/grpc-js
用のコードを生成するようにしています。
静的コード生成例(grpc_tools_node_protoc コマンド)
> mkdir generated > grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto ・・・
サーバー実装
(a) の場合と処理内容に大きな違いはありませんが、リクエストやレスポンスでは生成された型を使います。
アクセサメソッド(getter、setter)で値の取得や設定ができるようになっており、new 時に配列として全フィールドの値を指定する事もできるようです。
JavaScript オブジェクトへ変換したい場合は toObject
メソッドを使用します。
addService でマッピングする際のメソッド名の一文字目が小文字になっています。
proto ファイルで定義したサービス名の後に Service
を付けた型(ここでは ItemManageService
)がサーバー処理用、Client
を付けた型がクライアント処理用の型定義となるようです。
server.js
const grpc = require('@grpc/grpc-js') const { Item, AddedItem, RemovedItem, ItemEvent } = require('./generated/proto/item_pb') const { ItemManageService } = require('./generated/proto/item_grpc_pb') const { Empty } = require('google-protobuf/google/protobuf/empty_pb') let store = [] let subscribeList = [] const findItem = itemId => store.find(i => i.getItemId() == itemId) const addItem = (itemId, price) => { if (findItem(itemId)) { return undefined } const item = new Item([itemId, price]) store.push(item) return item } const removeItem = itemId => { const item = findItem(itemId) if (item) { store = store.filter(i => i.getItemId() != item.getItemId()) } return item } const createAddedEvent = (itemId, price) => { const event = new ItemEvent() event.setAdded(new AddedItem([itemId, price])) return event } const createRemovedEvent = itemId => { const event = new ItemEvent() event.setRemoved(new RemovedItem([itemId])) return event } const publishEvent = event => { // toObject で JavaScript オブジェクトへ変換 console.log(`*** publish event: ${JSON.stringify(event.toObject())}`) subscribeList.forEach(s => s.write(event)) } const server = new grpc.Server() server.addService(ItemManageService, { addItem(call, callback) { const itemId = call.request.getItemId() const price = call.request.getPrice() const item = addItem(itemId, price) if (item) { callback(null, new Empty()) publishEvent(createAddedEvent(itemId, price)) } else { const err = { code: grpc.status.ALREADY_EXISTS, details: 'exists item' } callback(err) } }, removeItem(call, callback) { const itemId = call.request.getItemId() if (removeItem(itemId)) { callback(null, new Empty()) publishEvent(createRemovedEvent(itemId)) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, getItem(call, callback) { const itemId = call.request.getItemId() const item = findItem(itemId) if (item) { callback(null, item) } else { const err = { code: grpc.status.NOT_FOUND, details: 'item not found' } callback(err) } }, subscribe(call) { console.log('*** subscribed') subscribeList.push(call) call.on('cancelled', () => { console.log('*** unsubscribed') subscribeList = subscribeList.filter(s => s != call) }) } }) server.bindAsync( ・・・ )
クライアント実装1
生成された型を使う点とメソッド名の先頭が小文字になっている点を除くと、基本的に (a) と同じです。
client.js
const grpc = require('@grpc/grpc-js') const { AddItemRequest, ItemRequest } = require('./generated/proto/item_pb') const { ItemManageClient } = require('./generated/proto/item_grpc_pb') const id = process.argv[2] const client = new ItemManageClient( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const promisify = (obj, methodName) => args => new Promise((resolve, reject) => { ・・・ }) const addItem = promisify(client, 'addItem') const removeItem = promisify(client, 'removeItem') const getItem = promisify(client, 'getItem') const printItem = item => { console.log(`id = ${item.getItemId()}, price = ${item.getPrice()}`) } const run = async () => { await addItem(new AddItemRequest([`${id}_item-1`, 100])) const item1 = await getItem(new ItemRequest([`${id}_item-1`])) printItem(item1) await addItem(new AddItemRequest([`${id}_item-2`, 20])) const item2 = await getItem(new ItemRequest([`${id}_item-2`])) printItem(item2) await addItem(new AddItemRequest([`${id}_item-1`, 50])) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem(new ItemRequest([`${id}_item-1`])) await getItem(new ItemRequest([`${id}_item-1`])) .catch(err => console.error(`*** ERROR = ${err.message}`)) await removeItem(new ItemRequest([`${id}_item-2`])) } run().catch(err => console.error(err))
クライアント実装2
こちらも同様です。
client_subscribe.js
const grpc = require('@grpc/grpc-js') const { ItemSubscribeRequest } = require('./generated/proto/item_pb') const { ItemManageClient } = require('./generated/proto/item_grpc_pb') const client = new ItemManageClient( ・・・ ) const stream = client.subscribe(new ItemSubscribeRequest()) stream.on('data', event => { // toObject で JavaScript オブジェクトへ変換 console.log(`*** received event = ${JSON.stringify(event.toObject())}`) }) ・・・
動作確認
(a) と同じ操作を行った結果は以下のようになりました。
Server 出力内容
> node server.js start server: 50051 *** subscribed *** subscribed *** publish event: {"added":{"itemId":"a1_item-1","price":100}} *** publish event: {"added":{"itemId":"a1_item-2","price":20}} *** publish event: {"removed":{"itemId":"a1_item-1"}} *** publish event: {"removed":{"itemId":"a1_item-2"}} *** unsubscribed ^C
Client1 出力内容
> node client.js a1 id = a1_item-1, price = 100 id = a1_item-2, price = 20 *** ERROR = 6 ALREADY_EXISTS: exists item *** ERROR = 5 NOT_FOUND: item not found
Client2-1 出力内容
> node client_subscribe.js *** received event = {"added":{"itemId":"a1_item-1","price":100}} *** received event = {"added":{"itemId":"a1_item-2","price":20}} *** received event = {"removed":{"itemId":"a1_item-1"}} *** received event = {"removed":{"itemId":"a1_item-2"}} *** stream error: Error: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error) *** stream end
Client2-2 出力内容
> node client_subscribe.js *** received event = {"added":{"itemId":"a1_item-1","price":100}} *** received event = {"added":{"itemId":"a1_item-2","price":20}} *** received event = {"removed":{"itemId":"a1_item-1"}} *** received event = {"removed":{"itemId":"a1_item-2"}} ^C
(a) と (b) は同一の gRPC サービス(proto ファイル)を実装したものなので当然ですが、(a) と (b) を相互接続しても特に問題はありませんでした。
Deno で GraphQL
GraphQL を Deno で試してみました。
https://deno.land/x に Deno 用の GraphQL モジュールがいくつかありましたが(基本的には GraphQL.js のポーティング)、ここでは GraphQL.js を直接使う事にします。
今回のサンプルコードは http://github.com/fits/try_samples/tree/master/blog/20200817/
1. GraphQL.js の使用
GraphQL.js を Deno から使うには、以下のように Skypack(Pika CDN の後継)という CDN から import するのが簡単そうです。
import { ・・・ } from 'https://cdn.skypack.dev/graphql'
2. 単独実行
GraphQL.js を使って GraphQL の Query・Mutation・Subscription を一通り実行してみます。
まずは、buildSchema
関数に GraphQL の型定義を渡して GraphQLSchema
を取得します。
これをクエリや GraphQL 処理の実装と共に graphql
関数や subscribe
関数へ渡す事で処理結果を得ます。
graphql 関数の方は第二引数に文字列を使えましたが(型は Source | string
)、subscribe 関数の方は使えなかったので(型は DocumentNode
)、こちらは parse
した結果を渡すようにしています。
GraphQL 処理の実装(下記サンプルの root
変数の箇所)では、Query・Mutation・Subscription の処理に応じた関数を用意し、型定義に応じた値を返すように実装します。
Subscription の場合は、([Symbol.asyncIterator]
プロパティで AsyncIterator
を返す)AsyncIterable
を戻り値としなければならないようなので、下記では MessageBox クラスとして実装しています。
sample1.js
import { graphql, buildSchema, subscribe, parse } from 'https://cdn.skypack.dev/graphql' import { v4 } from 'https://deno.land/std/uuid/mod.ts' // GraphQL 型定義 const schema = buildSchema(` enum Category { Standard Extra } input CreateItem { category: Category! value: Int! } type Item { id: ID! category: Category! value: Int! } type Mutation { create(input: CreateItem!): Item } type Query { find(id: ID!): Item } type Subscription { created: Item } `) // Subscription 用の AsyncIterable class MessageBox { #messages = [] #resolves = [] publish(value) { const resolve = this.#resolves.shift() if (resolve) { resolve({ value }) // 以下でも可 // resolve({ value, done: false }) } else { this.#messages.push(value) } } [Symbol.asyncIterator]() { return { next: () => { console.log('** asyncIterator next') return new Promise(resolve => { const value = this.#messages.shift() if (value) { resolve({ value }) // 以下でも可 // resolve({ value, done: false }) } else { this.#resolves.push(resolve) } }) } } } } const store = {} const box = new MessageBox() // GraphQL 処理の実装 const root = { create: ({ input: { category, value } }) => { console.log(`* call create: category = ${category}, value = ${value}`) const id = `item-${v4.generate()}` const item = { id, category, value } store[id] = item box.publish({ created: item }) return item }, find: ({ id }) => { console.log(`* call find: ${id}`) return store[id] }, // Subscription の実装では戻り値を AsyncIterable とする created: () => box } const run = async () => { const m1 = ` mutation { create(input: { category: Standard, value: 10 }) { id } } ` // Mutation の実行1 const mr1 = await graphql(schema, m1, root) console.log(mr1) const m2 = ` mutation Create($p: CreateItem!) { create(input: $p) { id } } ` const vars = { p: { category: 'Extra', value: 123 } } // Mutation の実行2(変数利用) const mr2 = await graphql(schema, m2, root, null, vars) console.log(mr2) const q = ` { find(id: "${mr2.data.create.id}") { id category value } } ` // Query の実行 const qr = await graphql(schema, q, root) console.log(qr) const s = parse(` subscription { created { id category } } `) // Subscription 処理 const subsc = await subscribe(schema, s, root) for await (const r of subsc) { console.log('*** subscribe') console.log(r) } } run().catch(err => console.error(err))
実行結果は以下の通りです。
実行結果
> deno run sample1.js ・・・ * call create: category = Standard, value = 10 { data: { create: { id: "item-11ca8326-832b-4e13-9b47-d2c70c1e95e9" } } } * call create: category = Extra, value = 123 { data: { create: { id: "item-29f1851b-4cbb-4f23-872f-11856f1c0bf7" } } } * call find: item-29f1851b-4cbb-4f23-872f-11856f1c0bf7 { data: { find: { id: "item-29f1851b-4cbb-4f23-872f-11856f1c0bf7", category: "Extra", value: 123 } } } ** asyncIterator next *** subscribe { data: { created: { id: "item-11ca8326-832b-4e13-9b47-d2c70c1e95e9", category: "Standard" } } } ** asyncIterator next *** subscribe { data: { created: { id: "item-29f1851b-4cbb-4f23-872f-11856f1c0bf7", category: "Extra" } } } ** asyncIterator next
3. Web サーバー化
Deno の http を使って、上記処理を Web サーバー化してみました。
sample2.js
import { graphql, buildSchema, subscribe, parse } from 'https://cdn.skypack.dev/graphql' import { v4 } from 'https://deno.land/std/uuid/mod.ts' import { serve } from 'https://deno.land/std@0.65.0/http/server.ts' const schema = buildSchema(` enum Category { Standard Extra } ・・・ `) class MessageBox { ・・・ } const store = {} const box = new MessageBox() const root = { create: ({ input: { category, value } }) => { const id = `item-${v4.generate()}` const item = { id, category, value } store[id] = item box.publish({ created: item }) return item }, find: ({ id }) => { return store[id] }, created: () => box } // Web サーバー処理 const run = async () => { const server = serve({ port: 8080 }) // リクエスト毎の処理 for await (const req of server) { const buf = await Deno.readAll(req.body) const query = new TextDecoder().decode(buf) console.log(`* query: ${query}`) const res = await graphql(schema, query, root) req.respond({ body: JSON.stringify(res) }) } } const runSubscribe = async () => { const s = parse(` subscription { created { id category value } } `) const subsc = await subscribe(schema, s, root) for await (const r of subsc) { console.log(`*** subscribe: ${JSON.stringify(r)}`) } } Promise.all([ run(), runSubscribe() ]).catch(err => console.error(err))
実行
deno run で Web サーバーを起動します。
--allow-net
でネットワーク処理を許可する必要があります。
Web サーバー起動
> deno run --allow-net sample2.js
create を実行した結果です。
Mutation の実行
$ curl -s http://localhost:8080 -d 'mutation { create(input: { category: Extra, value: 5 }) { id } }' {"data":{"create":{"id":"item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451"}}}
create で返された id を指定して find した結果です。
Query の実行1
$ curl -s http://localhost:8080 -d '{ find(id: "item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451") { id category value } }' {"data":{"find":{"id":"item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451","category":"Extra","value":5}}}
存在しない id を指定して find した結果です。
Query の実行2
$ curl -s http://localhost:8080 -d '{ find(id: "item-invalid") { id category value } }' {"data":{"find":null}}
サーバー側の出力結果は以下の通りです。
Web サーバー出力結果
> deno run --allow-net sample2.js ・・・ ** asyncIterator next * query: mutation { create(input: { category: Extra, value: 5 }) { id } } *** subscribe: {"data":{"created":{"id":"item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451","category":"Extra","value":5}}} ** asyncIterator next * query: { find(id: "item-7fc5e3bb-286f-48fe-b027-9ca34dcc6451") { id category value } } * query: { find(id: "item-invalid") { id category value } }
rusty_v8 を使って Rust から JavaScript を実行
Node.js の製作者が新たに作り直した Deno という JavaScript/TypeScript 実行環境があります。
Deno の内部では、V8 JavaScript エンジンの呼び出しに rusty_v8 という Rust 用バインディングを使っていたので、今回はこの rusty_v8 を使って Rust コード内で JavaScript コードを実行してみました。
今回のサンプルコードは http://github.com/fits/try_samples/tree/master/blog/20200705/
設定
rusty_v8 を使うための Cargo 用の dependencies 設定は以下のようになります。
Cargo.toml
・・・
[dependencies]
rusty_v8 = "0.6"
JavaScript コード実行
以下の JavaScript コードを実行し結果(1 ~ 5 の合計値)を出力する処理を Rust で実装してみます。
実行する JavaScript コード
const vs = [1, 2, 3, 4, 5] console.log(vs) vs.reduce((acc, v) => acc + v, 0)
基本的に、下記 V8 API による手順と同様の処理を rusty_v8 の API で実装すればよさそうです。
V8 エンジンのインスタンスである Isolate
(独自のヒープを持ち他のインスタンスとは隔離される)、GC で管理するオブジェクトへの参照をまとめて管理する HandleScope
、サンドボックス化された実行コンテキストの Context
(組み込みのオブジェクト・関数を管理)をそれぞれ作成していきます。
そして、JavaScript のコードを Script::compile
でコンパイルして、run
で実行します。
run の戻り値は Option<Local<Value>>
となっているので、ここでは to_string
を使って JavaScript の String(Option<Local<rusty_v8::String>>
)として取得し、rusty_v8::String を to_rust_string_lossy
を使って Rust の String へ変換して出力しています。
src/sample1.rs
use rusty_v8 as v8; fn main() { let platform = v8::new_default_platform().unwrap(); v8::V8::initialize_platform(platform); v8::V8::initialize(); let isolate = &mut v8::Isolate::new(Default::default()); let scope = &mut v8::HandleScope::new(isolate); let context = v8::Context::new(scope); let scope = &mut v8::ContextScope::new(scope, context); // JavaScript コード let src = r#" const vs = [1, 2, 3, 4, 5] console.log(vs) vs.reduce((acc, v) => acc + v, 0) "#; v8::String::new(scope, src) .map(|code| { println!("code: {}", code.to_rust_string_lossy(scope)); code }) .and_then(|code| v8::Script::compile(scope, code, None)) //コンパイル .and_then(|script| script.run(scope)) //実行 .and_then(|value| value.to_string(scope)) // rusty_v8::Value を rusty_v8::String へ .iter() .for_each(|s| println!("result: {}", s.to_rust_string_lossy(scope))); }
実行
複数の実行ファイルに対応した下記 Cargo.toml を使って実行します。
Cargo.toml
[package] ・・・ default-run = "sample1" [dependencies] rusty_v8 = "0.6" [[bin]] name = "sample1" path = "src/sample1.rs" [[bin]] name = "sample2" path = "src/sample2.rs" [[bin]] name = "sample3" path = "src/sample3.rs"
sample1(src/sample1.rs)の実行結果は以下の通りです。
console.log
に関しては何も処理されていませんが、JavaScript の実行結果は取得できています。
sample1 実行結果
> cargo run ・・・ Running `target\debug\sample1.exe` code: const vs = [1, 2, 3, 4, 5] console.log(vs) vs.reduce((acc, v) => acc + v, 0) result: 15
Inspector 機能
次に、console.log
されたログメッセージを Rust から出力するようにしてみます。
V8 で console.log (デバッグコンソールへのログ出力)のようなデバッグ機能を処理するには Inspector という機能を使うようです。
rusty_v8 では、console.log 等の呼び出し時に V8InspectorClientImpl
トレイトの console_api_message
が呼びされるようになっているため、これを実装した構造体のインスタンスから V8Inspector
を作成して context_created
を実行する事で実現できます。
context_created のシグネチャは fn context_created(&mut self, context: Local<Context>, context_group_id: i32, human_readable_name: StringView)
となっており、context_group_id
引数へ指定した値が、console_api_message の context_group_id 引数の値となります。(human_readable_name
の用途に関してはよく分からなかった)
また、console_api_message の level
引数の値はログレベル(V8 API の MessageErrorLevel) のようです。
ちなみに、V8InspectorClientImpl トレイトは base
と base_mut
の実装が必須でした。(console_api_message は空実装されている)
src/sample2.rs
use rusty_v8 as v8; use rusty_v8::inspector::*; struct InspectorClient(V8InspectorClientBase); impl InspectorClient { fn new() -> Self { Self(V8InspectorClientBase::new::<Self>()) } } impl V8InspectorClientImpl for InspectorClient { fn base(&self) -> &V8InspectorClientBase { &self.0 } fn base_mut(&mut self) -> &mut V8InspectorClientBase { &mut self.0 } fn console_api_message(&mut self, _context_group_id: i32, _level: i32, message: &StringView, _url: &StringView, _line_number: u32, _column_number: u32, _stack_trace: &mut V8StackTrace) { // ログメッセージの出力 println!("{}", message); } } fn main() { ・・・ let isolate = &mut v8::Isolate::new(Default::default()); // V8Inspector の作成 let mut client = InspectorClient::new(); let mut inspector = V8Inspector::create(isolate, &mut client); let scope = &mut v8::HandleScope::new(isolate); let context = v8::Context::new(scope); let scope = &mut v8::ContextScope::new(scope, context); // context_created の実行 inspector.context_created(context, 1, StringView::empty()); let src = r#" const vs = [1, 2, 3, 4, 5] console.log(vs) vs.reduce((acc, v) => acc + v, 0) "#; ・・・ }
実行結果は以下の通り。
console.log(vs)
を処理して 1,2,3,4,5
が出力されるようになりました。
sample2 実行結果
> cargo run --bin sample2 ・・・ Running `target\debug\sample2.exe` code: const vs = [1, 2, 3, 4, 5] console.log(vs) vs.reduce((acc, v) => acc + v, 0) 1,2,3,4,5 result: 15
最後に、context_group_id
と level
の値も出力するようにしてみます。
src/sample3.rs
・・・ impl V8InspectorClientImpl for InspectorClient { ・・・ fn console_api_message(&mut self, context_group_id: i32, level: i32, message: &StringView, _url: &StringView, _line_number: u32, _column_number: u32, _stack_trace: &mut V8StackTrace) { println!( "*** context_group_id={}, level={}, message={}", context_group_id, level, message ); } } fn main() { ・・・ inspector.context_created(context, 123, StringView::empty()); let src = r#" console.log('log') console.debug('debug') console.info('info') console.error('error') console.warn('warn') "#; v8::String::new(scope, src) .and_then(|code| v8::Script::compile(scope, code, None)) .and_then(|script| script.run(scope)) .and_then(|value| value.to_string(scope)) .iter() .for_each(|s| println!("result: {}", s.to_rust_string_lossy(scope))); }
実行結果は以下の通りです。
sample3 実行結果
> cargo run --bin sample3 ・・・ Running `target\debug\sample3.exe` *** context_group_id=123, level=4, message=log *** context_group_id=123, level=2, message=debug *** context_group_id=123, level=4, message=info *** context_group_id=123, level=8, message=error *** context_group_id=123, level=16, message=warn result: undefined
Rust で WASI 対応の WebAssembly を作成して実行
Rust で WASI 対応の WebAssembly を作って、スタンドアロン実行や Web ブラウザ上での実行を試してみました。
WASI(WebAssembly System Interface) は WebAssembly のコードを様々なプラットフォームで実行するためのインターフェースで、これに対応した WebAssembly であれば Web ブラウザ外で実行できます。
Rust で WASI 対応の WebAssembly を作るのは簡単で、ビルドターゲットに wasm32-wasi
を追加しておいて、rustc
や cargo build
によるビルド時に --target wasm32-wasi
を指定するだけでした。
wasm32-wasi の追加
> rustup target add wasm32-wasi
標準出力へ文字列を出力するだけの下記サンプルコードを --target wasm32-wasi
でビルドすると sample1.wasm
ファイルが作られました。
sample1.rs
fn main() { for i in 1..=3 { println!("count-{}", i); } print!("aaa"); print!("bbb"); }
WASI 対応の WebAssembly として sample1.rs をビルド
> rustc --target wasm32-wasi sample1.rs
なお、今回のビルドに使用した Rust のバージョンは以下の通りです。
- Rust 1.43.0
また、使用したソースコードは http://github.com/fits/try_samples/tree/master/blog/20200429/ に置いてあります。
(1) スタンドアロン用ランタイムで実行
sample1.wasm を WebAssembly のランタイム wasmtime と wasmer でそれぞれ実行してみます。
(1-a) wasmtime で実行
wasmtime v0.15.0 による sample1.wasm 実行結果
> wasmtime sample1.wasm count-1 count-2 count-3 aaabbb
(1-b) wasmer で実行
wasmer v0.16.2 による sample1.wasm 実行結果
> wasmer sample1.wasm count-1 count-2 count-3 aaabbb
どちらのランタイムでも問題なく実行できました。
(2) Web ブラウザ上で実行
次は、sample1.wasm を外部ライブラリ等を使わずに Web ブラウザ上で実行してみます。
主要な Web ブラウザや Node.js は JavaScript 用の WebAssembly API に対応済みのため、WebAssembly を実行可能です。
WASI 対応 WebAssembly の場合、実行対象の WebAssembly がインポートしている WASI の関数(の実装)を WebAssembly インスタンス化関数(WebAssembly.instantiate()
や WebAssembly.instantiateStreaming()
)の第二引数(引数名 importObject)として渡す必要があるようです。
(2-a) WebAssembly のインポート内容を確認
WebAssembly.compile()
関数で取得した WebAssembly.Module
オブジェクトを WebAssembly.Module.imports()
関数へ渡す事で、その WebAssembly がインポートしている内容を取得できます。
ここでは、以下の Node.js スクリプトを使って WebAssembly のインポート内容を確認してみました。
wasm_listup_imports.js (WebAssembly のインポート内容を出力)
const fs = require('fs') const wasmFile = process.argv[2] const run = async () => { const module = await WebAssembly.compile(fs.readFileSync(wasmFile)) const imports = WebAssembly.Module.imports(module) console.log(imports) } run().catch(err => console.error(err))
sample1.wasm へ適用してみると以下のような結果となりました。
インポート内容の出力結果(Node.js v12.16.2 で実行)
> node wasm_listup_imports.js sample1.wasm [ { module: 'wasi_snapshot_preview1', name: 'proc_exit', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'fd_write', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'fd_prestat_get', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'fd_prestat_dir_name', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'environ_sizes_get', kind: 'function' }, { module: 'wasi_snapshot_preview1', name: 'environ_get', kind: 'function' } ]
この結果から、sample1.wasm は以下のようにしてインスタンス化できる事になります。
WebAssembly インスタンス化の例
const importObject = { wasi_snapshot_preview1: { proc_exit: () => {・・・}, fd_write: () => {・・・}, fd_prestat_get: () => {・・・}, fd_prestat_dir_name: () => {・・・}, environ_sizes_get: () => {・・・}, environ_get: () => {・・・} } } WebAssembly.instantiate(・・・, importObject) ・・・
(2-b) fd_write 関数の実装
Rust の println!
で呼び出される WASI の関数は fd_write
なので、これを実装してみます。
fd_write の引数は 4つで、第一引数 fd
は出力先のファイルディスクリプタで標準出力の場合は 1、第二引数 iovs
は出力内容へのポインタ、第三引数 iovsLen
は出力内容の数、第四引数 nwritten
は出力済みのバイト数を設定するポインタとなっています。
なお、ポインタの対象は WebAssembly.instantiate()
で取得した WebAssembly のインスタンスに含まれている WebAssembly.Memory です。
出力内容は iovs ポインタの位置から 4バイト毎に以下のような並びで情報が格納されているようなので、これを基に出力対象の文字列を取得して出力する事になります。
- 1個目の出力内容の格納先ポインタ(4バイト)
- 1個目の出力内容のバイトサイズ(4バイト)
- ・・・
- iovsLen 個目の出力内容の格納先ポインタ(4バイト)
- iovsLen 個目の出力内容のバイトサイズ(4バイト)
何処まで処理を行ったか(出力したか)を返すために、nwritten ポインタの位置へ出力の完了したバイトサイズを設定します。
fd_write の実装例(wasmInstance には WebAssembly のインスタンスを設定)
・・・ fd_write: (fd, iovs, iovsLen, nwritten) => { const memory = wasmInstance.exports.memory.buffer const view = new DataView(memory) const sizeList = Array.from(Array(iovsLen), (v, i) => { const ptr = iovs + i * 8 // 出力内容の格納先のポインタ取得 const bufStart = view.getUint32(ptr, true) // 出力内容のバイトサイズを取得 const bufLen = view.getUint32(ptr + 4, true) const buf = new Uint8Array(memory, bufStart, bufLen) // 出力内容の String 化 const msg = String.fromCharCode(...buf) // 出力 console.log(msg) return buf.byteLength }) // 出力済みのバイトサイズ合計 const totalSize = sizeList.reduce((acc, v) => acc + v) // 出力済みのバイトサイズを設定 view.setUint32(nwritten, totalSize, true) return 0 }, ・・・
最終的な HTML は下記のようになりました。
fd_write 以外の WASI 関数を空実装にして main
関数を呼び出して実行するようにしていますが、WASI の仕様としては _start
関数を呼び出すのが正しいようです ※。(WASI Application ABI 参照)
※ _start 関数を使う場合、fd_prestat_get 等の実装も必要となります
WebAssembly がインポートしている WASI 関数の実装をインスタンス化時(WebAssembly.instantiateStreaming
)に渡す事になりますが、WASI の関数(fd_write 等)はインスタンス化の結果を使って処理する点に注意が必要です。
index.html(main 関数版)
<!DOCTYPE html> <html> <head> <meta charset="utf-8" /> </head> <body> <h1>WASI WebAssembly Sample</h1> <div id="res"></div> <script> const WASM_URL = './sample1.wasm' const wasiObj = { wasmInstance: null, importObject: { wasi_snapshot_preview1: { fd_write: (fd, iovs, iovsLen, nwritten) => { console.log(`*** call fd_write: fd=${fd}, iovs=${iovs}, iovsLen=${iovsLen}, nwritten=${nwritten}`) const memory = wasiObj.wasmInstance.exports.memory.buffer const view = new DataView(memory) const sizeList = Array.from(Array(iovsLen), (v, i) => { const ptr = iovs + i * 8 const bufStart = view.getUint32(ptr, true) const bufLen = view.getUint32(ptr + 4, true) const buf = new Uint8Array(memory, bufStart, bufLen) const msg = String.fromCharCode(...buf) // 出力 console.log(msg) document.getElementById('res').innerHTML += `<p>${msg}</p>` return buf.byteLength }) const totalSize = sizeList.reduce((acc, v) => acc + v) view.setUint32(nwritten, totalSize, true) return 0 }, proc_exit: () => {}, fd_prestat_get: () => {}, fd_prestat_dir_name: () => {}, environ_sizes_get: () => {}, environ_get: () => {} } } } WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject) .then(res => { console.log(res) // fd_write で参照できるようにインスタンスを wasmInstance へ設定 wasiObj.wasmInstance = res.instance // main 関数の実行 wasiObj.wasmInstance.exports.main() }) .catch(err => console.error(err)) </script> </body> </html>
main 関数の代わりに _start 関数を呼び出す場合は下記のようになりました。
_start 関数版の場合、fd_prestat_get
の実装が重要となります ※。
※ fd_prestat_get を正しく実装していないと、 fd_prestat_get の呼び出しが延々と繰り返されてしまいました
今回はファイル等を使っていないので(file descriptor 3
以降を開いていない)、fd_prestat_get は単に 8
(WASI_EBADF, Bad file descriptor)を返すだけで良さそうです。
index2.html(_start 関数版)
・・・ <script> const WASM_URL = './sample1.wasm' const wasiObj = { wasmInstance: null, importObject: { wasi_snapshot_preview1: { ・・・ fd_prestat_get: () => 8, ・・・ } } } WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject) .then(res => { console.log(res) wasiObj.wasmInstance = res.instance // _start 関数の実行 wasiObj.wasmInstance.exports._start() }) .catch(err => console.error(err)) </script> ・・・
(2-c) 実行
上記の .html ファイルを Web ブラウザで直接開いても WebAssembly を実行できないため、HTTP サーバーを使う事になります。
更に、Web ブラウザ上で WebAssembly を実行するには、.wasm ファイルを MIME Type application/wasm
で取得する必要があるようです。
Python の http.server は application/wasm
に対応していたため(Python 3.8.2 と 3.7.6 で確認)、以下のスクリプトで HTTP サーバーを立ち上げる事にしました。
web_server.py
import http.server import socketserver PORT = 8080 Handler = http.server.SimpleHTTPRequestHandler with socketserver.TCPServer(("", PORT), Handler) as httpd: print(f"start server port:{PORT}") httpd.serve_forever()
HTTP サーバー起動(Python 3.8.2 で実行)
> python web_server.py start server port:8080
Web ブラウザ(Chrome)で http://localhost:8080/index.html
へアクセスしたところ(index2.html でも同様)、sample1.wasm の実行を確認できました。
Chrome の実行結果
(3) Node.js で組み込み実行
次は、Node.js で WebAssembly を組み込み実行してみます。
(3-a) fd_write 実装
上記 index2.html の処理をベースにローカルの .wasm ファイルを読み込んで実行するようにしました。
sample1.wasm のインポート内容に合わせたものなので、インポート内容の異なる WebAssembly の実行には使えません。
wasm_run_sample.js
const fs = require('fs') const WASI_ESUCCESS = 0; const WASI_EBADF = 8; // Bad file descriptor const wasmFile = process.argv[2] const wasiObj = { wasmInstance: null, importObject: { wasi_snapshot_preview1: { fd_write: (fd, iovs, iovsLen, nwritten) => { ・・・ const sizeList = Array.from(Array(iovsLen), (v, i) => { ・・・ process.stdout.write(msg) return buf.byteLength }) ・・・ return WASI_ESUCCESS }, ・・・ fd_prestat_get: (fd, bufPtr) => { console.log(`*** call fd_prestat_get: fd=${fd}, bufPtr=${bufPtr}`) return WASI_EBADF }, ・・・ } } } const buf = fs.readFileSync(wasmFile) WebAssembly.instantiate(buf, wasiObj.importObject) .then(res => { wasiObj.wasmInstance = res.instance wasiObj.wasmInstance.exports._start() }) .catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node wasm_run_sample.js sample1.wasm *** call fd_prestat_get : fd=3, bufPtr=1048568 *** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948 count-1 *** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948 count-2 *** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948 count-3 *** call fd_write: fd=1, iovs=1048432, iovsLen=1, nwritten=1048412 aaabbb
(3-b) Wasmer-JS 使用
Wasmer-JS の @wasmer/wasi
モジュールを使って、もっと汎用的に組み込み実行できるようにしてみます。
@wasmer/wasi インストール例
> npm install @wasmer/wasi
@wasmer/wasi の WASI
を使う事で、インポート内容に合わせた WASI 関数の取得や _start 関数の呼び出しを任せる事ができます。
run_wasmer_js/index.js
const fs = require('fs') const { WASI } = require('@wasmer/wasi') const wasmFile = process.argv[2] const wasi = new WASI() const run = async () => { const module = await WebAssembly.compile(fs.readFileSync(wasmFile)) // インポート内容に合わせた WASI 関数の実装を取得 const importObject = wasi.getImports(module) const instance = await WebAssembly.instantiate(module, importObject) // 実行 wasi.start(instance) } run().catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node index.js ../sample1.wasm count-1 count-2 count-3 aaabbb
(4) 標準出力以外の機能
最後に、現時点でどんな機能を使えるのか気になったので、いくつか試してみました。
まず、TcpStream
を使ったコードの wasm32-wasi ビルドは一応成功しました。
sample2.rs
use std::net::TcpStream; fn main() { let res = TcpStream::connect("127.0.0.1:8080"); println!("{:?}", res); }
sample2.rs ビルド
> rustc --target wasm32-wasi sample2.rs
ただし、実行してみると以下のような結果となりました。(wasmtime 以外で実行しても同じ)
wasmtime による sample2.wasm 実行結果
> wasmtime sample2.wasm Err(Custom { kind: Other, error: "operation not supported on wasm yet" })
Rust のソースコードで該当(すると思われる)箇所を確認してみると、unsupported()
を返すようになっていました。
https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/net.rs(2020/4/26 時点)
・・・ impl TcpStream { pub fn connect(_: io::Result<&SocketAddr>) -> io::Result<TcpStream> { unsupported() } ・・・ } ・・・
https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/ のソースを確認してみると、(スレッド系の処理等)他にも未対応の機能がいくつもありました。
一方で、環境変数・システム時間・スリープ処理は使えそうだったので、以下のコードで確認してみました。
sample3.rs
use std::env; use std::thread::sleep; use std::time::{ Duration, SystemTime }; fn main() { // 環境変数 SLEEP_TIME からスリープする秒数を取得(デフォルトは 1) let sleep_sec = env::var("SLEEP_TIME").ok() .and_then(|v| v.parse::<u64>().ok()) .unwrap_or(1); // システム時間の取得 let time = SystemTime::now(); println!("start: sleep {}s", sleep_sec); // スリープの実施 sleep(Duration::from_secs(sleep_sec)); // 経過時間の出力 match time.elapsed() { Ok(s) => println!("end: elapsed {}s", s.as_secs()), Err(e) => println!("error: {:?}", e), } }
sample3.rs のビルド
> rustc --target wasm32-wasi sample3.rs
wasmtime では正常に実行できましたが、wasmer は今のところスリープ処理に対応していないようでエラーとなりました。
ちなみに、環境変数はどちらのコマンドも --env
で指定できました。
wasmtime v0.15.0 による sample3.wasm 実行結果
> wasmtime --env SLEEP_TIME=5 sample3.wasm start: sleep 5s end: elapsed 5s
wasmer v0.16.2 による sample3.wasm 実行結果
> wasmer sample3.wasm --env SLEEP_TIME=5 start: sleep 5s thread 'main' panicked at 'not yet implemented: Polling not implemented for clocks yet', lib\wasi\src\syscalls\mod.rs:2373:21 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace. Error: error: "unhandled trap at 7fffd474a799 - code #e06d7363: unknown exception code"
Node.js のマイクロサービスフレームワーク Moleculer
Moleculer という名のなかなか期待できそうな Node.js 用マイクロサービスフレームワークを見つけたのでご紹介します。
ロードバランサー、サーキットブレーカー、メトリクス収集、トレーシング、キャッシュ機能等、多彩な機能を備えていますが。
個人的に、イベント駆動アーキテクチャを採用し Kafka や NATS 等のメッセージブローカーと容易に連携できる事から、コレオグラフィ型のマイクロサービスを手軽に作れそうな点に注目しています。
なんとなく Akka と似ている気もしますが、Akka を使ったマイクロサービスフレームワークの Lagom などと比べても、簡単かつ手軽に使えるのが利点かと思います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20200224/
(1) ローカルアプリケーション
とりあえず簡単なローカルアプリケーションを作成してみます。
以下のモジュールを使うのでインストールしておきます。
- moleculer 0.14.2
moleculer インストール例
> npm install --save moleculer
Moleculer では、メソッド・アクション・イベントハンドリング等を定義した Service
を ServiceBroker
へ登録する事で処理を組み立てます。
ServiceBroker はノード毎にインスタンスを作る事になり、アクションの呼び出しやイベントの通知、別ノードとの連携等を担います。
アクションは call('<サービス名>.<アクション名>', <パラメータ>)
で呼び出す事ができ、アクションで return した値が call の戻り値となります。
イベントの発火(通知)は emit('<イベント名>', <イベント内容>)
や broadcast('<イベント名>', <イベント内容>)
※ で行います。
※ emit と broadcast の違いは、 broadcast は全ノードの全サービスへ通知されますが、 emit は同じサービスを複数のノードで実行していると その中の 1つのノードへ通知されます
order1.js
const { ServiceBroker } = require('moleculer') const broker = new ServiceBroker() broker.createService({ name: 'order-service', actions: { order(ctx) { console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`) // order.ordered イベントの発火 ctx.emit('order.ordered', ctx.params) return ctx.params.id } } }) broker.createService({ name: 'order-check-service', events: { // order.ordered イベントのハンドリング 'order.ordered'(ctx) { console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`) } } }) const run = async () => { await broker.start() const order1 = {id: 'order1', item: 'item1', qty: 2} // order アクションの呼び出し const res1 = await broker.call('order-service.order', order1) console.log(`order result: ${res1}`) } run().catch(err => console.error(err))
実行結果
> node order1.js [2020-02-18T14:08:35.336Z] INFO ・・・: Moleculer v0.14.2 is starting... ・・・ # order-service.order: {"id":"order1","item":"item1","qty":2} ## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2} order result: order1 ・・・ [2020-02-18T14:08:35.440Z] INFO ・・・: ServiceBroker is stopped. Good bye.
上記の処理へサービスを 1つ追加してみます。
イベントのハンドリングではワイルドカード *
も使えるようになっていますので使ってみます。
order2.js
const { ServiceBroker } = require('moleculer') const broker = new ServiceBroker() broker.createService({ name: 'order-service', actions: { order(ctx) { console.log(`# order-service.order: ${JSON.stringify(ctx.params)}`) ctx.emit('order.ordered', ctx.params) return ctx.params.id } }, events: { // order.validated と order.invalidated イベントをハンドリング 'order.*validated'(ctx) { console.log(`# order-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`) } } }) broker.createService({ name: 'order-check-service', events: { async 'order.ordered'(ctx) { console.log(`## order-check-service ${ctx.eventName}: ${JSON.stringify(ctx.params)}`) // item-service の check アクション呼び出し const isValid = await ctx.call('item-service.check', ctx.params) if (isValid) { ctx.emit('order.validated', ctx.params) } else { ctx.emit('order.invalidated', ctx.params) } } } }) broker.createService({ name: 'item-service', actions: { check(ctx) { console.log(`### item-service.check: ${JSON.stringify(ctx.params)}`) return ['item1', 'item2'].includes(ctx.params.item) } } }) const run = async () => { await broker.start() const order1 = {id: 'order1', item: 'item1', qty: 2} const order2 = {id: 'order2', item: 'item3', qty: 1} const res1 = await broker.call('order-service.order', order1) console.log(`order result: ${res1}`) const res2 = await broker.call('order-service.order', order2) console.log(`order result: ${res2}`) } run().catch(err => console.error(err))
実行結果
> node order2.js ・・・ # order-service.order: {"id":"order1","item":"item1","qty":2} ## order-check-service order.ordered: {"id":"order1","item":"item1","qty":2} ### item-service.check: {"id":"order1","item":"item1","qty":2} order result: order1 # order-service.order: {"id":"order2","item":"item3","qty":1} ## order-check-service order.ordered: {"id":"order2","item":"item3","qty":1} ### item-service.check: {"id":"order2","item":"item3","qty":1} # order-service order.validated: {"id":"order1","item":"item1","qty":2} order result: order2 # order-service order.invalidated: {"id":"order2","item":"item3","qty":1} ・・・
(2) メッセージブローカー(NATS Streaming)利用
次に、別ノードのサービスとメッセージブローカー経由で連携してみます。
処理内容としては Web API を通して受け取ったメッセージを別ノードのサービスに call・emit・broadcast します。
ここでは、メッセージブローカーとして Go言語で実装されており手軽に実行できる NATS Streaming を使用します。
まずは、以下のモジュールを使うのでインストールしておきます。
- moleculer 0.14.2
- moleculer-web 0.9.0
- node-nats-streaming 0.2.6
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming
Web API としてリクエストを受け付ける処理を実装します。
moleculer-web
を require した結果を mixins
して routes
で HTTP リクエストとそれに応じて実行するアクションをマッピングします。
NATS Streaming を使用するため transporter
に STAN
を設定します。
nodeID
はデフォルトでホスト名とプロセスIDから生成されるようになっていますが、他のノードとの接続に使うので、ここでは明示的に設定しています。
pub.js
const { ServiceBroker } = require('moleculer') const HTTPServer = require('moleculer-web') const broker = new ServiceBroker({ nodeID: 'pub', transporter: 'STAN' }) broker.createService({ name: 'web', mixins: [HTTPServer], settings: { routes: [ { aliases: { 'PUT /pub': 'pub.publish' }} ] } }) broker.createService({ name: 'pub', actions: { async publish(ctx) { const params = ctx.params // アクション呼び出し const res = await ctx.call('sub.direct', params) // イベントの emit ctx.emit('event.emit', params) // イベントの broadcast ctx.broadcast('event.broadcast', params) return res } } }) broker.start().catch(err => console.error(err))
次に、NATS Streaming 経由でアクションの呼び出しやイベント通知を受け取る処理を実装します。
こちらは複数プロセスで実行できるように nodeID をコマンド引数として渡すようにしています。
sub.js
const { ServiceBroker } = require('moleculer') const id = process.argv[2] const broker = new ServiceBroker({ nodeID: id, transporter: 'STAN' }) broker.createService({ name: 'sub', actions: { direct(ctx) { console.log(`* sub.direct: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`) return broker.nodeID } }, events: { 'event.*'(ctx) { console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`) } } }) broker.start().catch(err => console.error(err))
動作確認
まずは NATS Streaming を実行しておきます。
NATS Streaming 実行
> nats-streaming-server ・・・ [2668] 2020/02/23 19:27:33.566759 [[32mINF[0m] Server is ready [2668] 2020/02/23 19:27:33.591774 [[32mINF[0m] STREAM: Recovering the state... [2668] 2020/02/23 19:27:33.592779 [[32mINF[0m] STREAM: No recovered state [2668] 2020/02/23 19:27:33.843799 [[32mINF[0m] STREAM: Message store is MEMORY [2668] 2020/02/23 19:27:33.844764 [[32mINF[0m] STREAM: ---------- Store Limits ---------- [2668] 2020/02/23 19:27:33.846740 [[32mINF[0m] STREAM: Channels: 100 * [2668] 2020/02/23 19:27:33.848741 [[32mINF[0m] STREAM: --------- Channels Limits -------- [2668] 2020/02/23 19:27:33.849804 [[32mINF[0m] STREAM: Subscriptions: 1000 * [2668] 2020/02/23 19:27:33.850745 [[32mINF[0m] STREAM: Messages : 1000000 * [2668] 2020/02/23 19:27:33.852756 [[32mINF[0m] STREAM: Bytes : 976.56 MB * [2668] 2020/02/23 19:27:33.853740 [[32mINF[0m] STREAM: Age : unlimited * [2668] 2020/02/23 19:27:33.854745 [[32mINF[0m] STREAM: Inactivity : unlimited * [2668] 2020/02/23 19:27:33.857744 [[32mINF[0m] STREAM: ---------------------------------- [2668] 2020/02/23 19:27:33.858745 [[32mINF[0m] STREAM: Streaming Server is ready
pub.js を起動し、2つ sub.js を実行しておきます。
これによって、NATS Streaming へ MOL.DISCOVER.<nodeID>
、MOL.EVENT.<nodeID>
、MOL.REQ.<nodeID>
等のような様々なチャネルが登録され、ノード間の接続が確立します。
pub.js 実行
> node pub.js ・・・ [2020-02-23T10:29:58.028Z] INFO pub/REGISTRY: Strategy: RoundRobinStrategy ・・・ [2020-02-23T10:29:58.102Z] INFO pub/WEB: Register route to '/' [2020-02-23T10:29:58.158Z] INFO pub/WEB: PUT /pub => pub.publish ・・・ [2020-02-23T10:30:29.936Z] INFO pub/REGISTRY: Node 'sub1' connected. [2020-02-23T10:30:36.918Z] INFO pub/REGISTRY: Node 'sub2' connected.
sub.js 実行1(sub1)
> node sub.js sub1 ・・・ [2020-02-23T10:30:34.026Z] INFO sub1/REGISTRY: Node 'pub' connected. [2020-02-23T10:30:36.919Z] INFO sub1/REGISTRY: Node 'sub2' connected.
sub.js 実行2(sub2)
> node sub.js sub2 ・・・ [2020-02-23T10:30:39.027Z] INFO sub2/REGISTRY: Node 'pub' connected. [2020-02-23T10:30:40.111Z] INFO sub2/REGISTRY: Node 'sub1' connected.
この状態で 2回 HTTP リクエストを実施してみます。
HTTP リクエスト 1回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 1}" "sub1"
HTTP リクエスト 2回目
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/pub -d "{\"no\": 2}" "sub2"
1回目の call と emit は sub1 側で、2回目は sub2 側で実行しておりロードバランスされています(デフォルトではラウンドロビンとなる)
broadcast は sub1 と sub2 のどちらにも通知されています。
sub.js sub1 の結果
・・・ * sub.direct: nodeID=sub1, params={"no":1} * event.emit: nodeID=sub1, params={"no":1} * event.broadcast: nodeID=sub1, params={"no":1} * event.broadcast: nodeID=sub1, params={"no":2}
sub.js sub2 の結果
・・・ * event.broadcast: nodeID=sub2, params={"no":1} * sub.direct: nodeID=sub2, params={"no":2} * event.emit: nodeID=sub2, params={"no":2} * event.broadcast: nodeID=sub2, params={"no":2}
(3) MongoDB への CRUD を REST API 化
最後に、MongoDB へデータを CRUD する処理を REST API 化してみます。
以下のモジュールを使うのでインストールしておきます。
- moleculer 0.14.2
- moleculer-web 0.9.0
- node-nats-streaming 0.2.6
- moleculer-db 0.8.5
- moleculer-db-adapter-mongo 0.4.7
モジュールインストール例
> npm install --save moleculer moleculer-web node-nats-streaming moleculer-db moleculer-db-adapter-mongo
これまでは createService
でサービスを定義していましたが、ここでは別ファイルでサービスを定義する方法を試してみます。
ServiceBroker の loadServices
を使うと、デフォルトで services/xxx.service.js
(xxx は任意) ファイルをロードして登録してくれます。
server.js
const { ServiceBroker } = require('moleculer') const id = process.argv[2] const broker = new ServiceBroker({ nodeID: id, transporter: 'STAN' }) broker.loadServices() broker.start().catch(err => console.error(err))
REST API のサービスを実装します。
aliases で REST
とするとアクション側で "POST /" のような HTTP メソッドとパスを(rest
項目で)設定する事になります。
services/api.service.js
const HTTPServer = require('moleculer-web') module.exports = { name: 'api', mixins: [HTTPServer], settings: { routes: [ { aliases: { 'REST items': 'item' } } ] } }
moleculer-db
がデフォルトで REST API に対応した CRUD 処理をアクションとして定義してくれているので、今回はこれをそのまま使う事にします。
ここでは entityCreated
等を使って更新時のイベント通知のみを実装しています。
なお、MongoDB の接続文字列やコレクション名は環境変数で設定するようにしています。
services/item.service.js
const DbService = require('moleculer-db') const MongoDBAdapter = require('moleculer-db-adapter-mongo') const mongoUri = process.env['MONGO_URI'] const colName = process.env['MONGO_COLLECTION_NAME'] module.exports = { name: 'item', mixins: [DbService], adapter: new MongoDBAdapter(mongoUri, { useUnifiedTopology: true }), collection: colName, entityCreated(entity, ctx) { console.log(`entityCreated: ${JSON.stringify(entity)}`) ctx.emit('item.created', entity) }, entityUpdated(entity, ctx) { console.log(`entityUpdated: ${JSON.stringify(entity)}`) ctx.emit('item.updated', entity) }, entityRemoved(entity, ctx) { console.log(`entityRemoved: ${JSON.stringify(entity)}`) ctx.emit('item.removed', entity) } }
動作確認用にイベントをハンドリングする処理も用意しました。
handler.js
const { ServiceBroker } = require('moleculer') const id = process.argv[2] const broker = new ServiceBroker({ nodeID: id, transporter: 'STAN' }) broker.createService({ name: 'handler', events: { 'item.*'(ctx) { console.log(`* ${ctx.eventName}: nodeID=${broker.nodeID}, params=${JSON.stringify(ctx.params)}`) } } }) broker.start().catch(err => console.error(err))
動作確認
MongoDB と NATS Streaming を起動しておきます。
MongoDB 実行
> mongod --dbpath itemdb
NATS Streaming 実行
> nats-streaming-server
接続先の MongoDB やコレクション名を環境変数で設定してから、server.js を実行します。
server.js 実行
> set MONGO_URI=mongodb://localhost/sample > set MONGO_COLLECTION_NAME=items > node server.js server1 ・・・ [2020-02-23T11:41:06.978Z] INFO server1/API: Register route to '/' [2020-02-23T11:41:07.031Z] INFO server1/API: GET /items => item.list [2020-02-23T11:41:07.034Z] INFO server1/API: GET /items/:id => item.get [2020-02-23T11:41:07.035Z] INFO server1/API: POST /items => item.create [2020-02-23T11:41:07.036Z] INFO server1/API: PUT /items/:id => item.update [2020-02-23T11:41:07.037Z] INFO server1/API: PATCH /items/:id => item.patch [2020-02-23T11:41:07.038Z] INFO server1/API: DELETE /items/:id => item.remove ・・・ [2020-02-23T11:41:47.389Z] INFO server1/REGISTRY: Node 'handler1' connected.
イベントハンドラも別途実行しておきます。
handler.js 実行
> node handler.js handler1 ・・・ [2020-02-23T11:41:48.322Z] INFO handler1/REGISTRY: Node 'server1' connected.
準備が整ったので REST API を呼び出して CRUD 処理を順番に呼び出してみます。
create 実施
$ curl -s -XPOST -H "Content-Type: application/json" http://localhost:3000/items -d "{\"name\": \"item1\", \"qty\": 1}" {"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af {"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":1}
update 実施
$ curl -s -XPUT -H "Content-Type: application/json" http://localhost:3000/items/5e5266a735ebff2c9083a0af -d "{\"qty\": 2}" {"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
delete 実施
$ curl -s -XDELETE http://localhost:3000/items/5e5266a735ebff2c9083a0af {"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}
read 実施
$ curl -s http://localhost:3000/items/5e5266a735ebff2c9083a0af { "name": "EntityNotFoundError", "message": "Entity not found", "code": 404, "type": null, "data": { "id": "5e5266a735ebff2c9083a0af" } }
問題なく動作しているようです。 handler.js の出力結果も以下のようになりました。
handler.js の出力結果
・・・ * item.created: nodeID=handler1, params={"name":"item1","qty":1,"_id":"5e5266a735ebff2c9083a0af"} * item.updated: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2} * item.removed: nodeID=handler1, params={"_id":"5e5266a735ebff2c9083a0af","name":"item1","qty":2}