SeaORM でテーブル作成とデータ操作

githubSeaORM という Rust 用の ORM を見つけたので軽く試してみました。

sea-orm-cli というツールを使うと、既存のテーブルから Entity 定義を自動生成してくれるようですが、ここでは自前で定義した Entity を基にテーブル作成とデータ操作(INSERT, SELECT)を実施してみました。

今回のソースは こちら

1. はじめに

Cargo.toml へ依存定義を設定します。

sea-orm の features で DB のドライバーと非同期ランタイムを指定する事になります。

今回は MySQL/MariaDB へ接続するため DB ドライバーは sqlx-mysql、 非同期ランタイムは async-std で TLS の Rust 実装を用いる事にしたので runtime-async-std-rustls としています。

Cargo.toml
・・・
[dependencies]
sea-orm = { version = "0.8", features = ["sqlx-mysql", "runtime-async-std-rustls", "macros" ] }
async-std = { version = "1", features = ["attributes"] }

2. Entity 定義

ここでは、以下のようなテーブル内容を想定した Entity 定義を行います。

CREATE TABLE tasks (
  id int(10) unsigned NOT NULL AUTO_INCREMENT,
  subject varchar(255) NOT NULL,
  status enum('ready','completed') NOT NULL,
  PRIMARY KEY (id)
)

Entity 定義は DeriveEntityModel を derive した Model という名の struct を定義すれば良さそうです。

そうすると、DeriveRelation を derive した Relation enumimpl ActiveModelBehavior for ActiveModel の定義が必要となりますが、今回は特に使わないので空実装としておきます。

status カラムを DB の enum 型とするため、DeriveActiveEnum を derive した Status enum を別途定義しています。

あとは、sea_orm でテーブル名やプライマリキー、status カラムの enum 値(DB 側)等の指定を行っています。

task.rs
use sea_orm::entity::prelude::*;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "tasks")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: u32,
    pub subject: String,
    pub status: Status,
}

#[derive(Clone, Debug, PartialEq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "status")]
pub enum Status {
    #[sea_orm(string_value = "ready")]
    Ready,
    #[sea_orm(string_value = "completed")]
    Completed,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

3. 処理の実装

DB 接続

Database::connect へ DB の接続文字列 mysql://user:password@host/db を渡す事で DB へ接続します。

ここでは、環境変数から DB の接続文字列を取得するようにしてみました。

let db_uri = env::var("DB_URI")?;
let db = Database::connect(db_uri).await?;

テーブル作成

create_table_from_entity で Entity 定義から Create Table 文を作成できます。

get_database_backend で取得したバックエンドの build を用いる事で、 接続する DB(今回は MySQL/MariaDB)用の Create Table 文を取得できるので execute でテーブル作成を実施します。

let backend = db.get_database_backend();
let schema = Schema::new(backend);

let st = backend.build(&schema.create_table_from_entity(Task));
db.execute(st).await?;

INSERT

INSERT する Entity データの生成に ActiveModel を使い、値を ActiveValue で設定します。

id は自動採番を使うため NotSet としています。(明示的に設定しても可)

ActiveModel の insert を呼び出す事で INSERT を実施します。

let t1 = task::ActiveModel {
    id: ActiveValue::NotSet,
    subject: ActiveValue::Set("task1".to_owned()),
    status: ActiveValue::Set(task::Status::Ready),
};

let r1 = t1.insert(&db).await?;
println!("{:?}", r1);

なお、serde_jsonActiveModel::from_json を使う事で、JSON から生成する事も可能でした。

SELECT

find() で SELECT を実施します。 all を使う事で対象となる全レコードを取得できるようです。

let rows = Task::find().all(&db).await?;
println!("{:?}", rows);

上記処理を合わせた、最終的なコードは以下のようになりました。

main.rs
mod task;

use sea_orm::*;
use std::env;

use task::Entity as Task;

type Error = Box<dyn std::error::Error>;

#[async_std::main]
async fn main() -> Result<(), Error> {
    let db_uri = env::var("DB_URI")?;
    let db = Database::connect(db_uri).await?;

    let backend = db.get_database_backend();
    let schema = Schema::new(backend);

    let st = backend.build(&schema.create_table_from_entity(Task));
    db.execute(st).await?;

    let t1 = task::ActiveModel {
        id: ActiveValue::NotSet,
        subject: ActiveValue::Set("task1".to_owned()),
        status: ActiveValue::Set(task::Status::Ready),
    };

    let r1 = t1.insert(&db).await?;
    println!("{:?}", r1);

    let t2 = task::ActiveModel {
        id: ActiveValue::NotSet,
        subject: ActiveValue::Set("task2".to_owned()),
        status: ActiveValue::Set(task::Status::Completed),
    };

    let r2 = t2.insert(&db).await?;
    println!("{:?}", r2);

    let rows = Task::find().all(&db).await?;
    println!("{:?}", rows);

    Ok(())
}

4. 動作確認

MariaDB へ DB を作成しておきます。

DB 作成
MariaDB [(none)]> CREATE DATABASE sample1;

環境変数へ DB 接続文字列を設定し、実行します。

実行
> set DB_URI=mysql://root:@localhost/sample1
> cargo run
・・・
Model { id: 1, subject: "task1", status: Ready }
Model { id: 2, subject: "task2", status: Completed }
[Model { id: 1, subject: "task1", status: Ready }, Model { id: 2, subject: "task2", status: Completed }]

正常に実行できました。

ついでに、テーブルとレコード内容を確認してみると以下のようになりました。

テーブルとレコード内容
MariaDB [sample1]> SHOW CREATE TABLE tasks \G
*************************** 1. row ***************************
       Table: tasks
Create Table: CREATE TABLE `tasks` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `subject` varchar(255) NOT NULL,
  `status` enum('ready','completed') NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1
1 row in set (0.000 sec)

MariaDB [sample1]> select * from tasks;
+----+---------+-----------+
| id | subject | status    |
+----+---------+-----------+
|  1 | task1   | ready     |
|  2 | task2   | completed |
+----+---------+-----------+
2 rows in set (0.001 sec)

juniper による GraphQL の処理を WebAssembly 化する

juniper を使った GraphQL の処理を WebAssembly 化し、Deno 用の JavaScript で実行してみました。

ソースコードhttps://github.com/fits/try_samples/tree/master/blog/20220224/

はじめに

今回は wasm-bindgenwasm-pack を使いません。(wasm-bindgen 版のサンプルは こちら

そのため、これらの便利ツールが上手くやってくれている箇所を自前で実装する必要があり、以下が課題となります。

  • 所有権による値の破棄にどう対処するか

ここでは、Box::into_raw を使う事でメモリー解放(値の破棄)の実行責任を呼び出し側(今回は JavaScript)に移す事で対処します。

(1) GraphQL 処理の WebAssembly 化

Cargo.toml は以下のように設定しました。

Cargo.toml
[package]
name = "sample"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
juniper = "0.15"

GraphQL 処理

GraphQL の処理は juniper を使って普通に実装します。 HashMap を使って id 毎に Item を管理するだけの単純な作りにしました。

src/lib.rs (GraphQL 処理部分)
use juniper::{execute_sync, EmptySubscription, FieldError, FieldResult, Variables};

use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Default, Debug)]
struct Store {
    store: RwLock<HashMap<String, Item>>,
}

impl juniper::Context for Store {}

#[derive(Debug, Clone, juniper::GraphQLObject)]
struct Item {
    id: String,
    value: i32,
}

#[derive(Debug, Clone, juniper::GraphQLInputObject)]
struct CreateItem {
    id: String,
    value: i32,
}

・・・

#[derive(Debug)]
struct Query;

#[juniper::graphql_object(context = Store)]
impl Query {
    fn find(ctx: &Store, id: String) -> FieldResult<Item> {
        ・・・
    }
}

#[derive(Debug)]
struct Mutation;

#[juniper::graphql_object(context = Store)]
impl Mutation {
    fn create(ctx: &Store, input: CreateItem) -> FieldResult<Item> {
        ・・・
    }
}

type Schema = juniper::RootNode<'static, Query, Mutation, EmptySubscription<Store>>;

・・・

WebAssembly 用の処理

ここからが本題です。

基本的に、WebAssembly とランタイム(今回は JavaScript のコード)間で文字列等を直接受け渡したりはできません。

共有メモリー ※ にデータを書き込み、その位置(ポインタ)やバイトサイズをやり取りする事で文字列等の受け渡しを実施する事になります。

 ※ JavaScript 側だと WebAssembly インスタンスの exports.memory.buffer

ここでは、JavaScript からメモリー領域の確保や破棄を実施するために以下のような処理を用意しました。

  • (a) データを保持する HashMap と GraphQL のスキーマを含む Context の生成と破棄
  • (b) GraphQL の入力文字列を書き込む領域の確保と破棄
  • (c) GraphQL 処理結果(文字列)の破棄とポインタやサイズの取得 ※

なお、(a) の Context や (b) と (c) の文字列が Rust(WebAssembly)側で勝手に破棄されては困るので、そうならないように Box::into_raw を使います。

Box::into_raw で取得した raw pointer は Box::from_raw で Box へ戻して処理する事になりますが、そのままだとスコープを外れた際にデストラクタ(drop メソッド)が呼び出されて破棄されてしまうので、そうされたくない場合は再度 Box::into_raw します。

query の戻り値である *mut String 型の raw pointer は、文字列を格納している位置では無いため、(c) では文字列の格納位置(ポインタ)を _result_ptr で、そのバイトサイズを _result_size でそれぞれ取得するようにしてみました。

また、(b) では slice を使う事で、文字列の位置をやり取りできるようにしています。

ちなみに、関数名の先頭に _ を付けたりしていますが、特に意味はありません。(単なる見た目上の区別のためだけ)

これでとりあえずは動作しましたが、処理的に何か問題があるかもしれませんのでご注意ください。

src/lib.rs (WebAssembly 用の処理部分)
・・・
struct Context {
    context: Store,
    schema: Schema,
}

// (a) Context の生成
#[no_mangle]
extern fn open() -> *mut Context {
    let context = Store::default();
    let schema = Schema::new(Query, Mutation, EmptySubscription::new());

    let d = Box::new(Context{ context, schema });
    // Context の raw pointer を返す
    Box::into_raw(d)
}
// (a) Context の破棄
#[no_mangle]
extern fn close(ptr: *mut Context) {
    unsafe {
        Box::from_raw(ptr);
    }
}

// (b) GraphQL 入力文字列の領域を確保
#[no_mangle]
extern fn _new_string(size: usize) -> *mut u8 {
    let v = vec![0; size];

    Box::into_raw(v.into_boxed_slice()) as *mut u8
}
// (b) GraphQL 入力文字列の破棄
#[no_mangle]
extern fn _drop_string(ptr: *mut u8) {
    unsafe {
        Box::from_raw(ptr);
    }
}

// (c) GraphQL 結果文字列の破棄
#[no_mangle]
extern fn _drop_result(ptr: *mut String) {
    unsafe {
        Box::from_raw(ptr);
    }
}
// (c) GraphQL 結果文字列のポインタ取得
#[no_mangle]
extern fn _result_ptr(ptr: *mut String) -> *const u8 {
    unsafe {
        let s = Box::from_raw(ptr);
        let r = s.as_ptr();
        // 結果文字列を破棄させないための措置
        Box::into_raw(s);

        r
    }
}
// (c) GraphQL 結果文字列のバイトサイズ取得
#[no_mangle]
extern fn _result_size(ptr: *mut String) -> usize {
    unsafe {
        let s = Box::from_raw(ptr);
        let r = s.len();
        // 結果文字列を破棄させないための措置
        Box::into_raw(s);

        r
    }
}

// GraphQL のクエリー処理
#[no_mangle]
extern fn query(ptr: *mut Context, sptr: *const u8, len: usize) -> *mut String {
    unsafe {
        // Context の取得
        let ctx = Box::from_raw(ptr);

        // GraphQL の入力文字列を取得
        let slice = std::slice::from_raw_parts(sptr, len);
        let q = std::str::from_utf8_unchecked(slice);

        // GraphQL の処理実行
        let r = execute_sync(q, None, &ctx.schema, &Variables::new(), &ctx.context);

        // 処理結果の文字列化
        let msg = match r {
            Ok((v, _)) => format!("{}", v),
            Err(e) => format!("{}", e),
        };
        // Context を破棄させないための措置
        Box::into_raw(ctx);
        // 結果文字列の raw pointer を返す
        Box::into_raw(Box::new(msg))
    }
}

ビルド

WASI を使っていないので --target を wasm32-unknown-unknown にしてビルドします。

ビルド例
> cargo build --release --target wasm32-unknown-unknown

(2) ランタイムの実装

(1) で作成した WebAssembly を呼び出す処理を Deno 用の JavaScript で実装してみました。

run_wasm_deno.js
const wasmFile = 'target/wasm32-unknown-unknown/release/sample.wasm'
// 処理結果の取得
const toResult = (wasm, retptr) => {
    const sptr = wasm.exports._result_ptr(retptr) 
    const len = wasm.exports._result_size(retptr)

    const memory = wasm.exports.memory.buffer

    const buf = new Uint8Array(memory, sptr, len)
    // JavaScript 上で文字列化
    const res = new TextDecoder('utf-8').decode(buf)

    return JSON.parse(res)
}

const query = (wasm, ptr, q) => {
    const buf = new TextEncoder('utf-8').encode(q)
    // 入力文字列用の領域確保
    const sptr = wasm.exports._new_string(buf.length)

    try {
        // 入力文字列の書き込み
        new Uint8Array(wasm.exports.memory.buffer).set(buf, sptr)
        // GraphQL の実行
        const retptr = wasm.exports.query(ptr, sptr, buf.length)

        try {
            return toResult(wasm, retptr)
        } finally {
            // 処理結果の破棄
            wasm.exports._drop_result(retptr)
        }
    } finally {
        // 入力文字列の破棄
        wasm.exports._drop_string(sptr)
    }
}

const buf = await Deno.readFile(wasmFile)
const module = await WebAssembly.compile(buf)
// WebAssembly のインスタンス化
const wasm = await WebAssembly.instantiate(module, {})

// Context の作成
const ctxptr = wasm.exports.open()
// GraphQL を処理して結果を出力
const queryAndShow = (q) => {
    console.log( query(wasm, ctxptr, q) )
}

try {
    queryAndShow(`
        mutation {
            create(input: { id: "item-1", value: 12 }) {
                id
            }
        }
    `)

    queryAndShow(`
        mutation {
            create(input: { id: "item-2", value: 34 }) {
                id
            }
        }
    `)

    queryAndShow(`
        query {
            find(id: "item-1") {
                id
                value
            }
        }
    `)

    queryAndShow(`
        {
            find(id: "item-2") {
                id
                value
            }
        }
    `)

    queryAndShow(`
        {
            find(id: "item-3") {
                id
            }
        }
    `)
} finally {
    // Context の破棄
    wasm.exports.close(ctxptr)
}

実行

実行すると以下のような結果になりました。

実行例
> deno run --allow-read run_wasm_deno.js
{ create: { id: "item-1" } }
{ create: { id: "item-2" } }
{ find: { id: "item-1", value: 12 } }
{ find: { id: "item-2", value: 34 } }
null

辞書ベースの日本語 Tokenizer - Kuromoji, Sudachi, Fugashi, Kagome, Lindera

辞書をベースに処理する日本語 Tokenizer のいくつかをコードを書いて実行してみました。

  • (a) Lucene Kuromoji
  • (b) atilika Kuromoji
  • (c) Sudachi
  • (d) Kuromoji.js
  • (e) Fugashi
  • (f) Kagome
  • (g) Lindera

今回は以下の文を処理して分割された単語と品詞を出力します。

処理対象文
 WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。

システム辞書だけを使用し、分割モードを指定する場合は固有名詞などをそのままにする(細かく分割しない)モードを選ぶ事にします。

ソースコードhttps://github.com/fits/try_samples/tree/master/blog/20220106/

(a) Lucene Kuromoji

Lucene に組み込まれた Kuromoji で Elasticsearch や Solr で使われます。

kuromoji.gradle を見ると、システム辞書は以下のどちらかを使うようになっているようです。

a1

lucene-analyzers-kuromoji の JapaneseTokenizer を使います。 辞書は IPADIC のようです。

lucene/a1.groovy
@Grab('org.apache.lucene:lucene-analyzers-kuromoji:8.11.1')
import org.apache.lucene.analysis.ja.JapaneseTokenizer;
import org.apache.lucene.analysis.ja.JapaneseTokenizer.Mode
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute

def text = args[0]

new JapaneseTokenizer(null, false, Mode.NORMAL).withCloseable { tokenizer ->
    def term = tokenizer.addAttribute(CharTermAttribute)
    def pos = tokenizer.addAttribute(PartOfSpeechAttribute)

    tokenizer.reader = new StringReader(text)
    tokenizer.reset()

    while(tokenizer.incrementToken()) {
        println "term=${term}, partOfSpeech=${pos.partOfSpeech}"
    }
}
a1 結果
> groovy a1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=名詞-固有名詞-組織
term=が, partOfSpeech=助詞-格助詞-一般
term=サーバー, partOfSpeech=名詞-一般
term=レス, partOfSpeech=名詞-サ変接続
term=分野, partOfSpeech=名詞-一般
term=へ, partOfSpeech=助詞-格助詞-一般
term=大きな, partOfSpeech=連体詞
term=影響, partOfSpeech=名詞-サ変接続
term=を, partOfSpeech=助詞-格助詞-一般
term=与える, partOfSpeech=動詞-自立
term=だろ, partOfSpeech=助動詞
term=う, partOfSpeech=助動詞
term=と, partOfSpeech=助詞-格助詞-引用
term=答え, partOfSpeech=動詞-自立
term=た, partOfSpeech=助動詞
term=回答, partOfSpeech=名詞-サ変接続
term=者, partOfSpeech=名詞-接尾-一般
term=は, partOfSpeech=助詞-係助詞
term=全体, partOfSpeech=名詞-副詞可能
term=の, partOfSpeech=助詞-連体化
term=5, partOfSpeech=名詞-数
term=6, partOfSpeech=名詞-数
term=%, partOfSpeech=名詞-接尾-助数詞
term=だっ, partOfSpeech=助動詞
term=た, partOfSpeech=助動詞
term=。, partOfSpeech=記号-句点

a2

org.codelibs が上記の ipadic-neologd 版を提供していたので、ついでに試してみました。

処理内容はそのままで、モジュールとパッケージ名を変えるだけです。

lucene/a2.groovy
@GrabResolver('https://maven.codelibs.org/')
@Grab('org.codelibs:lucene-analyzers-kuromoji-ipadic-neologd:8.2.0-20200120')
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer.Mode
import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute

def text = args[0]

new JapaneseTokenizer(null, false, Mode.NORMAL).withCloseable { tokenizer ->
    ・・・
}
a2 結果
> groovy a2.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=名詞-固有名詞-組織
term=が, partOfSpeech=助詞-格助詞-一般
term=サーバーレス, partOfSpeech=名詞-固有名詞-一般
term=分野, partOfSpeech=名詞-一般
term=へ, partOfSpeech=助詞-格助詞-一般
term=大きな, partOfSpeech=連体詞
term=影響, partOfSpeech=名詞-サ変接続
term=を, partOfSpeech=助詞-格助詞-一般
term=与える, partOfSpeech=動詞-自立
term=だろ, partOfSpeech=助動詞
term=う, partOfSpeech=助動詞
term=と, partOfSpeech=助詞-格助詞-引用
term=答え, partOfSpeech=動詞-自立
term=た, partOfSpeech=助動詞
term=回答者, partOfSpeech=名詞-固有名詞-一般
term=は, partOfSpeech=助詞-係助詞
term=全体, partOfSpeech=名詞-副詞可能
term=の, partOfSpeech=助詞-連体化
term=5, partOfSpeech=名詞-数
term=6, partOfSpeech=名詞-数
term=%, partOfSpeech=名詞-接尾-助数詞
term=だっ, partOfSpeech=助動詞
term=た, partOfSpeech=助動詞
term=。, partOfSpeech=記号-句点

a1 と違って "サーバーレス" や "回答者" となりました。

(b) atilika Kuromoji

https://github.com/atilika/kuromoji

Lucene Kuromoji のベースとなった Kuromoji。 更新は途絶えているようですが、色々な辞書に対応しています。

ここでは以下の 2種類の辞書を試してみました。

  • UniDic(2.1.2)
  • JUMAN(7.0-20130310)

b1

まずは、UniDic 版です。

kuromoji/b1.groovy
@Grab('com.atilika.kuromoji:kuromoji-unidic:0.9.0')
import com.atilika.kuromoji.unidic.Tokenizer

def text = args[0]
def tokenizer = new Tokenizer()

tokenizer.tokenize(args[0]).each {
    def pos = [
        it.partOfSpeechLevel1,
        it.partOfSpeechLevel2,
        it.partOfSpeechLevel3,
        it.partOfSpeechLevel4
    ]

    println "term=${it.surface}, partOfSpeech=${pos}"
}
b1 結果
> groovy b1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だ った。"

term=WebAssembly, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=が, partOfSpeech=[助詞, 格助詞, *, *]
term=サーバー, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=レス, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=分野, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=へ, partOfSpeech=[助詞, 格助詞, *, *]
term=大きな, partOfSpeech=[連体詞, *, *, *]
term=影響, partOfSpeech=[名詞, 普通名詞, サ変可能, *]
term=を, partOfSpeech=[助詞, 格助詞, *, *]
term=与える, partOfSpeech=[動詞, 一般, *, *]
term=だろう, partOfSpeech=[助動詞, *, *, *]
term=と, partOfSpeech=[助詞, 格助詞, *, *]
term=答え, partOfSpeech=[動詞, 一般, *, *]
term=た, partOfSpeech=[助動詞, *, *, *]
term=回答, partOfSpeech=[名詞, 普通名詞, サ変可能, *]
term=者, partOfSpeech=[接尾辞, 名詞的, 一般, *]
term=は, partOfSpeech=[助詞, 係助詞, *, *]
term=全体, partOfSpeech=[名詞, 普通名詞, 一般, *]
term=の, partOfSpeech=[助詞, 格助詞, *, *]
term=5, partOfSpeech=[名詞, 数詞, *, *]
term=6, partOfSpeech=[名詞, 数詞, *, *]
term=%, partOfSpeech=[名詞, 普通名詞, 助数詞可能, *]
term=だっ, partOfSpeech=[助動詞, *, *, *]
term=た, partOfSpeech=[助動詞, *, *, *]
term=。, partOfSpeech=[補助記号, 句点, *, *]

"だろう" が分割されていないのが特徴。

b2

JUMAN 辞書版です。

kuromoji/b2.groovy
@Grab('com.atilika.kuromoji:kuromoji-jumandic:0.9.0')
import com.atilika.kuromoji.jumandic.Tokenizer

def text = args[0]
def tokenizer = new Tokenizer()

・・・
b2 結果
> groovy b2.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だ った。"

term=WebAssembly, partOfSpeech=[名詞, 組織名, *, *]
term=が, partOfSpeech=[助詞, 格助詞, *, *]
term=サーバーレス, partOfSpeech=[名詞, 人名, *, *]
term=分野, partOfSpeech=[名詞, 普通名詞, *, *]
term=へ, partOfSpeech=[助詞, 格助詞, *, *]
term=大きな, partOfSpeech=[連体詞, *, *, *]
term=影響, partOfSpeech=[名詞, サ変名詞, *, *]
term=を, partOfSpeech=[助詞, 格助詞, *, *]
term=与える, partOfSpeech=[動詞, *, 母音動詞, 基本形]
term=だろう, partOfSpeech=[助動詞, *, 助動詞だろう型, 基本形]
term=と, partOfSpeech=[助詞, 格助詞, *, *]
term=答えた, partOfSpeech=[動詞, *, 母音動詞, タ形]
term=回答, partOfSpeech=[名詞, サ変名詞, *, *]
term=者, partOfSpeech=[接尾辞, 名詞性名詞接尾辞, *, *]
term=は, partOfSpeech=[助詞, 副助詞, *, *]
term=全体, partOfSpeech=[名詞, 普通名詞, *, *]
term=の, partOfSpeech=[助詞, 接続助詞, *, *]
term=56, partOfSpeech=[名詞, 数詞, *, *]
term=%, partOfSpeech=[接尾辞, 名詞性名詞助数辞, *, *]
term=だった, partOfSpeech=[判定詞, *, 判定詞, ダ列タ形]
term=。, partOfSpeech=[特殊, 句点, *, *]

"サーバーレス"、"だろう"、"答えた"、"56"、"だった" が分割されていないのが特徴。 "サーバーレス" が人名となっているのは不思議。

(c) Sudachi

https://github.com/WorksApplications/Sudachi

Rust 版 もありますが、Java 版を使いました。

辞書は UniDic と NEologd をベースに調整したものらしく、3種類(Small, Core, Full)用意されています。

辞書が継続的にメンテナンスされており最新のものを使えるのが魅力だと思います。

ここではデフォルトの Core 辞書を使いました。(system_core.dic ファイルをカレントディレクトリに配置して実行)

  • Core 辞書(20211220 版)

また、Elasticsearch 用のプラグイン analysis-sudachi も用意されています。

sudachi/c1.groovy
@Grab('com.worksap.nlp:sudachi:0.5.3')
import com.worksap.nlp.sudachi.DictionaryFactory
import com.worksap.nlp.sudachi.Tokenizer

def text = args[0]

new DictionaryFactory().create().withCloseable { dic ->
    def tokenizer = dic.create()
    def ts = tokenizer.tokenize(Tokenizer.SplitMode.C, text)

    ts.each { t ->
        def pos = dic.getPartOfSpeechString(t.partOfSpeechId())

        println "term=${t.surface()}, partOfSpeech=${pos}"
    }
}
c1 結果
> groovy c1.groovy "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だっ た。"

term=WebAssembly, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=が, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=サーバーレス, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=分野, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=へ, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=大きな, partOfSpeech=[連体詞, *, *, *, *, *]
term=影響, partOfSpeech=[名詞, 普通名詞, サ変可能, *, *, *]
term=を, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=与える, partOfSpeech=[動詞, 一般, *, *, 下一段-ア行, 終止形-一般]
term=だろう, partOfSpeech=[助動詞, *, *, *, 助動詞-ダ, 意志推量形]
term=と, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=答え, partOfSpeech=[動詞, 一般, *, *, 下一段-ア行, 連用形-一般]
term=た, partOfSpeech=[助動詞, *, *, *, 助動詞-タ, 連体形-一般]
term=回答者, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=は, partOfSpeech=[助詞, 係助詞, *, *, *, *]
term=全体, partOfSpeech=[名詞, 普通名詞, 一般, *, *, *]
term=の, partOfSpeech=[助詞, 格助詞, *, *, *, *]
term=56, partOfSpeech=[名詞, 数詞, *, *, *, *]
term=%, partOfSpeech=[名詞, 普通名詞, 助数詞可能, *, *, *]
term=だっ, partOfSpeech=[助動詞, *, *, *, 助動詞-ダ, 連用形-促音便]
term=た, partOfSpeech=[助動詞, *, *, *, 助動詞-タ, 終止形-一般]
term=。, partOfSpeech=[補助記号, 句点, *, *, *, *]

"サーバーレス"、"だろう"、"回答者"、"56" となっているのが特徴。

(d) Kuromoji.js

https://github.com/takuyaa/kuromoji.js/

Kuromoji の JavaScript 実装。

kuromoji.js/d1.mjs
import kuromoji from 'kuromoji'

const dicPath = 'node_modules/kuromoji/dict'
const text = process.argv[2]

kuromoji.builder({ dicPath }).build((err, tokenizer) => {
    if (err) {
        console.error(err)
        return
    }

    const ts = tokenizer.tokenize(text)

    for (const t of ts) {
        const pos = [t.pos, t.pos_detail_1, t.pos_detail_2, t.pos_detail_3]

        console.log(`term=${t.surface_form}, partOfSpeech=${pos}`)
    }
})
d1 結果
> node d1.mjs "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=名詞,固有名詞,組織,*
term=が, partOfSpeech=助詞,格助詞,一般,*
term=サーバー, partOfSpeech=名詞,一般,*,*
term=レス, partOfSpeech=名詞,サ変接続,*,*
term=分野, partOfSpeech=名詞,一般,*,*
term=へ, partOfSpeech=助詞,格助詞,一般,*
term=大きな, partOfSpeech=連体詞,*,*,*
term=影響, partOfSpeech=名詞,サ変接続,*,*
term=を, partOfSpeech=助詞,格助詞,一般,*
term=与える, partOfSpeech=動詞,自立,*,*
term=だろ, partOfSpeech=助動詞,*,*,*
term=う, partOfSpeech=助動詞,*,*,*
term=と, partOfSpeech=助詞,格助詞,引用,*
term=答え, partOfSpeech=動詞,自立,*,*
term=た, partOfSpeech=助動詞,*,*,*
term=回答, partOfSpeech=名詞,サ変接続,*,*
term=者, partOfSpeech=名詞,接尾,一般,*
term=は, partOfSpeech=助詞,係助詞,*,*
term=全体, partOfSpeech=名詞,副詞可能,*,*
term=の, partOfSpeech=助詞,連体化,*,*
term=5, partOfSpeech=名詞,数,*,*
term=6, partOfSpeech=名詞,数,*,*
term=%, partOfSpeech=名詞,接尾,助数詞,*
term=だっ, partOfSpeech=助動詞,*,*,*
term=た, partOfSpeech=助動詞,*,*,*
term=。, partOfSpeech=記号,句点,*,*

a1 と同じ結果になりました。

(e) Fugashi

https://github.com/polm/fugashi

MeCabPython 用ラッパー。

辞書として unidic-lite と unidic のパッケージが用意されていましたが、 ここでは JUMAN 辞書を使いました。

  • JUMAN
fugashi/e1.py
from fugashi import GenericTagger
import sys

text = sys.argv[1]

tagger = GenericTagger()

for t in tagger(text):
    pos = t.feature[0:4]
    print(f"term={t.surface}, partOfSpeech={pos}")
e1 結果
> python e1.py "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=('名詞', '組織名', '*', '*')
term=が, partOfSpeech=('助詞', '格助詞', '*', '*')
term=サーバーレス, partOfSpeech=('名詞', '人名', '*', '*')
term=分野, partOfSpeech=('名詞', '普通名詞', '*', '*')
term=へ, partOfSpeech=('助詞', '格助詞', '*', '*')
term=大きな, partOfSpeech=('連体詞', '*', '*', '*')
term=影響, partOfSpeech=('名詞', 'サ変名詞', '*', '*')
term=を, partOfSpeech=('助詞', '格助詞', '*', '*')
term=与える, partOfSpeech=('動詞', '*', '母音動詞', '基本形')
term=だろう, partOfSpeech=('助動詞', '*', '助動詞だろう型', '基本形')
term=と, partOfSpeech=('助詞', '格助詞', '*', '*')
term=答えた, partOfSpeech=('動詞', '*', '母音動詞', 'タ形')
term=回答, partOfSpeech=('名詞', 'サ変名詞', '*', '*')
term=者, partOfSpeech=('接尾辞', '名詞性名詞接尾辞', '*', '*')
term=は, partOfSpeech=('助詞', '副助詞', '*', '*')
term=全体, partOfSpeech=('名詞', '普通名詞', '*', '*')
term=の, partOfSpeech=('助詞', '接続助詞', '*', '*')
term=56, partOfSpeech=('名詞', '数詞', '*', '*')
term=%, partOfSpeech=('接尾辞', '名詞性名詞助数辞', '*', '*')
term=だった, partOfSpeech=('判定詞', '*', '判定詞', 'ダ列タ形')
term=。, partOfSpeech=('特殊', '句点', '*', '*')

同じ辞書を使っている b2 と同じ結果になりました。("サーバーレス" が人名なのも同じ)。

(f) Kagome

https://github.com/ikawaha/kagome

ここでは以下の辞書を使用します。

  • IPADIC(mecab-ipadic-2.7.0-20070801)
  • UniDic(2.1.2)

なお、Tokenize を呼び出した場合は、分割モードとして Normal が適用されるようです。

f1

IPADIC 版

kagome/f1.go
package main

import (
    "fmt"
    "os"

    "github.com/ikawaha/kagome-dict/ipa"
    "github.com/ikawaha/kagome/v2/tokenizer"
)

func main() {
    text := os.Args[1]

    t, err := tokenizer.New(ipa.Dict(), tokenizer.OmitBosEos())

    if err != nil {
        panic(err)
    }
    // 分割モード Normal
    ts := t.Tokenize(text)

    for _, t := range ts {
        fmt.Printf("term=%s, partOfSpeech=%v\n", t.Surface, t.POS())
    }
}
f1 結果
> go run f1.go "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=[名詞 固有名詞 組織 *]
term=が, partOfSpeech=[助詞 格助詞 一般 *]
term=サーバー, partOfSpeech=[名詞 一般 * *]
term=レス, partOfSpeech=[名詞 サ変接続 * *]
term=分野, partOfSpeech=[名詞 一般 * *]
term=へ, partOfSpeech=[助詞 格助詞 一般 *]
term=大きな, partOfSpeech=[連体詞 * * *]
term=影響, partOfSpeech=[名詞 サ変接続 * *]
term=を, partOfSpeech=[助詞 格助詞 一般 *]
term=与える, partOfSpeech=[動詞 自立 * *]
term=だろ, partOfSpeech=[助動詞 * * *]
term=う, partOfSpeech=[助動詞 * * *]
term=と, partOfSpeech=[助詞 格助詞 引用 *]
term=答え, partOfSpeech=[動詞 自立 * *]
term=た, partOfSpeech=[助動詞 * * *]
term=回答, partOfSpeech=[名詞 サ変接続 * *]
term=者, partOfSpeech=[名詞 接尾 一般 *]
term=は, partOfSpeech=[助詞 係助詞 * *]
term=全体, partOfSpeech=[名詞 副詞可能 * *]
term=の, partOfSpeech=[助詞 連体化 * *]
term=5, partOfSpeech=[名詞 数 * *]
term=6, partOfSpeech=[名詞 数 * *]
term=%, partOfSpeech=[名詞 接尾 助数詞 *]
term=だっ, partOfSpeech=[助動詞 * * *]
term=た, partOfSpeech=[助動詞 * * *]
term=。, partOfSpeech=[記号 句点 * *]

同じ辞書を使っている a1 と同じ結果になりました。

f2

UniDic 版

kagome/f2.go
package main

import (
    "fmt"
    "os"

    "github.com/ikawaha/kagome-dict/uni"
    "github.com/ikawaha/kagome/v2/tokenizer"
)

func main() {
    text := os.Args[1]

    t, err := tokenizer.New(uni.Dict(), tokenizer.OmitBosEos())

    ・・・
}
f2 結果
> go run f2.go "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

term=WebAssembly, partOfSpeech=[名詞 普通名詞 一般 *]
term=が, partOfSpeech=[助詞 格助詞 * *]
term=サーバー, partOfSpeech=[名詞 普通名詞 一般 *]
term=レス, partOfSpeech=[名詞 普通名詞 一般 *]
term=分野, partOfSpeech=[名詞 普通名詞 一般 *]
term=へ, partOfSpeech=[助詞 格助詞 * *]
term=大きな, partOfSpeech=[連体詞 * * *]
term=影響, partOfSpeech=[名詞 普通名詞 サ変可能 *]
term=を, partOfSpeech=[助詞 格助詞 * *]
term=与える, partOfSpeech=[動詞 一般 * *]
term=だろう, partOfSpeech=[助動詞 * * *]
term=と, partOfSpeech=[助詞 格助詞 * *]
term=答え, partOfSpeech=[動詞 一般 * *]
term=た, partOfSpeech=[助動詞 * * *]
term=回答, partOfSpeech=[名詞 普通名詞 サ変可能 *]
term=者, partOfSpeech=[接尾辞 名詞的 一般 *]
term=は, partOfSpeech=[助詞 係助詞 * *]
term=全体, partOfSpeech=[名詞 普通名詞 一般 *]
term=の, partOfSpeech=[助詞 格助詞 * *]
term=5, partOfSpeech=[名詞 数詞 * *]
term=6, partOfSpeech=[名詞 数詞 * *]
term=%, partOfSpeech=[名詞 普通名詞 助数詞可能 *]
term=だっ, partOfSpeech=[助動詞 * * *]
term=た, partOfSpeech=[助動詞 * * *]
term=。, partOfSpeech=[補助記号 句点 * *]

同じ辞書を使っている b1 と同じ結果になりました。

(g) Lindera

https://github.com/lindera-morphology/lindera

kuromoji-rs のフォークのようで、辞書は IPADIC です。

  • IPADIC(mecab-ipadic-2.7.0-20070801)
lindera/src/main.rs
use lindera::tokenizer::Tokenizer;
use lindera_core::LinderaResult;

use std::env;

fn main() -> LinderaResult<()> {
    let text = env::args().nth(1).unwrap_or("".to_string());

    let mut tokenizer = Tokenizer::new()?;
    let ts = tokenizer.tokenize(&text)?;

    for t in ts {
        let pos = t.detail.get(0..4).unwrap_or(&t.detail);

        println!("text={}, partOfSpeech={:?}", t.text, pos);
    }

    Ok(())
}
結果
> cargo run "WebAssemblyがサーバーレス分野へ大きな影響を与えるだろうと答えた回答者は全体の56%だった。"

・・・
text=WebAssembly, partOfSpeech=["UNK"]
text=が, partOfSpeech=["助詞", "格助詞", "一般", "*"]
text=サーバー, partOfSpeech=["名詞", "一般", "*", "*"]
text=レス, partOfSpeech=["名詞", "サ変接続", "*", "*"]
text=分野, partOfSpeech=["名詞", "一般", "*", "*"]
text=へ, partOfSpeech=["助詞", "格助詞", "一般", "*"]
text=大きな, partOfSpeech=["連体詞", "*", "*", "*"]
text=影響, partOfSpeech=["名詞", "サ変接続", "*", "*"]
text=を, partOfSpeech=["助詞", "格助詞", "一般", "*"]
text=与える, partOfSpeech=["動詞", "自立", "*", "*"]
text=だろ, partOfSpeech=["助動詞", "*", "*", "*"]
text=う, partOfSpeech=["助動詞", "*", "*", "*"]
text=と, partOfSpeech=["助詞", "格助詞", "引用", "*"]
text=答え, partOfSpeech=["動詞", "自立", "*", "*"]
text=た, partOfSpeech=["助動詞", "*", "*", "*"]
text=回答, partOfSpeech=["名詞", "サ変接続", "*", "*"]
text=者, partOfSpeech=["名詞", "接尾", "一般", "*"]
text=は, partOfSpeech=["助詞", "係助詞", "*", "*"]
text=全体, partOfSpeech=["名詞", "副詞可能", "*", "*"]
text=の, partOfSpeech=["助詞", "連体化", "*", "*"]
text=5, partOfSpeech=["名詞", "数", "*", "*"]
text=6, partOfSpeech=["名詞", "数", "*", "*"]
text=%, partOfSpeech=["名詞", "接尾", "助数詞", "*"]
text=だっ, partOfSpeech=["助動詞", "*", "*", "*"]
text=た, partOfSpeech=["助動詞", "*", "*", "*"]
text=。, partOfSpeech=["記号", "句点", "*", "*"]

a1 と概ね同じ結果となりましたが、"WebAssembly" が名詞になっていないのが特徴。

イベントベースで考える在庫管理モデル

従来のイベントソーシングのような手法だと、特定の State(というよりは Entity かも)を永続化するための手段として Event を用いるというような、あくまでも State 中心の発想になると思います。

そこで、ここでは下記のような Event 中心の発想に切り替えて、在庫管理(在庫数を把握するだけの単純なもの)を考えてみました。

  • State は本質ではなく、Event を解釈した結果にすぎない(解釈の仕方は様々)
  • Event を得たり、伝えたりするための手段として State を用いる

要するに、Event こそが重要で State(Entity とか)は取るに足らない存在だと(実験的に)考えてみたって事です。

従来のイベントソーシング 本件
State が目的、Event が手段 Event が目的、State が手段

なお、ここでイメージしている Event は、特定のドメインに依存しないような本質的なものです。

在庫管理

本件の在庫管理は、以下を把握するだけの単純なものです。

  • 何処に何の在庫(とりあえず現物のあるもの)がいくつあるか

実装コードは http://github.com/fits/try_samples/tree/master/blog/20201213/

1. 本質的なイベント

在庫管理で起こりそうなイベントを考えてみます。

在庫(数)は入庫と出庫の結果だと考えられるので、以下のようなイベントが考えられそうです。

  • 入庫イベント
  • 出庫イベント

また、シンプルに物が移動 ※ した結果が在庫なのだと考えると、(2地点間の)在庫の移動という形で抽象化できそうな気がします。

 ※ 概念的な移動も含める

そうすると、以下のようなイベントも考えられそうです。

  • 在庫移動の開始イベント
  • 在庫移動の終了(完了)イベント

ついでに、在庫の引当も区別して考えると ※、以下のようなイベントも考えられます。

  • 引当イベント
 ※ 引当用の場所へ移動するという事にするのであれば区別しなくてもよさそう

まとめると、とりあえずは以下のようなイベントが考えられそうです。

  • 在庫移動の開始イベント
  • 在庫移動の完了イベント
  • 在庫移動のキャンセルイベント
  • 引当イベント
  • 引当した場合の出庫イベント
  • 引当しなかった場合の出庫イベント
  • 入庫イベント

ついでに、引当や出庫などの成否をイベントとして明確に分けたい場合は、引当失敗イベント等の失敗イベントを別途設ければ良さそうな気がします。

2. イベント定義

これらのイベントを Rust と TypeScript で型定義してみました。

商品や在庫のロケーション(在庫の保管場所)の具体的な内容はどうでもよいので(ここで具体化する必要がない)、Generics の型変数で表現しておきます。

本質的に必要そうな最低限の情報のみを持たせ、余計な情報は取り除いておきます。※

 ※ 在庫移動を一意に限定する ID や日付のような
    メタデータ(と考えられるもの)に関しても除外しました

用語はとりあえず以下のようにしています。

  • 引当: assign
  • 出庫: ship
  • 入庫: arrive

何(item)をいくつ(qty)、何処(from)から何処(to)へ移動する予定なのかという情報を持たせて在庫の移動を開始するようにしてみました。

入出庫等で予定とは異なる内容になっても不都合が生じないように、それぞれのイベントに必要な情報を持たせています。

また、全体的に ADT(代数的データ型)を意識した内容にしています。

Rust で型定義したイベント

models/events.rs
pub enum StockMoveEvent<Item, Location, Quantity> {
    // 開始
    Started {
        item: Item, 
        qty: Quantity, 
        from: Location, 
        to: Location,
    },
    // 完了
    Completed,
    // キャンセル
    Cancelled,
    // 引当
    Assigned {
        item: Item, 
        from: Location,
        assigned: Quantity, 
    },
    // 出庫
    Shipped {
        item: Item, 
        from: Location,
        outgoing: Quantity, 
    },
    // 引当に対する出庫
    AssignShipped {
        item: Item, 
        from: Location,
        outgoing: Quantity,
        assigned: Quantity,
    },
    // 入庫
    Arrived {
        item: Item, 
        to: Location,
        incoming: Quantity, 
    },
}

TypeScript で型定義したイベント

models/events.ts
export interface StockMoveEventStarted<Item, Location, Quantity> {
    tag: 'stock-move-event.started'
    item: Item
    qty: Quantity
    from: Location
    to: Location
}

export interface StockMoveEventCompleted {
    tag: 'stock-move-event.completed'
}

export interface StockMoveEventCancelled {
    tag: 'stock-move-event.cancelled'
}

export interface StockMoveEventAssigned<Item, Location, Quantity> {
    tag: 'stock-move-event.assigned'
    item: Item
    from: Location
    assigned: Quantity
}

export interface StockMoveEventShipped<Item, Location, Quantity> {
    tag: 'stock-move-event.shipped'
    item: Item
    from: Location
    outgoing: Quantity
}

export interface StockMoveEventAssignShipped<Item, Location, Quantity> {
    tag: 'stock-move-event.assign-shipped'
    item: Item
    from: Location
    outgoing: Quantity
    assigned: Quantity
}

export interface StockMoveEventArrived<Item, Location, Quantity> {
    tag: 'stock-move-event.arrived'
    item: Item
    to: Location
    incoming: Quantity
}

export type StockMoveEvent<Item, Location, Quantity> = 
    StockMoveEventStarted<Item, Location, Quantity> | 
    StockMoveEventCompleted | 
    StockMoveEventCancelled | 
    StockMoveEventAssigned<Item, Location, Quantity> | 
    StockMoveEventShipped<Item, Location, Quantity> | 
    StockMoveEventAssignShipped<Item, Location, Quantity> | 
    StockMoveEventArrived<Item, Location, Quantity>

3. 在庫移動処理サンプル

上記で定義したイベントを以下のような(在庫移動の)ステートマシンで扱ってみます。※

f:id:fits:20201213191951p:plain

 ※ 本件の考え方では、
    (本質的な)イベントは特定の処理やルールになるべく依存していない事が重要なので、
    このステートマシン(イベントを扱う手段の一つでしかない)に対して
    特化しないように注意します
  • 在庫移動の状態遷移の基本パターンは 3通り
    • (a) 引当 -> 出庫 -> 入庫
    • (b) 出庫 -> 入庫
    • (c) 入庫
  • 入庫の失敗状態は無し(0個の入庫で代用)

(c) は出庫側の状況が不明なケースで入庫の記録だけを残すような用途を想定しています。

3.1 ステートマシンの実装

Rust と TypeScript でそれぞれ実装してみます。

この辺のレイヤーまでは、外界の都合(※1)から隔離しておきたいと考え、関数言語的な作りにしています。

イベントと同様に在庫移動や在庫を ADT(代数的データ型) で表現し、下記のような関数(+ ユーティリティ関数)を提供するだけの作りにしてみました。(※2)

(※1)フレームワーク、UI、永続化、非同期処理、排他制御やその他諸々の都合
(※2)こうしておくと、WebAssembly 等でコンポーネント化して
       再利用するなんて事も実現し易くなるかもしれませんし
  • (1) 初期状態を返す関数
  • (2) 現在の状態とアクションから次の状態とそれに伴って発生したイベントを返す関数(イメージとしては State -> Action -> Container<(State, Event)>
  • (3) ある時点の状態とそれ以降に起きたイベントの内容から任意の状態を復元して返す関数

なお、ここでは (2) のアクションに相当する部分は関数(と引数の一部)として実装しています。

また、(2) で状態遷移が発生しなかった場合に undefined を返すように実装していますが、実際は成功時と失敗時の両方を扱うようなコンテナ型(Rust の Result や Either モナドとか)で包むのが望ましいと思います。

ついでに、実装に際して以下のようなルールを加えています。

  • 引当、入庫、出庫のロケーションは開始時に予定したものを使用
  • 引当時にのみ在庫数を確認(残りの在庫をチェック)
  • 在庫のタイプは 2種類
    • 在庫数を管理するタイプ(引当分の在庫が余っている場合にのみ引当が成功、在庫数は入庫イベントと出庫イベントから算出)
    • 在庫数を管理しないタイプ(引当は常に成功、在庫数は管理せず実質的に無限)
  • 引当数や出庫数が 0 の場合は(引当や出庫の)失敗として扱う

引当はこの処理内における単なる数値上の予約であり、入出庫は実際の作業の結果を反映するような用途をとりあえず想定しています。

そのため、数値上の引当に成功しても実際の出庫が成功するとは限らず、数値上の在庫数以上の出庫が発生するようなケースも考えられるので、この処理ではそれらを許容するようにしています。※

 ※ 在庫の整合性等をどのように制御・調整するかは
    使う側(外側のレイヤー)に任せる

Rust による実装

ここで、商品(以下の ItemCode)や在庫ロケーション(以下の LocationCode)の具体的な型を決めていますが、これより外側のレイヤーに具体型を決めさせるようにした方が望ましいかもしれません。

models/stockmove.rs
use std::slice;

use super::events::StockMoveEvent;
// 商品を識別するための型
pub type ItemCode = String;
// 在庫ロケーションを識別するための型
pub type LocationCode = String;
pub type Quantity = u32;

pub trait Event<S> {
    type Output;

    fn apply_to(&self, state: S) -> Self::Output;
}

pub trait Restore<E> {
    fn restore(self, events: slice::Iter<E>) -> Self;
}
// 在庫の型定義
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum Stock {
    Unmanaged { item: ItemCode, location: LocationCode },
    Managed { 
        item: ItemCode, 
        location: LocationCode, 
        qty: Quantity, 
        assigned: Quantity
    },
}
// 在庫に関する処理
#[allow(dead_code)]
impl Stock {
    pub fn unmanaged_new(item: ItemCode, location: LocationCode) -> Self {
        Self::Unmanaged { item, location }
    }

    pub fn managed_new(item: ItemCode, location: LocationCode) -> Self {
        Self::Managed { item, location, qty: 0, assigned: 0 }
    }

    pub fn eq_id(&self, item: &ItemCode, location: &LocationCode) -> bool {
        match self {
            Self::Managed { item: it, location: loc, .. } | 
            Self::Unmanaged { item: it, location: loc } => 
                it == item && loc == location
        }
    }
    // 在庫数のチェック
    pub fn is_sufficient(&self, v: Quantity) -> bool {
        match self {
            Self::Managed { qty, assigned, .. } =>
                v + assigned <= *qty,
            Self::Unmanaged { .. } => true, 
        }
    }

    fn update(&self, qty: Quantity, assigned: Quantity) -> Self {
        match self {
            Self::Managed { item, location, .. } => {
                Self::Managed {
                    item: item.clone(),
                    location: location.clone(),
                    qty,
                    assigned,
                }
            },
            Self::Unmanaged { .. } => self.clone(),
        }
    }

    fn update_qty(&self, qty: Quantity) -> Self {
        match self {
            Self::Managed { assigned, .. } => self.update(qty, *assigned),
            Self::Unmanaged { .. } => self.clone(),
        }
    }

    fn update_assigned(&self, assigned: Quantity) -> Self {
        match self {
            Self::Managed { qty, .. } => self.update(*qty, assigned),
            Self::Unmanaged { .. } => self.clone(),
        }
    }
}
// 在庫に対するイベントの適用
impl Event<Stock> for MoveEvent {
    type Output = Stock;

    fn apply_to(&self, state: Stock) -> Self::Output {
        match &state {
            Stock::Unmanaged { .. } => state,
            Stock::Managed { item: s_item, location: s_loc, 
                qty: s_qty, assigned: s_assigned } => {

                match self {
                    Self::Assigned { item, from, assigned } 
                    if s_item == item && s_loc == from => {

                        state.update_assigned(
                            s_assigned + assigned
                        )
                    },
                    Self::Shipped { item, from, outgoing }
                    if s_item == item && s_loc == from => {

                        state.update_qty(
                            s_qty.checked_sub(*outgoing).unwrap_or(0)
                        )
                    },
                    Self::AssignShipped { item, from, outgoing, assigned }
                    if s_item == item && s_loc == from => {

                        state.update(
                            s_qty.checked_sub(*outgoing).unwrap_or(0),
                            s_assigned.checked_sub(*assigned).unwrap_or(0),
                        )
                    },
                    Self::Arrived { item, to, incoming }
                    if s_item == item && s_loc == to => {

                        state.update_qty(
                            s_qty + incoming
                        )
                    },
                    _ => state,
                }
            },
        }
    }
}

#[derive(Debug, Default, Clone, PartialEq)]
pub struct StockMoveInfo {
    item: ItemCode,
    qty: Quantity,
    from: LocationCode,
    to: LocationCode,
}
// 在庫移動の型(状態)定義
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub enum StockMove {
    Nothing,
    Draft { info: StockMoveInfo },
    Completed { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity },
    Cancelled { info: StockMoveInfo },
    Assigned { info: StockMoveInfo, assigned: Quantity },
    Shipped { info: StockMoveInfo, outgoing: Quantity },
    Arrived { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity },
    AssignFailed { info: StockMoveInfo },
    ShipmentFailed { info: StockMoveInfo },
}

type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity>;
type MoveResult = Option<(StockMove, MoveEvent)>;
// 在庫移動に関する処理
#[allow(dead_code)]
impl StockMove {
    // 初期状態の取得
    pub fn initial_state() -> Self {
        Self::Nothing
    }
    // 開始
    pub fn start(&self, item: ItemCode, qty: Quantity, 
        from: LocationCode, to: LocationCode) -> MoveResult {

        if qty < 1 {
            return None
        }

        let event = StockMoveEvent::Started {
            item: item.clone(), 
            qty: qty, 
            from: from.clone(), 
            to: to.clone()
        };

        self.apply_event(event)
    }
    // 引当
    pub fn assign(&self, stock: &Stock) -> MoveResult {
        if let Some(info) = self.info() {
            if stock.eq_id(&info.item, &info.from) {
                let assigned = if stock.is_sufficient(info.qty) {
                    info.qty
                } else {
                    0
                };

                return self.apply_event(
                    StockMoveEvent::Assigned {
                        item: info.item.clone(),
                        from: info.from.clone(),
                        assigned,
                    }
                )
            }
        }

        None
    }
    // 出庫
    pub fn ship(&self, outgoing: Quantity) -> MoveResult {
        let ev = match self {
            Self::Assigned { info, assigned } => {
                Some(StockMoveEvent::AssignShipped {
                    item: info.item.clone(),
                    from: info.from.clone(),
                    outgoing,
                    assigned: assigned.clone(),
                })
            },
            _ => {
                self.info()
                    .map(|i|
                        StockMoveEvent::Shipped {
                            item: i.item.clone(),
                            from: i.from.clone(),
                            outgoing,
                        }
                    )
            },
        };

        ev.and_then(|e| self.apply_event(e))
    }
    // 入庫
    pub fn arrive(&self, incoming: Quantity) -> MoveResult {
        self.info()
            .and_then(|i|
                self.apply_event(StockMoveEvent::Arrived {
                    item: i.item.clone(),
                    to: i.to.clone(),
                    incoming,
                })
            )
    }

    pub fn complete(&self) -> MoveResult {
        self.apply_event(StockMoveEvent::Completed)
    }

    pub fn cancel(&self) -> MoveResult {
        self.apply_event(StockMoveEvent::Cancelled)
    }

    fn info(&self) -> Option<StockMoveInfo> {
        match self {
            Self::Draft { info } |
            Self::Completed { info, .. } |
            ・・・
            Self::Arrived { info, .. } => {
                Some(info.clone())
            },
            Self::Nothing => None,
        }
    }

    fn apply_event(&self, event: MoveEvent) -> MoveResult {
        let new_state = event.apply_to(self.clone());

        Some((new_state, event))
            .filter(|r| r.0 != *self)
    }
}
// 在庫移動に対するイベントの適用
impl Event<StockMove> for MoveEvent {
    type Output = StockMove;

    fn apply_to(&self, state: StockMove) -> Self::Output {
        match self {
            Self::Started { item, qty, from, to } => {
                if state == StockMove::Nothing {
                    StockMove::Draft {
                        info: StockMoveInfo { 
                            item: item.clone(), 
                            qty: qty.clone(), 
                            from: from.clone(), 
                            to: to.clone(),
                        }
                    }
                } else {
                    state
                }
            },
            Self::Completed => {
                if let StockMove::Arrived { info, outgoing, incoming } = state {
                    StockMove::Completed { info: info.clone(), outgoing, incoming }
                } else {
                    state
                }
            },
            Self::Cancelled => {
                if let StockMove::Draft { info } = state {
                    StockMove::Cancelled { info: info.clone() }
                } else {
                    state
                }
            },
            Self::Assigned { item, from, assigned } => {
                match state {
                    StockMove::Draft { info } 
                    if info.item == *item && info.from == *from => {

                        if *assigned > 0 {
                            StockMove::Assigned { 
                                info: info.clone(), 
                                assigned: assigned.clone(),
                            }
                        } else {
                            StockMove::AssignFailed { info: info.clone() }
                        }
                    },
                    _ => state,
                }
            },
            Self::Shipped { item, from, outgoing } => {
                match state {
                    StockMove::Draft { info }
                    if info.item == *item && info.from == *from => {

                        if *outgoing > 0 {
                            StockMove::Shipped { 
                                info: info.clone(), 
                                outgoing: outgoing.clone(),
                            }
                        } else {
                            StockMove::ShipmentFailed { info: info.clone() }
                        }
                    },
                    _ => state,
                }
            },
            Self::AssignShipped { item, from, outgoing, .. } => {
                match state {
                    StockMove::Assigned { info, .. }
                    if info.item == *item && info.from == *from => {

                        if *outgoing > 0 {
                            StockMove::Shipped { 
                                info: info.clone(), 
                                outgoing: outgoing.clone(),
                            }
                        } else {
                            StockMove::ShipmentFailed { info: info.clone() }
                        }
                    },
                    _ => state,
                }
            },
            Self::Arrived { item, to, incoming } => {
                match state {
                    StockMove::Draft { info }
                    if info.item == *item && info.to == *to => {
                        StockMove::Arrived {
                            info: info.clone(),
                            outgoing: 0,
                            incoming: *incoming,
                        }
                    },
                    StockMove::Shipped { info, outgoing }
                    if info.item == *item && info.to == *to => {
                        StockMove::Arrived {
                            info: info.clone(),
                            outgoing,
                            incoming: *incoming,
                        }
                    },
                    _ => state,
                }
            },
        }
    }
}
// 在庫や在庫移動の状態復元
impl<S, E> Restore<&E> for S
where
    Self: Clone,
    E: Event<Self, Output = Self>,
{
    fn restore(self, events: slice::Iter<&E>) -> Self {
        events.fold(self, |acc, ev| ev.apply_to(acc.clone()))
    }
}

TypeScript による実装

実装の仕方が多少違っていますが、Rust 版の処理内容と概ね同じ(にしたつもり)です。

models/stockmove.ts
import { 
    StockMoveEvent, StockMoveEventShipped, StockMoveEventAssignShipped 
} from './events'

export type ItemCode = string
export type LocationCode = string
export type Quantity = number

export type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity>

type ShippedMoveEvent = StockMoveEventShipped<ItemCode, LocationCode, Quantity>
type AssignShippedMoveEvent = StockMoveEventAssignShipped<ItemCode, LocationCode, Quantity>

interface StockUnmanaged {
    tag: 'stock.unmanaged'
    item: ItemCode
    location: LocationCode
}

interface StockManaged {
    tag: 'stock.managed'
    item: ItemCode
    location: LocationCode
    qty: Quantity
    assigned: Quantity
}
// 在庫の型定義
export type Stock = StockUnmanaged | StockManaged
// 在庫に関する処理
export class StockAction {
    static newUnmanaged(item: ItemCode, location: LocationCode): Stock {
        return {
            tag: 'stock.unmanaged',
            item,
            location
        }
    }

    static newManaged(item: ItemCode, location: LocationCode): Stock {
        return {
            tag: 'stock.managed',
            item,
            location,
            qty: 0,
            assigned: 0
        }
    }
    // 在庫数のチェック
    static isSufficient(stock: Stock, qty: Quantity): boolean {
        switch (stock.tag) {
            case 'stock.unmanaged':
                return true
            case 'stock.managed':
                return qty + Math.max(0, stock.assigned) <= Math.max(0, stock.qty)
        }
    }
}
// 在庫の復元処理
export class StockRestore {
    static restore(state: Stock, events: MoveEvent[]): Stock {
        return events.reduce(StockRestore.applyTo, state)
    }
    // 在庫に対するイベントの適用
    private static applyTo(state: Stock, event: MoveEvent): Stock {
        if (state.tag == 'stock.managed') {
            switch (event.tag) {
                case 'stock-move-event.assigned':
                    if (state.item == event.item && state.location == event.from) {
                        return StockRestore.updateAssigned(
                            state, 
                            state.assigned + event.assigned
                        )
                    }
                    break
                case 'stock-move-event.assign-shipped':
                    if (state.item == event.item && state.location == event.from) {
                        return StockRestore.updateStock(
                            state,
                            state.qty - event.outgoing,
                            state.assigned - event.assigned
                        )
                    }
                    break
                ・・・
            }
        }
        return state
    }

    private static updateStock(stock: Stock, qty: Quantity, assigned: Quantity): Stock {
        switch (stock.tag) {
            case 'stock.unmanaged':
                return stock
            case 'stock.managed':
                return {
                    tag: stock.tag,
                    item: stock.item,
                    location: stock.location,
                    qty,
                    assigned
                }
        }
    }

    ・・・
}

interface StockMoveInfo {
    item: ItemCode
    qty: Quantity
    from: LocationCode
    to: LocationCode
}

interface StockMoveNothing {
    tag: 'stock-move.nothing'
}

interface StockMoveDraft {
    tag: 'stock-move.draft'
    info: StockMoveInfo
}

・・・

// 在庫移動の型(状態)定義
export type StockMove = 
    StockMoveNothing | StockMoveDraft | StockMoveCompleted | 
    StockMoveCancelled | StockMoveAssigned | StockMoveShipped |
    StockMoveArrived | StockMoveAssignFailed | StockMoveShipmentFailed


export type StockMoveResult = [StockMove, MoveEvent] | undefined
// 在庫移動に関する処理
export class StockMoveAction {
    // 初期状態を取得
    static initialState(): StockMove {
        return { tag: 'stock-move.nothing' }
    }
    // 開始
    static start(state: StockMove, item: ItemCode, qty: Quantity, 
        from: LocationCode, to: LocationCode): StockMoveResult {

        if (qty < 1) {
            return undefined
        }

        const event: MoveEvent = {
            tag: 'stock-move-event.started',
            item,
            qty,
            from,
            to
        }

        return StockMoveAction.applyTo(state, event)
    }
    // 引当
    static assign(state: StockMove, stock: Stock): StockMoveResult {
        const info = StockMoveAction.info(state)

        if (info && info.item == stock.item && info.from == stock.location) {
            const assigned = 
                (stock && StockAction.isSufficient(stock, info.qty)) ? info.qty : 0
            
            const event: MoveEvent = {
                tag: 'stock-move-event.assigned',
                item: info.item,
                from: info.from,
                assigned
            }

            return StockMoveAction.applyTo(state, event)
        }

        return undefined
    }
    // 出庫
    static ship(state: StockMove, outgoing: Quantity): StockMoveResult {
        if (outgoing < 0) {
            return undefined
        }

        const event = StockMoveAction.toShippedEvent(state, outgoing)

        return event ? StockMoveAction.applyTo(state, event) : undefined
    }
    // 入庫
    static arrive(state: StockMove, incoming: Quantity): StockMoveResult {
        if (incoming < 0) {
            return undefined
        }

        const info = StockMoveAction.info(state)

        if (info) {
            const event: MoveEvent = {
                tag: 'stock-move-event.arrived',
                item: info.item,
                to: info.to,
                incoming
            }

            return StockMoveAction.applyTo(state, event)
        }
        return undefined
    }

    ・・・

    static info(state: StockMove) {
        if (state.tag != 'stock-move.nothing') {
            return state.info
        }

        return undefined
    }

    private static applyTo(state: StockMove, event: MoveEvent): StockMoveResult {
        const nextState = StockMoveRestore.restore(state, [event])

        return (nextState != state) ? [nextState, event] : undefined
    }

    private static toShippedEvent(state: StockMove, outgoing: number): MoveEvent | undefined {

        const info = StockMoveAction.info(state)

        if (info) {
            if (state.tag == 'stock-move.assigned') {
                return {
                    tag: 'stock-move-event.assign-shipped',
                    item: info.item,
                    from: info.from,
                    assigned: state.assigned,
                    outgoing
                }
            }
            else {
                return {
                    tag: 'stock-move-event.shipped',
                    item: info.item,
                    from: info.from,
                    outgoing
                }
            }
        }
        return undefined
    }
}
// 在庫移動の復元処理
export class StockMoveRestore {
    static restore(state: StockMove, events: MoveEvent[]): StockMove {
        return events.reduce(StockMoveRestore.applyTo, state)
    }
    // 在庫移動に対するイベントの適用
    private static applyTo(state: StockMove, event: MoveEvent): StockMove {
        switch (state.tag) {
            case 'stock-move.nothing':
                if (event.tag == 'stock-move-event.started') {
                    return {
                        tag: 'stock-move.draft',
                        info: {
                            item: event.item,
                            qty: event.qty,
                            from: event.from,
                            to: event.to
                        }
                    }
                }
                break
            case 'stock-move.draft':
                return StockMoveRestore.applyEventToDraft(state, event)
            case 'stock-move.assigned':
                if (event.tag == 'stock-move-event.assign-shipped') {
                    return StockMoveRestore.applyShipped(state, event)
                }
                break
            case 'stock-move.shipped':
                if (event.tag == 'stock-move-event.arrived' &&
                    state.info.item == event.item && 
                    state.info.to == event.to) {

                    return {
                        tag: 'stock-move.arrived',
                        info: state.info,
                        outgoing: state.outgoing,
                        incoming: event.incoming
                    }
                }
                break
            case 'stock-move.arrived':
                if (event.tag == 'stock-move-event.completed') {
                    return {
                        tag: 'stock-move.completed',
                        info: state.info,
                        outgoing: state.outgoing,
                        incoming: state.incoming
                    }
                }
                break
            case 'stock-move.completed':
            case 'stock-move.cancelled':
            case 'stock-move.assign-failed':
            case 'stock-move.shipment-failed':
                break
        }
        return state
    }

    private static applyShipped(state: StockMoveDraft | StockMoveAssigned, 
        event: ShippedMoveEvent | AssignShippedMoveEvent): StockMove {

        if (state.info.item == event.item && state.info.from == event.from) {
            if (event.outgoing > 0) {
                return {
                    tag: 'stock-move.shipped',
                    info: state.info,
                    outgoing: event.outgoing
                }
            }
            else {
                return {
                    tag: 'stock-move.shipment-failed',
                    info: state.info
                }
            }
        }
        return state
    }

    private static applyEventToDraft(state: StockMoveDraft, event: MoveEvent): StockMove {

        switch (event.tag) {
            case 'stock-move-event.cancelled':
                return {
                    tag: 'stock-move.cancelled',
                    info: state.info
                }
            case 'stock-move-event.assigned':
                if (state.info.item == event.item && state.info.from == event.from) {
                    if (event.assigned > 0) {
                        return {
                            tag: 'stock-move.assigned',
                            info: state.info,
                            assigned: event.assigned
                        }
                    }
                    else {
                        return {
                            tag: 'stock-move.assign-failed',
                            info: state.info
                        }
                    }
                }
                break
            case 'stock-move-event.shipped':
                return StockMoveRestore.applyShipped(state, event)
            case 'stock-move-event.arrived':
                if (state.info.item == event.item && state.info.to == event.to) {
                    return {
                        tag: 'stock-move.arrived',
                        info: state.info,
                        outgoing: 0,
                        incoming: Math.max(event.incoming, 0)
                    }
                }
                break
        }

        return state
    }
}

3.2 GraphQL 化 + MongoDB へ永続化

ついでに、前述のステートマシン(TypeScript 実装版)を Apollo Server で GraphQL 化し、MongoDB へ永続化するようにしてみました。

index.ts
import { ApolloServer, gql } from 'apollo-server'
import { v4 as uuidv4 } from 'uuid'
import { MongoClient, Collection } from 'mongodb'

import {
    ItemCode, LocationCode, MoveEvent,
    StockMoveAction, StockMoveRestore, StockMove, StockMoveResult,
    StockAction, StockRestore, Stock
} from './models'

const mongoUrl = 'mongodb://localhost'
const dbName = 'stockmoves'
const colName = 'events'
const stocksColName = 'stocks'

type MoveId = string
type Revision = number
// MongoDB へ保存するイベント内容
interface StoredEvent {
    move_id: MoveId
    revision: Revision
    item: ItemCode
    from: LocationCode
    to: LocationCode
    event: MoveEvent
}

interface RestoredStockMove {
    state: StockMove
    revision: Revision
}
// MongoDB への永続化処理
class Store {
    ・・・
    async loadStock(item: ItemCode, location: LocationCode): Promise<Stock | undefined> {
        const id = this.stockId(item, location)
        const stock = await this.stocksCol.findOne({ _id: id })

        if (!stock) {
            return undefined
        }

        const query = {
            '$and': [
                { item },
                { '$or': [
                    { from: location },
                    { to: location }
                ]}
            ]
        }

        const events = await this.eventsCol
            .find(query)
            .map(r => r.event)
            .toArray()

        return StockRestore.restore(stock, events)
    }

    async saveStock(stock: Stock): Promise<void> {
        const id = this.stockId(stock.item, stock.location)

        const res = await this.stocksCol.updateOne(
            { _id: id },
            { '$setOnInsert': stock },
            { upsert: true }
        )

        if (res.upsertedCount == 0) {
            return Promise.reject('conflict stock')
        }
    }

    async loadMove(moveId: MoveId): Promise<RestoredStockMove | undefined> {
        const events: StoredEvent[] = await this.eventsCol
            .find({ move_id: moveId })
            .sort({ revision: 1 })
            .toArray()

        const state = StockMoveAction.initialState()
        const revision = events.reduce((acc, e) => Math.max(acc, e.revision), 0)

        const res = StockMoveRestore.restore(state, events.map(e => e.event))

        return (res == state) ? undefined : { state: res, revision }
    }

    async saveEvent(event: StoredEvent): Promise<void> {
        const res = await this.eventsCol.updateOne(
            { move_id: event.move_id, revision: event.revision },
            { '$setOnInsert': event },
            { upsert: true }
        )

        if (res.upsertedCount == 0) {
            return Promise.reject(`conflict event revision=${event.revision}`)
        }
    }

    private stockId(item: ItemCode, location: LocationCode): string {
        return `${item}/${location}`
    }
}
// GraphQL スキーマ定義
const typeDefs = gql(`
    type StockMoveInfo {
        item: ID!
        qty: Int!
        from: ID!
        to: ID!
    }

    interface StockMove {
        id: ID!
        info: StockMoveInfo!
    }

    type DraftStockMove implements StockMove {
        id: ID!
        info: StockMoveInfo!
    }

    type CompletedStockMove implements StockMove {
        id: ID!
        info: StockMoveInfo!
        outgoing: Int!
        incoming: Int!
    }

    ・・・

    interface Stock {
        item: ID!
        location: ID!
    }

    type UnmanagedStock implements Stock {
        item: ID!
        location: ID!
    }

    type ManagedStock implements Stock {
        item: ID!
        location: ID!
        qty: Int!
        assigned: Int!
    }

    input CreateStockInput {
        item: ID!
        location: ID!
    }

    input StartMoveInput {
        item: ID!
        qty: Int!
        from: ID!
        to: ID!
    }

    type Query {
        findStock(item: ID!, location: ID!): Stock
        findMove(id: ID!): StockMove
    }

    type Mutation {
        createManaged(input: CreateStockInput!): ManagedStock
        createUnmanaged(input: CreateStockInput!): UnmanagedStock

        start(input: StartMoveInput!): StockMove
        assign(id: ID!): StockMove
        ship(id: ID!, outgoing: Int!): StockMove
        arrive(id: ID!, incoming: Int!): StockMove
        complete(id: ID!): StockMove
        cancel(id: ID!): StockMove
    }
`)

const toStockMoveForGql = (id: MoveId, state: StockMove | undefined) => {
    if (state) {
        return { id, ...state }
    }
    return undefined
}

type MoveAction = (state: StockMove) => StockMoveResult

const doMoveAction = async (store: Store, rs: RestoredStockMove | undefined, 
    id: MoveId, action: MoveAction) => {

    if (rs) {
        const res = action(rs.state)

        if (res) {
            const [mv, ev] = res
            const info = StockMoveAction.info(mv)

            if (info) {
                const event = { 
                    move_id: id, 
                    revision: rs.revision + 1,
                    item: info.item,
                    from: info.from,
                    to: info.to,
                    event: ev
                }

                await store.saveEvent(event)

                return toStockMoveForGql(id, mv)
            }
        }
    }
    return undefined
}
// GraphQL 処理の実装
const resolvers = {
    Stock: {
        __resolveType: (obj, ctx, info) => {
            if (obj.tag == 'stock.managed') {
                return 'ManagedStock'
            }
            return 'UnmanagedStock'
        }
    },
    StockMove: {
        __resolveType: (obj: StockMove, ctx, info) => {
            switch (obj.tag) {
                case 'stock-move.draft':
                    return 'DraftStockMove'
                case 'stock-move.completed':
                    return 'CompletedStockMove'
                ・・・
                case 'stock-move.shipment-failed':
                    return 'ShipmentFailedStockMove'
            }
            return undefined
        }
    },
    Query: {
        findStock: async (parent, { item, location }, { store }, info) => {
            return store.loadStock(item, location)
        },
        findMove: async (parent, { id }, { store }, info) => {
            const res = await store.loadMove(id)
            return toStockMoveForGql(id, res?.state)
        }
    },
    Mutation: {
        createManaged: async (parent, { input: { item, location } }, { store }, info) => {
            const s = StockAction.newManaged(item, location)

            await store.saveStock(s)

            return s
        },
        ・・・
        start: async (parent, { input: { item, qty, from, to } }, { store }, info) => {
            const rs = { state: StockMoveAction.initialState(), revision: 0 }
            const id = `move-${uuidv4()}`

            return doMoveAction(
                store, rs, id, 
                s => StockMoveAction.start(s, item, qty, from, to)
            )
        },
        assign: async(parent, { id }, { store }, info) => {
            const rs = await store.loadMove(id)

            if (rs) {
                const info = StockMoveAction.info(rs.state)

                if (info) {
                    const stock = await store.loadStock(info.item, info.from)

                    return doMoveAction(
                        store, rs, id, 
                        s => StockMoveAction.assign(s, stock)
                    )
                }
            }
            return undefined
        },
        ship: async(parent, { id, outgoing }, { store }, info) => {
            const rs = await store.loadMove(id)

            return doMoveAction(
                store, rs, id, 
                s => StockMoveAction.ship(s, outgoing)
            )
        },
        ・・・
    }
}

const run = async () => {
    const mongo = await MongoClient.connect(mongoUrl, { useUnifiedTopology: true })
    const eventsCol = mongo.db(dbName).collection(colName)
    const stocksCol = mongo.db(dbName).collection(stocksColName)

    const store = new Store(eventsCol, stocksCol)

    const server = new ApolloServer({
        typeDefs, 
        resolvers, 
        context: {
            store
        }
    })

    const res = await server.listen()

    console.log(res.url)
}

run().catch(err => console.error(err))

クライアント実装例

以下のように GraphQL クエリを送信する事で操作できます。

client/create_stock.ts (在庫の作成)
import { request, gql } from 'graphql-request'

const endpoint = 'http://localhost:4000'

const item = process.argv[2]
const location = process.argv[3]

const q1 = gql`
    mutation CreateUnmanaged($item: ID!, $location: ID!) {
        createUnmanaged(input: { item: $item, location: $location }) {
            __typename
            item
            location
        }
    }
`

const q2 = gql`
    mutation CreateManaged($item: ID!, $location: ID!) {
        createManaged(input: { item: $item, location: $location }) {
            __typename
            item
            location
        }
    }
`

const query = process.argv.length > 4 ? q1 : q2

request(endpoint, query, { item, location })
    .then(r => console.log(r))
    .catch(err => console.error(err))
create_stock.ts 実行例
> ts-node create_stock.ts item-1 store-A
{
  createManaged: { __typename: 'ManagedStock', item: 'item-1', location: 'store-A' }
}
client/start_move.ts (在庫移動の開始)
・・・
const item = process.argv[2]
const qty = parseInt(process.argv[3])
const from = process.argv[4]
const to = process.argv[5]

const query = gql`
    mutation {
        start(input: { item: "${item}", qty: ${qty}, from: "${from}", to: "${to}" }) {
            __typename
            id
            info {
                item
                qty
                from
                to
            }
        }
    }
`

request(endpoint, query)
    .then(r => console.log(r))
    .catch(err => console.error(err))
start_move.ts 実行例
> ts-node start_move.ts item-1 5 store-A store-B
{
  start: {
    __typename: 'DraftStockMove',
    id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3',
    info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' }
  }
}
client/find_move.ts (在庫移動の取得)
・・・
const id = process.argv[2]

const query = gql`
    {
        findMove(id: "${id}") {
            __typename
            id
            info {
                item
                qty
                from
                to
            }
            ... on AssignedStockMove {
                assigned
            }
            ... on ShippedStockMove {
                outgoing
            }
            ... on ArrivedStockMove {
                outgoing
                incoming
            }
            ... on CompletedStockMove {
                outgoing
                incoming
            }
        }
    }
`

request(endpoint, query)
    .then(r => console.log(r))
    .catch(err => console.error(err))
find_move.ts 実行例
> ts-node find_move.ts move-cfa1fc9c-b599-4854-8385-207cbb77e8a3
{
  findMove: {
    __typename: 'CompletedStockMove',
    id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3',
    info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' },
    outgoing: 5,
    incoming: 5
  }
}

Go言語と Rust で Mutex による排他制御

以下の 3通りを Go 言語と Rust でそれぞれ実装してみました。

サンプルコードは http://github.com/fits/try_samples/tree/master/blog/20201122/

Go 言語の場合

まずは Go 言語による実装です。 Go 言語では goroutine で軽量スレッドによる並行処理を実施できます。

ここでは、goroutine の終了を待機するために sync.WaitGroup を使いました。

WaitGroup では Add した数に相当する Done が呼び出されるまで Wait でブロックして待機する事が可能です。

go_mutex_sample.go
package main

import (
    "fmt"
    "sync"
)

type Data struct {
    value int
}

// (a)
func noLock() {
    var wg sync.WaitGroup

    var ds []Data

    for i := 0; i < 100; i++ {
        wg.Add(1)

        go func() {
            ds = append(ds, Data{i})
            wg.Done()
        }()
    }
    // goroutine の終了を待機
    wg.Wait()

    fmt.Println("(a) noLock length =", len(ds))
}

// (b)
func useMutex() {
    var wg sync.WaitGroup
    var mu sync.Mutex

    var ds []Data

    for i := 0; i < 100; i++ {
        wg.Add(1)

        go func() {
            mu.Lock()
            ds = append(ds, Data{i})
            mu.Unlock()

            wg.Done()
        }()
    }

    wg.Wait()

    fmt.Println("(b) useMutex length =", len(ds))
}

// (c)
func useRWMutex() {
    var wg sync.WaitGroup
    var mu sync.RWMutex

    var ds []Data

    for i := 0; i < 100; i++ {
        wg.Add(1)

        go func() {
            mu.Lock()
            ds = append(ds, Data{i})
            mu.Unlock()

            wg.Done()
        }()
    }

    for i := 0; i < 5; i++ {
        wg.Add(1)

        go func() {
            mu.RLock()
            fmt.Println("(c) progress length =", len(ds))
            mu.RUnlock()

            wg.Done()
        }()
    }

    wg.Wait()

    fmt.Println("(c) useRWMutex length =", len(ds))
}

func main() {
    noLock()

    println("-----")

    useMutex()

    println("-----")

    useRWMutex()
}

実行結果は以下の通りです。

排他制御を行っていない (a) では、同じ状態の ds に対して複数の goroutine が ds = append(ds, Data{i}) を実行してしまうケースが発生するため、基本的に結果が 100 にはなりません。

実行結果
> go build go_mutex_sample.go

> go_mutex_sample
(a) noLock length = 95
-----
(b) useMutex length = 100
-----
(c) progress length = 71
(c) progress length = 72
(c) progress length = 72
(c) progress length = 72
(c) progress length = 72
(c) useRWMutex length = 100

Rust の場合

次は Rust による実装です。 ここでは thread::spawn で並行処理を実施し join で待機しています。

基本的に Rust では、Go 言語で実装した noLock のようなスレッドセーフでは無い処理はコンパイルエラーとなって実装できないように工夫されています。

スレッド間で安全に所有権を共有するには、スレッドセーフな参照カウントのポインタである Arc を使用する事になります。

排他制御なしの (a) の処理(下記コードの no_lock)をコンパイルが通るように一応は実装してみましたが、この Arc::get_mut(&mut ds) は常に None を返すので ds.push(Data(i)) の処理は実行されません。

rust_mutex_sample.rs
use std::thread;
use std::sync::{Arc, Mutex, RwLock};

struct Data(i32);

// (a)
fn no_lock() {
    let mut hs = Vec::new();
    let ds = Arc::new(Vec::new());

    for i in 0..100 {
        let mut ds = ds.clone();

        hs.push(
            thread::spawn(move || {
                if let Some(ds) = Arc::get_mut(&mut ds) {
                    // 上記は常に None となるので下記処理は実行されない
                    ds.push(Data(i))
                }
            })
        );
    }

    for h in hs {
        let _ = h.join();
    }

    println!("(a) no_lock length = {}", ds.len());
}

// (b)
fn use_mutex() {
    let mut hs = Vec::new();
    let ds = Arc::new(Mutex::new(Vec::new()));

    for i in 0..100 {
        let ds = ds.clone();

        hs.push(
            thread::spawn(move || {
                if let Ok(mut ds) = ds.lock() {
                    ds.push(Data(i));
                }
            })
        );
    }

    for h in hs {
        let _ = h.join();
    }

    println!("(b) use_mutex length = {}", ds.lock().unwrap().len());
}

// (c)
fn use_rwlock() {
    let mut hs = Vec::new();
    let ds = Arc::new(RwLock::new(Vec::new()));

    for i in 0..100 {
        let ds = ds.clone();

        hs.push(
            thread::spawn(move || {
                if let Ok(mut ds) = ds.write() {
                    ds.push(Data(i));
                }
            })
        );
    }

    for _ in 0..5 {
        let ds = ds.clone();

        hs.push(
            thread::spawn(move || {
                if let Ok(ds) = ds.read() {
                    println!("(c) progress length = {}", ds.len());
                }
            })
        );
    }

    for h in hs {
        let _ = h.join();
    }

    println!("(c) use_rwlock length = {}", ds.read().unwrap().len());
}

fn main() {
    no_lock();

    println!("-----");

    use_mutex();

    println!("-----");

    use_rwlock();
}

実行結果は以下の通りです。

no_lock 内の ds.push(Data(i)) は実行されないので結果は 0 となります。

実行結果
> rustc rust_mutex_sample.rs

> rust_mutex_sample
(a) no_lock length = 0
-----
(b) use_mutex length = 100
-----
(c) progress length = 99
(c) progress length = 99
(c) progress length = 99
(c) progress length = 99
(c) progress length = 99
(c) use_rwlock length = 100

rusty_v8 を使って Rust から JavaScript を実行

Node.js の製作者が新たに作り直した Deno という JavaScript/TypeScript 実行環境があります。

Deno の内部では、V8 JavaScript エンジンの呼び出しに rusty_v8 という Rust 用バインディングを使っていたので、今回はこの rusty_v8 を使って Rust コード内で JavaScript コードを実行してみました。

今回のサンプルコードは http://github.com/fits/try_samples/tree/master/blog/20200705/

設定

rusty_v8 を使うための Cargo 用の dependencies 設定は以下のようになります。

Cargo.toml
・・・
[dependencies]
rusty_v8 = "0.6"

JavaScript コード実行

以下の JavaScript コードを実行し結果(1 ~ 5 の合計値)を出力する処理を Rust で実装してみます。

実行する JavaScript コード
const vs = [1, 2, 3, 4, 5]
console.log(vs)
vs.reduce((acc, v) => acc + v, 0)

基本的に、下記 V8 API による手順と同様の処理を rusty_v8 の API で実装すればよさそうです。

V8 エンジンのインスタンスである Isolate(独自のヒープを持ち他のインスタンスとは隔離される)、GC で管理するオブジェクトへの参照をまとめて管理する HandleScopeサンドボックス化された実行コンテキストの Context(組み込みのオブジェクト・関数を管理)をそれぞれ作成していきます。

そして、JavaScript のコードを Script::compileコンパイルして、run で実行します。

run の戻り値は Option<Local<Value>> となっているので、ここでは to_string を使って JavaScript の String(Option<Local<rusty_v8::String>>)として取得し、rusty_v8::String を to_rust_string_lossy を使って Rust の String へ変換して出力しています。

src/sample1.rs
use rusty_v8 as v8;

fn main() {
    let platform = v8::new_default_platform().unwrap();
    v8::V8::initialize_platform(platform);
    v8::V8::initialize();

    let isolate = &mut v8::Isolate::new(Default::default());

    let scope = &mut v8::HandleScope::new(isolate);
    let context = v8::Context::new(scope);
    let scope = &mut v8::ContextScope::new(scope, context);

    // JavaScript コード
    let src = r#"
        const vs = [1, 2, 3, 4, 5]
        console.log(vs)
        vs.reduce((acc, v) => acc + v, 0)
    "#;

    v8::String::new(scope, src)
        .map(|code| {
            println!("code: {}", code.to_rust_string_lossy(scope));
            code
        })
        .and_then(|code| v8::Script::compile(scope, code, None)) //コンパイル
        .and_then(|script| script.run(scope)) //実行
        .and_then(|value| value.to_string(scope)) // rusty_v8::Value を rusty_v8::String へ
        .iter()
        .for_each(|s| println!("result: {}", s.to_rust_string_lossy(scope)));
}

実行

複数の実行ファイルに対応した下記 Cargo.toml を使って実行します。

Cargo.toml
[package]
・・・
default-run = "sample1"

[dependencies]
rusty_v8 = "0.6"

[[bin]]
name = "sample1"
path = "src/sample1.rs"

[[bin]]
name = "sample2"
path = "src/sample2.rs"

[[bin]]
name = "sample3"
path = "src/sample3.rs"

sample1(src/sample1.rs)の実行結果は以下の通りです。

console.log に関しては何も処理されていませんが、JavaScript の実行結果は取得できています。

sample1 実行結果
> cargo run

・・・
     Running `target\debug\sample1.exe`
code:
        const vs = [1, 2, 3, 4, 5]
        console.log(vs)
        vs.reduce((acc, v) => acc + v, 0)

result: 15

Inspector 機能

次に、console.log されたログメッセージを Rust から出力するようにしてみます。

V8 で console.log (デバッグコンソールへのログ出力)のようなデバッグ機能を処理するには Inspector という機能を使うようです。

rusty_v8 では、console.log 等の呼び出し時に V8InspectorClientImpl トレイトの console_api_message が呼びされるようになっているため、これを実装した構造体のインスタンスから V8Inspector を作成して context_created を実行する事で実現できます。

context_created のシグネチャfn context_created(&mut self, context: Local<Context>, context_group_id: i32, human_readable_name: StringView) となっており、context_group_id 引数へ指定した値が、console_api_message の context_group_id 引数の値となります。(human_readable_name の用途に関してはよく分からなかった)

また、console_api_message の level 引数の値はログレベル(V8 API の MessageErrorLevel) のようです。

ちなみに、V8InspectorClientImpl トレイトは basebase_mut の実装が必須でした。(console_api_message は空実装されている)

src/sample2.rs
use rusty_v8 as v8;
use rusty_v8::inspector::*;

struct InspectorClient(V8InspectorClientBase);

impl InspectorClient {
    fn new() -> Self {
        Self(V8InspectorClientBase::new::<Self>())
    }
}

impl V8InspectorClientImpl for InspectorClient {
    fn base(&self) -> &V8InspectorClientBase {
        &self.0
    }

    fn base_mut(&mut self) -> &mut V8InspectorClientBase {
        &mut self.0
    }

    fn console_api_message(&mut self, _context_group_id: i32, 
        _level: i32, message: &StringView, _url: &StringView, 
        _line_number: u32, _column_number: u32, _stack_trace: &mut V8StackTrace) {
        // ログメッセージの出力
        println!("{}", message);
    }
}

fn main() {
    ・・・

    let isolate = &mut v8::Isolate::new(Default::default());

    // V8Inspector の作成
    let mut client = InspectorClient::new();
    let mut inspector = V8Inspector::create(isolate, &mut client);

    let scope = &mut v8::HandleScope::new(isolate);
    let context = v8::Context::new(scope);
    let scope = &mut v8::ContextScope::new(scope, context);

    // context_created の実行
    inspector.context_created(context, 1, StringView::empty());

    let src = r#"
        const vs = [1, 2, 3, 4, 5]
        console.log(vs)
        vs.reduce((acc, v) => acc + v, 0)
    "#;

    ・・・
}

実行結果は以下の通り。

console.log(vs) を処理して 1,2,3,4,5 が出力されるようになりました。

sample2 実行結果
> cargo run --bin sample2

・・・
     Running `target\debug\sample2.exe`
code:
        const vs = [1, 2, 3, 4, 5]
        console.log(vs)
        vs.reduce((acc, v) => acc + v, 0)

1,2,3,4,5
result: 15

最後に、context_group_idlevel の値も出力するようにしてみます。

src/sample3.rs
・・・

impl V8InspectorClientImpl for InspectorClient {
    ・・・

    fn console_api_message(&mut self, context_group_id: i32, 
        level: i32, message: &StringView, _url: &StringView, 
        _line_number: u32, _column_number: u32, _stack_trace: &mut V8StackTrace) {

            println!(
                "*** context_group_id={}, level={}, message={}", 
                context_group_id, 
                level, 
                message
            );
    }
}

fn main() {
    ・・・

    inspector.context_created(context, 123, StringView::empty());

    let src = r#"
        console.log('log')
        console.debug('debug')
        console.info('info')
        console.error('error')
        console.warn('warn')
    "#;

    v8::String::new(scope, src)
        .and_then(|code| v8::Script::compile(scope, code, None))
        .and_then(|script| script.run(scope))
        .and_then(|value| value.to_string(scope))
        .iter()
        .for_each(|s| println!("result: {}", s.to_rust_string_lossy(scope)));
}

実行結果は以下の通りです。

sample3 実行結果
> cargo run --bin sample3

・・・
     Running `target\debug\sample3.exe`
*** context_group_id=123, level=4, message=log
*** context_group_id=123, level=2, message=debug
*** context_group_id=123, level=4, message=info
*** context_group_id=123, level=8, message=error
*** context_group_id=123, level=16, message=warn
result: undefined

Rust で WASI 対応の WebAssembly を作成して実行

Rust で WASI 対応の WebAssembly を作って、スタンドアロン実行や Web ブラウザ上での実行を試してみました。

WASI(WebAssembly System Interface) は WebAssembly のコードを様々なプラットフォームで実行するためのインターフェースで、これに対応した WebAssembly であれば Web ブラウザ外で実行できます。

Rust で WASI 対応の WebAssembly を作るのは簡単で、ビルドターゲットに wasm32-wasi を追加しておいて、rustccargo build によるビルド時に --target wasm32-wasi を指定するだけでした。

wasm32-wasi の追加
> rustup target add wasm32-wasi

標準出力へ文字列を出力するだけの下記サンプルコードを --target wasm32-wasi でビルドすると sample1.wasm ファイルが作られました。

sample1.rs
fn main() {
    for i in 1..=3 {
        println!("count-{}", i);
    }

    print!("aaa");
    print!("bbb");
}
WASI 対応の WebAssembly として sample1.rs をビルド
> rustc --target wasm32-wasi sample1.rs

なお、今回のビルドに使用した Rust のバージョンは以下の通りです。

  • Rust 1.43.0

また、使用したソースコードhttp://github.com/fits/try_samples/tree/master/blog/20200429/ に置いてあります。

(1) スタンドアロン用ランタイムで実行

sample1.wasm を WebAssembly のランタイム wasmtimewasmer でそれぞれ実行してみます。

(1-a) wasmtime で実行

wasmtime v0.15.0 による sample1.wasm 実行結果
> wasmtime sample1.wasm
count-1
count-2
count-3
aaabbb

(1-b) wasmer で実行

wasmer v0.16.2 による sample1.wasm 実行結果
> wasmer sample1.wasm
count-1
count-2
count-3
aaabbb

どちらのランタイムでも問題なく実行できました。

(2) Web ブラウザ上で実行

次は、sample1.wasm を外部ライブラリ等を使わずに Web ブラウザ上で実行してみます。

主要な Web ブラウザや Node.js は JavaScript 用の WebAssembly API に対応済みのため、WebAssembly を実行可能です。

WASI 対応 WebAssembly の場合、実行対象の WebAssembly がインポートしている WASI の関数(の実装)を WebAssembly インスタンス化関数(WebAssembly.instantiate()WebAssembly.instantiateStreaming())の第二引数(引数名 importObject)として渡す必要があるようです。

(2-a) WebAssembly のインポート内容を確認

WebAssembly.compile() 関数で取得した WebAssembly.Module オブジェクトを WebAssembly.Module.imports() 関数へ渡す事で、その WebAssembly がインポートしている内容を取得できます。

ここでは、以下の Node.js スクリプトを使って WebAssembly のインポート内容を確認してみました。

wasm_listup_imports.js (WebAssembly のインポート内容を出力)
const fs = require('fs')

const wasmFile = process.argv[2]

const run = async () => {
    const module = await WebAssembly.compile(fs.readFileSync(wasmFile))

    const imports = WebAssembly.Module.imports(module)

    console.log(imports)
}

run().catch(err => console.error(err))

sample1.wasm へ適用してみると以下のような結果となりました。

インポート内容の出力結果(Node.js v12.16.2 で実行)
> node wasm_listup_imports.js sample1.wasm
[
  {
    module: 'wasi_snapshot_preview1',
    name: 'proc_exit',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'fd_write',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'fd_prestat_get',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'fd_prestat_dir_name',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'environ_sizes_get',
    kind: 'function'
  },
  {
    module: 'wasi_snapshot_preview1',
    name: 'environ_get',
    kind: 'function'
  }
]

この結果から、sample1.wasm は以下のようにしてインスタンス化できる事になります。

WebAssembly インスタンス化の例
const importObject = {
    wasi_snapshot_preview1: {
        proc_exit: () => {・・・},
        fd_write: () => {・・・},
        fd_prestat_get: () => {・・・},
        fd_prestat_dir_name: () => {・・・},
        environ_sizes_get: () => {・・・},
        environ_get: () => {・・・}
    }
}

WebAssembly.instantiate(・・・, importObject)
    ・・・

(2-b) fd_write 関数の実装

Rust の println! で呼び出される WASI の関数は fd_write なので、これを実装してみます。

fd_write の引数は 4つで、第一引数 fd は出力先のファイルディスクリプタで標準出力の場合は 1、第二引数 iovs は出力内容へのポインタ、第三引数 iovsLen は出力内容の数、第四引数 nwritten は出力済みのバイト数を設定するポインタとなっています。

なお、ポインタの対象は WebAssembly.instantiate() で取得した WebAssembly のインスタンスに含まれている WebAssembly.Memory です。

出力内容は iovs ポインタの位置から 4バイト毎に以下のような並びで情報が格納されているようなので、これを基に出力対象の文字列を取得して出力する事になります。

  • 1個目の出力内容の格納先ポインタ(4バイト)
  • 1個目の出力内容のバイトサイズ(4バイト)
  • ・・・
  • iovsLen 個目の出力内容の格納先ポインタ(4バイト)
  • iovsLen 個目の出力内容のバイトサイズ(4バイト)

何処まで処理を行ったか(出力したか)を返すために、nwritten ポインタの位置へ出力の完了したバイトサイズを設定します。

fd_write の実装例(wasmInstance には WebAssembly のインスタンスを設定)
・・・
fd_write: (fd, iovs, iovsLen, nwritten) => {
    const memory = wasmInstance.exports.memory.buffer
    const view = new DataView(memory)

    const sizeList = Array.from(Array(iovsLen), (v, i) => {
        const ptr = iovs + i * 8

        // 出力内容の格納先のポインタ取得
        const bufStart = view.getUint32(ptr, true)
        // 出力内容のバイトサイズを取得
        const bufLen = view.getUint32(ptr + 4, true)

        const buf = new Uint8Array(memory, bufStart, bufLen)

        // 出力内容の String 化
        const msg = String.fromCharCode(...buf)

        // 出力
        console.log(msg)

        return buf.byteLength
    })

    // 出力済みのバイトサイズ合計
    const totalSize = sizeList.reduce((acc, v) => acc + v)

    // 出力済みのバイトサイズを設定
    view.setUint32(nwritten, totalSize, true)

    return 0
},
・・・

最終的な HTML は下記のようになりました。

fd_write 以外の WASI 関数を空実装にして main 関数を呼び出して実行するようにしていますが、WASI の仕様としては _start 関数を呼び出すのが正しいようです ※。(WASI Application ABI 参照)

 ※ _start 関数を使う場合、fd_prestat_get 等の実装も必要となります

WebAssembly がインポートしている WASI 関数の実装をインスタンス化時(WebAssembly.instantiateStreaming)に渡す事になりますが、WASI の関数(fd_write 等)はインスタンス化の結果を使って処理する点に注意が必要です。

index.html(main 関数版)
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
</head>
<body>
  <h1>WASI WebAssembly Sample</h1>
  <div id="res"></div>

  <script>
    const WASM_URL = './sample1.wasm'

    const wasiObj = {
      wasmInstance: null,
      importObject: {
        wasi_snapshot_preview1: {
          fd_write: (fd, iovs, iovsLen, nwritten) => {
            console.log(`*** call fd_write: fd=${fd}, iovs=${iovs}, iovsLen=${iovsLen}, nwritten=${nwritten}`)

            const memory = wasiObj.wasmInstance.exports.memory.buffer
            const view = new DataView(memory)

            const sizeList = Array.from(Array(iovsLen), (v, i) => {
              const ptr = iovs + i * 8

              const bufStart = view.getUint32(ptr, true)
              const bufLen = view.getUint32(ptr + 4, true)

              const buf = new Uint8Array(memory, bufStart, bufLen)

              const msg = String.fromCharCode(...buf)

              // 出力
              console.log(msg)
              document.getElementById('res').innerHTML += `<p>${msg}</p>`

              return buf.byteLength
            })

            const totalSize = sizeList.reduce((acc, v) => acc + v)

            view.setUint32(nwritten, totalSize, true)

            return 0
          },
          proc_exit: () => {},
          fd_prestat_get: () => {},
          fd_prestat_dir_name: () => {},
          environ_sizes_get: () => {},
          environ_get: () => {}
        }
      }
    }

    WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject)
      .then(res => {
        console.log(res)

        // fd_write で参照できるようにインスタンスを wasmInstance へ設定
        wasiObj.wasmInstance = res.instance

        // main 関数の実行
        wasiObj.wasmInstance.exports.main()
      })
      .catch(err => console.error(err))
  </script>
</body>
</html>

main 関数の代わりに _start 関数を呼び出す場合は下記のようになりました。

_start 関数版の場合、fd_prestat_get の実装が重要となります ※。

 ※ fd_prestat_get を正しく実装していないと、
    fd_prestat_get の呼び出しが延々と繰り返されてしまいました

今回はファイル等を使っていないので(file descriptor 3 以降を開いていない)、fd_prestat_get は単に 8(WASI_EBADF, Bad file descriptor)を返すだけで良さそうです。

index2.html(_start 関数版)
・・・
  <script>
    const WASM_URL = './sample1.wasm'

    const wasiObj = {
      wasmInstance: null,
      importObject: {
        wasi_snapshot_preview1: {
          ・・・
          fd_prestat_get: () => 8,
          ・・・
        }
      }
    }

    WebAssembly.instantiateStreaming(fetch(WASM_URL), wasiObj.importObject)
      .then(res => {
        console.log(res)

        wasiObj.wasmInstance = res.instance
        // _start 関数の実行
        wasiObj.wasmInstance.exports._start()
      })
      .catch(err => console.error(err))
  </script>
・・・

(2-c) 実行

上記の .html ファイルを Web ブラウザで直接開いても WebAssembly を実行できないため、HTTP サーバーを使う事になります。

更に、Web ブラウザ上で WebAssembly を実行するには、.wasm ファイルを MIME Type application/wasm で取得する必要があるようです。

Python の http.server は application/wasm に対応していたため(Python 3.8.2 と 3.7.6 で確認)、以下のスクリプトで HTTP サーバーを立ち上げる事にしました。

web_server.py
import http.server
import socketserver

PORT = 8080

Handler = http.server.SimpleHTTPRequestHandler

with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print(f"start server port:{PORT}")
    httpd.serve_forever()
HTTP サーバー起動(Python 3.8.2 で実行)
> python web_server.py
start server port:8080

Web ブラウザ(Chrome)で http://localhost:8080/index.html へアクセスしたところ(index2.html でも同様)、sample1.wasm の実行を確認できました。

Chrome の実行結果

f:id:fits:20200429200738p:plain

(3) Node.js で組み込み実行

次は、Node.js で WebAssembly を組み込み実行してみます。

(3-a) fd_write 実装

上記 index2.html の処理をベースにローカルの .wasm ファイルを読み込んで実行するようにしました。

sample1.wasm のインポート内容に合わせたものなので、インポート内容の異なる WebAssembly の実行には使えません。

wasm_run_sample.js
const fs = require('fs')

const WASI_ESUCCESS = 0;
const WASI_EBADF = 8; // Bad file descriptor

const wasmFile = process.argv[2]

const wasiObj = {
    wasmInstance: null,
    importObject: {
        wasi_snapshot_preview1: {
            fd_write: (fd, iovs, iovsLen, nwritten) => {
                ・・・
                
                const sizeList = Array.from(Array(iovsLen), (v, i) => {
                    ・・・
                    
                    process.stdout.write(msg)
                    
                    return buf.byteLength
                })
                
                ・・・
                
                return WASI_ESUCCESS
            },
            ・・・
            fd_prestat_get: (fd, bufPtr) => { 
                console.log(`*** call fd_prestat_get: fd=${fd}, bufPtr=${bufPtr}`)
                return WASI_EBADF
            },
            ・・・
        }
    }
}

const buf = fs.readFileSync(wasmFile)

WebAssembly.instantiate(buf, wasiObj.importObject)
    .then(res => {
        wasiObj.wasmInstance = res.instance
        wasiObj.wasmInstance.exports._start()
    })
    .catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node wasm_run_sample.js sample1.wasm
*** call fd_prestat_get : fd=3, bufPtr=1048568
*** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948
count-1
*** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948
count-2
*** call fd_write: fd=1, iovs=1047968, iovsLen=1, nwritten=1047948
count-3
*** call fd_write: fd=1, iovs=1048432, iovsLen=1, nwritten=1048412
aaabbb

(3-b) Wasmer-JS 使用

Wasmer-JS@wasmer/wasi モジュールを使って、もっと汎用的に組み込み実行できるようにしてみます。

@wasmer/wasi インストール例
> npm install @wasmer/wasi

@wasmer/wasi の WASI を使う事で、インポート内容に合わせた WASI 関数の取得や _start 関数の呼び出しを任せる事ができます。

run_wasmer_js/index.js
const fs = require('fs')
const { WASI } = require('@wasmer/wasi')

const wasmFile = process.argv[2]

const wasi = new WASI()

const run = async () => {
    const module = await WebAssembly.compile(fs.readFileSync(wasmFile))
    // インポート内容に合わせた WASI 関数の実装を取得
    const importObject = wasi.getImports(module)

    const instance = await WebAssembly.instantiate(module, importObject)

    // 実行
    wasi.start(instance)
}

run().catch(err => console.error(err))
実行結果(Node.js v12.16.2 で実行)
> node index.js ../sample1.wasm
count-1
count-2
count-3
aaabbb

(4) 標準出力以外の機能

最後に、現時点でどんな機能を使えるのか気になったので、いくつか試してみました。

まず、TcpStream を使ったコードの wasm32-wasi ビルドは一応成功しました。

sample2.rs
use std::net::TcpStream;

fn main() {
    let res = TcpStream::connect("127.0.0.1:8080");
    println!("{:?}", res);
}
sample2.rs ビルド
> rustc --target wasm32-wasi sample2.rs

ただし、実行してみると以下のような結果となりました。(wasmtime 以外で実行しても同じ)

wasmtime による sample2.wasm 実行結果
> wasmtime sample2.wasm

Err(Custom { kind: Other, error: "operation not supported on wasm yet" })

Rust のソースコードで該当(すると思われる)箇所を確認してみると、unsupported() を返すようになっていました。

https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/net.rs(2020/4/26 時点)
・・・
impl TcpStream {
    pub fn connect(_: io::Result<&SocketAddr>) -> io::Result<TcpStream> {
        unsupported()
    }
    ・・・
}
・・・

https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasi/ のソースを確認してみると、(スレッド系の処理等)他にも未対応の機能がいくつもありました。

一方で、環境変数・システム時間・スリープ処理は使えそうだったので、以下のコードで確認してみました。

sample3.rs
use std::env;
use std::thread::sleep;
use std::time::{ Duration, SystemTime };

fn main() {
    // 環境変数 SLEEP_TIME からスリープする秒数を取得(デフォルトは 1)
    let sleep_sec = env::var("SLEEP_TIME").ok()
                         .and_then(|v| v.parse::<u64>().ok())
                         .unwrap_or(1);

    // システム時間の取得
    let time = SystemTime::now();

    println!("start: sleep {}s", sleep_sec);

    // スリープの実施
    sleep(Duration::from_secs(sleep_sec));

    // 経過時間の出力
    match time.elapsed() {
        Ok(s) => println!("end: elapsed {}s", s.as_secs()),
        Err(e) => println!("error: {:?}", e),
    }
}
sample3.rs のビルド
> rustc --target wasm32-wasi sample3.rs

wasmtime では正常に実行できましたが、wasmer は今のところスリープ処理に対応していないようでエラーとなりました。

ちなみに、環境変数はどちらのコマンドも --env で指定できました。

wasmtime v0.15.0 による sample3.wasm 実行結果
> wasmtime --env SLEEP_TIME=5 sample3.wasm
start: sleep 5s
end: elapsed 5s
wasmer v0.16.2 による sample3.wasm 実行結果
> wasmer sample3.wasm --env SLEEP_TIME=5
start: sleep 5s
thread 'main' panicked at 'not yet implemented: Polling not implemented for clocks yet', lib\wasi\src\syscalls\mod.rs:2373:21
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
Error: error: "unhandled trap at 7fffd474a799 - code #e06d7363: unknown exception code"