Go言語で GraphQL - graph-gophers/graphql-go で Interface を試す
前回の graph-gophers/graphql-go を使って、GraphQL の Interface を扱ってみます。
ソースは http://github.com/fits/try_samples/tree/master/blog/20210125/
はじめに
GraphQL には Interface と Union という類似の機能が用意されており、共通のフィールドを設けるかどうかによって使い分けるようになっています。
graph-gophers/graphql-go では、具体的な型(Interface の実装型や Union の要素型)への変換メソッドを用意する事で Interface や Union を扱えるようになっています。
具体型への変換メソッド
func To<GraphQLの型名>() (<Goの型>, bool)
Go 側の実装方法はいくつか考えられるので、試しに 3通りで実装してみました。
(1) 基本形
まずは、graph-gophers/graphql-go の examples で使われている実装方法です。
Go の実装方法 | |
---|---|
GraphQL Interface | interface 埋め込み struct |
GraphQL Interface 実装型 | struct |
GraphQL 実装型への変換 | struct へのキャスト |
sample1.go
package main import ( "context" "encoding/json" graphql "github.com/graph-gophers/graphql-go" ) const ( // GraphQL スキーマ定義 schemaString = ` interface Event { id: ID! } type Created implements Event { id: ID! title: String! } type Deleted implements Event { id: ID! reason: String } type Query { events: [Event!]! } ` ) // GraphQL の Event に対応 type event interface { ID() graphql.ID } // GraphQL の Created に対応 type created struct { id string title string } func (c *created) ID() graphql.ID { return graphql.ID(c.id) } func (c *created) Title() string { return c.title } // GraphQL の Deleted に対応 type deleted struct { id string reason string } func (d *deleted) ID() graphql.ID { return graphql.ID(d.id) } func (d *deleted) Reason() *string { if d.reason == "" { return nil } return &d.reason } // GraphQL の Event に対応 type eventResolver struct { event } // GraphQL の Created 型への変換処理 func (r *eventResolver) ToCreated() (*created, bool) { c, ok := r.event.(*created) return c, ok } // GraphQL の Deleted 型への変換処理 func (r *eventResolver) ToDeleted() (*deleted, bool) { d, ok := r.event.(*deleted) return d, ok } type resolver struct{} func (r *resolver) Events() []*eventResolver { return []*eventResolver{ {&created{id: "i-1", title: "sample1"}}, {&deleted{id: "i-1"}}, {&created{id: "i-2", title: "sample2"}}, {&created{id: "i-3", title: "sample3"}}, {&deleted{id: "i-3", reason: "test"}}, } } func main() { schema := graphql.MustParseSchema(schemaString, new(resolver)) q := ` { events { __typename id ... on Created { title } ... on Deleted { reason } } } ` r := schema.Exec(context.Background(), q, "", nil) b, err := json.Marshal(r) if err != nil { panic(err) } println(string(b)) }
実行結果は以下の通りです。
実行結果
> go build sample1.go > sample1 {"data":{"events":[{"__typename":"Created","id":"i-1","title":"sample1"},{"__typename":"Deleted","id":"i-1","reason":null},{"__typename":"Created","id":"i-2","title":"sample2"},{"__typename":"Created","id":"i-3","title":"sample3"},{"__typename":"Deleted","id":"i-3","reason":"test"}]}}
Union の場合
Interface の代わりに Union を使った場合は以下のようになります。
sample1_union.go
・・・ const ( schemaString = ` union Event = Created | Deleted type Created { id: ID! title: String! } type Deleted { id: ID! reason: String } type Query { events: [Event!]! } ` ) type event interface{} ・・・ func main() { schema := graphql.MustParseSchema(schemaString, new(resolver)) q := ` { events { __typename ... on Created { id title } ... on Deleted { id reason } } } ` ・・・ }
(2) OneOf
次は、gRPC の oneof を参考にした実装です。
こちらは Interface よりも Union の実装に向いているかもしれません。
Go の実装方法 | |
---|---|
GraphQL Interface | GraphQL 実装型毎にフィールドを用意した struct |
GraphQL Interface 実装型 | struct |
GraphQL 実装型への変換 | nil では無いフィールド値を返す |
sample2.go
・・・ // GraphQL の Created に対応 type created struct { id string title string } func (c *created) ID() graphql.ID { return graphql.ID(c.id) } func (c *created) Title() string { return c.title } // GraphQL の Deleted に対応 type deleted struct { id string reason string } func (d *deleted) ID() graphql.ID { return graphql.ID(d.id) } func (d *deleted) Reason() *string { if d.reason == "" { return nil } return &d.reason } // GraphQL の Event に対応 type event struct { created *created deleted *deleted } func (e *event) ID() graphql.ID { if e.created == nil { return e.deleted.ID() } return e.created.ID() } // GraphQL の Created 型への変換処理 func (e *event) ToCreated() (*created, bool) { if e.created == nil { return nil, false } return e.created, true } // GraphQL の Deleted 型への変換処理 func (e *event) ToDeleted() (*deleted, bool) { if e.deleted == nil { return nil, false } return e.deleted, true } type resolver struct{} func (r *resolver) Events() []*event { return []*event{ {created: &created{id: "i-1", title: "sample1"}}, {deleted: &deleted{id: "i-1"}}, {created: &created{id: "i-2", title: "sample2"}}, {created: &created{id: "i-3", title: "sample3"}}, {deleted: &deleted{id: "i-3", reason: "test"}}, } } func main() { ・・・ }
実行結果
> go build sample2.go > sample2 {"data":{"events":[{"__typename":"Created","id":"i-1","title":"sample1"},{"__typename":"Deleted","id":"i-1","reason":null},{"__typename":"Created","id":"i-2","title":"sample2"},{"__typename":"Created","id":"i-3","title":"sample3"},{"__typename":"Deleted","id":"i-3","reason":"test"}]}}
(c) オールインワン
最後に、GraphQL Interface の実装型を単一の struct へ集約してみました。
実装内容が分かり難くなりそうで微妙かもしれません。
Go の実装方法 | |
---|---|
GraphQL Interface | GraphQL 実装型の全フィールドを備えた struct |
GraphQL Interface 実装型 | interface |
GraphQL 実装型への変換 | フラグやフィールド値の有無で判定して自身を返す |
sample3.go
・・・ // GraphQL の Created に対応 type created interface { ID() graphql.ID Title() string } // GraphQL の Deleted に対応 type deleted interface { ID() graphql.ID Reason() *string } // GraphQL の Event に対応 type event struct { id string title string reason string del bool // Created と Deleted の判定用 } func (e *event) ID() graphql.ID { return graphql.ID(e.id) } func (e *event) Title() string { return e.title } func (e *event) Reason() *string { if e.reason == "" { return nil } return &e.reason } // GraphQL の Created 型への変換処理 func (e *event) ToCreated() (created, bool) { if e.del { return nil, false } return e, true } // GraphQL の Deleted 型への変換処理 func (e *event) ToDeleted() (deleted, bool) { if e.del { return e, true } return nil, false } type resolver struct{} func (r *resolver) Events() []*event { return []*event{ {id: "i-1", title: "sample1"}, {id: "i-1", del: true}, {id: "i-2", title: "sample2"}, {id: "i-3", title: "sample3"}, {id: "i-3", reason: "test", del: true}, } } func main() { ・・・ }
実行結果
> go build sample3.go > sample3 {"data":{"events":[{"__typename":"Created","id":"i-1","title":"sample1"},{"__typename":"Deleted","id":"i-1","reason":null},{"__typename":"Created","id":"i-2","title":"sample2"},{"__typename":"Created","id":"i-3","title":"sample3"},{"__typename":"Deleted","id":"i-3","reason":"test"}]}}
Go言語で GraphQL - graph-gophers/graphql-go で Query, Mutation, Subscription を試す
Go言語で GraphQL を扱うライブラリはいくつかありますが、今回は下記を試しました。
文字列として定義した GraphQL スキーマを使うようになっており、それなりに使い易いと思います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20210112/
Query 処理
まずは、単純な Query 処理を実装してみます。
MustParseSchema
に GraphQL スキーマ文字列(以下の schemaString
)と処理の実装(以下の resolver
)を与えて、Schema
を取得します。
Exec
に Context
、クエリ文字列、オペレーション名、クエリ用の変数を与えてクエリを実行します。
クエリの実行結果は Response
として返ってくるので json.Marshal
で JSON 文字列化して出力しています。
デフォルトで、GraphQL スキーマのフィールド(下記の id
や value
)の値は、該当するメソッドから取得するようになっています。※
※ 大文字・小文字は区別せず、 "_" を除いた名称が合致するメソッドを探している模様
sample1.go
package main import ( "context" "encoding/json" graphql "github.com/graph-gophers/graphql-go" ) const ( // GraphQL スキーマ定義 schemaString = ` type Item { id: ID! value: Int! } type Query { one: Item! } ` ) type item struct { id string value int32 } // GraphQL の id フィールドの値 func (i *item) ID() graphql.ID { return graphql.ID(i.id) } // GraphQL の value フィールドの値 func (i *item) Value() int32 { return i.value } type resolver struct{} // Query の one を実装 func (r *resolver) One() *item { return &item{"item-1", 12} } func main() { // GraphQL スキーマのパース schema := graphql.MustParseSchema(schemaString, new(resolver)) q := ` { one { id value } } ` // GraphQL クエリの実行 r := schema.Exec(context.Background(), q, "", nil) // JSON 文字列化 b, err := json.Marshal(r) if err != nil { panic(err) } println(string(b)) }
実行結果は以下の通りです。
ビルドと実行
> go build sample1.go > sample1 {"data":{"one":{"id":"item-1","value":12}}}
構造体のフィールドを使用
graph-gophers/graphql-go のソースコード internal/exec/resolvable/resolvable.go
を確認してみたところ、GraphQL フィールド値の取得先は以下のように探しているようでした。
- (1) 該当するメソッドを探す(findMethod の実施)
- (2) (1) で見つからず、UseFieldResolvers が適用されている場合は該当するフィールドを探す(findField の実施)
そこで、UseFieldResolvers
を適用し、item 構造体のフィールドから値を取得するようにしてみました。
sample2.go
・・・ type item struct { ID graphql.ID Value int32 } type resolver struct{} func (r *resolver) One() *item { return &item{graphql.ID("item-2"), 34} } func main() { // UseFieldResolvers 適用 schema := graphql.MustParseSchema(gqlSchema, new(resolver), graphql.UseFieldResolvers()) ・・・ }
実行結果は以下の通りです。
ビルドと実行
> go build sample2.go > sample1_field {"data":{"one":{"id":"item-2","value":34}}}
なお、このコードで UseFieldResolvers を適用しなかった場合、実行時に panic: *main.item does not resolve "Item": missing method for field "id"
となりました。
Mutation, Subscription 処理
最後に、GraphQL の enum や input を使って Mutation や Subscription を行う処理を実装してみました。
enum は string、input は 構造体で扱う事ができます。
Subscription は Subscribe
で実施し、その実装メソッドは受信用 channel(<-chan T
)を戻り値にします。
Exec の戻り値である Response の Data フィールドの型は json.RawMessage
となっているので、構造体や map へアンマーシャルする事が可能です。
sample3.go
package main import ( "context" "encoding/json" "log" "sync" "github.com/google/uuid" graphql "github.com/graph-gophers/graphql-go" ) const ( schemaString = ` enum Category { Standard Extra } input CreateItem { category: Category! value: Int! } type Item { id: ID! category: Category! value: Int! } type Mutation { create(input: CreateItem!): Item } type Query { find(id: ID!): Item } type Subscription { created: Item } ` ) // input type createItem struct { Category string Value int32 } type item struct { id string category string value int32 } func (i *item) ID() graphql.ID { return graphql.ID(i.id) } func (i *item) Category() string { return i.category } func (i *item) Value() int32 { return i.value } // item 管理 type store struct { sync.RWMutex items []*item } func (s *store) add(i *item) { s.Lock() defer s.Unlock() s.items = append(s.items, i) } func (s *store) get(id graphql.ID) *item { s.RLock() defer s.RUnlock() for _, i := range s.items { if i.ID() == id { return i } } return nil } // Subscription 用の channel 管理 type broker struct { sync.RWMutex subscribes []chan<- *item } func (b *broker) subscribe(ch chan<- *item) { log.Println("[INFO] subscribe") b.Lock() defer b.Unlock() b.subscribes = append(b.subscribes, ch) } func (b *broker) unsubscribe(ch chan<- *item) { log.Println("[INFO] unsubscribe") var tmp []chan<- *item b.Lock() defer b.Unlock() for _, s := range b.subscribes { if s != ch { tmp = append(tmp, s) } } b.subscribes = tmp } func (b *broker) publish(i *item) { b.RLock() defer b.RUnlock() for _, s := range b.subscribes { s <- i } } type resolver struct { store *store broker *broker } // Mutation の実装 func (r *resolver) Create(args struct{ Input createItem }) (*item, error) { id, err := uuid.NewRandom() if err != nil { return nil, err } i := item{id.String(), args.Input.Category, args.Input.Value} r.store.add(&i) go func() { r.broker.publish(&i) }() return &i, nil } // Query の実装 func (r *resolver) Find(args struct{ ID graphql.ID }) *item { return r.store.get(args.ID) } // Subscription の実装 func (r *resolver) Created(ctx context.Context) <-chan *item { ch := make(chan *item) r.broker.subscribe(ch) go func() { // Context キャンセル時 <-ctx.Done() log.Println("[INFO] context done") r.broker.unsubscribe(ch) close(ch) }() return ch } // Subscription の受信 func onCreated(ch <-chan interface{}) { for { r, ok := <-ch if !ok { log.Println("[INFO] closed channel") return } b, _ := json.Marshal(r) log.Println("[SUBSCRIPTION]", string(b)) } } func printResponse(r *graphql.Response) error { b, err := json.Marshal(r) if err != nil { return err } log.Println(string(b)) return nil } func main() { resolver := resolver{new(store), new(broker)} schema := graphql.MustParseSchema(schemaString, &resolver) ctx, cancel := context.WithCancel(context.Background()) s := ` subscription { created { id category value } } ` // Subscription の実施 ch, err := schema.Subscribe(ctx, s, "", nil) if err != nil { panic(err) } go onCreated(ch) m1 := ` mutation { create(input: { category: Standard, value: 10 }) { id } } ` mr1 := schema.Exec(context.Background(), m1, "", nil) _ = printResponse(mr1) var cr1 struct { Create struct { ID string } } // Mutation 結果の data の内容を構造体へアンマーシャル err = json.Unmarshal(mr1.Data, &cr1) if err != nil { panic(err) } q := ` query findItem($id: ID!) { find(id: $id) { id category value } } ` qr1 := schema.Exec(context.Background(), q, "", map[string]interface{}{"id": cr1.Create.ID}) _ = printResponse(qr1) m2 := ` mutation Create($p: CreateItem!) { create(input: $p) { id } } ` // GraphQL のクエリ用変数 vs := map[string]interface{}{ "p": map[string]interface{}{ "category": "Extra", "value": 123, }, } mr2 := schema.Exec(context.Background(), m2, "", vs) _ = printResponse(mr2) var cr2 map[string]map[string]interface{} // Mutation 結果の data の内容を map へアンマーシャル err = json.Unmarshal(mr2.Data, &cr2) if err != nil { panic(err) } qr2 := schema.Exec(context.Background(), q, "", cr2["create"]) _ = printResponse(qr2) // Subscription のキャンセル cancel() mr3 := schema.Exec(context.Background(), m2, "", map[string]interface{}{ "p": map[string]interface{}{ "category": "Extra", "value": 987, }, }) _ = printResponse(mr3) mr4 := schema.Exec(context.Background(), m2, "", map[string]interface{}{ "p": map[string]interface{}{ "category": "Standard", "value": 567, }, }) _ = printResponse(mr4) qr5 := schema.Exec(context.Background(), q, "", map[string]interface{}{"id": "invalid-id"}) _ = printResponse(qr5) }
ビルドと実行
> go build sample3.go > sample3 2021/01/11 21:03:40 [INFO] subscribe 2021/01/11 21:03:40 {"data":{"create":{"id":"507dae03-1f93-4b1a-a75e-3fc54b297ad4"}}} 2021/01/11 21:03:40 [SUBSCRIPTION] {"data":{"created":{"id":"507dae03-1f93-4b1a-a75e-3fc54b297ad4","category":"Standard","value":10}}} 2021/01/11 21:03:40 {"data":{"find":{"id":"507dae03-1f93-4b1a-a75e-3fc54b297ad4","category":"Standard","value":10}}} 2021/01/11 21:03:40 {"data":{"create":{"id":"b47bf46c-5c10-4a8f-892e-9ebfa83d576a"}}} 2021/01/11 21:03:40 [SUBSCRIPTION] {"data":{"created":{"id":"b47bf46c-5c10-4a8f-892e-9ebfa83d576a","category":"Extra","value":123}}} 2021/01/11 21:03:40 {"data":{"find":{"id":"b47bf46c-5c10-4a8f-892e-9ebfa83d576a","category":"Extra","value":123}}} 2021/01/11 21:03:40 [INFO] closed channel 2021/01/11 21:03:40 [INFO] context done 2021/01/11 21:03:40 [INFO] unsubscribe 2021/01/11 21:03:40 {"data":{"create":{"id":"aef942a6-3aa7-4b31-89c4-cd44f748bed6"}}} 2021/01/11 21:03:40 {"data":{"create":{"id":"fe3db2a4-5578-4d33-b54a-26a8b6e281f3"}}} 2021/01/11 21:03:40 {"data":{"find":null}}
Go言語でインターフェースのメソッドを列挙する
Go 言語のリフレクションで型情報を扱う Type
は以下で取得できます。
func TypeOf(i interface{}) Type
引数 i
には値を指定する事になりますが、インターフェースの型情報を取得したい場合はどうするのか気になって試してみたところ、以下で取得できました。
インターフェースの型情報を取得
TypeOf( (*対象インターフェース)(nil) ).Elem()
対象とするインターフェースのポインタ型の nil
を使って TypeOf
を実施した後、Elem
※ を使ってインターフェースの型情報を取得します。
※ Elem() で Array, Chan, Map, Ptr, Slice から 要素の型情報を取得できる
これを利用して、インターフェースのメソッドを列挙するとこのようになります。
sample.go
package main import ( "fmt" "reflect" ) type Counter interface { Up(v int) Down(v int) Current() int reset() end() int } // メソッド情報の出力 func printMethods(t reflect.Type) { for i := 0; i < t.NumMethod(); i++ { fmt.Println("method:", t.Method(i)) } } func main() { // Counter インターフェースの型情報を取得 t := reflect.TypeOf((*Counter)(nil)).Elem() printMethods(t) }
ビルドと実行
> go build sample.go > sample method: {Current func() int <invalid Value> 0} method: {Down func(int) <invalid Value> 1} method: {Up func(int) <invalid Value> 2} method: {end main func() int <invalid Value> 3} method: {reset main func() <invalid Value> 4}
ちなみに、上記コードで t := reflect.TypeOf((Counter)(nil))
のようにすると、t.NumMethod()
で panic: runtime error: invalid memory address or nil pointer dereference
となりました。
イベントベースで考える在庫管理モデル
従来のイベントソーシングのような手法だと、特定の State(というよりは Entity かも)を永続化するための手段として Event を用いるというような、あくまでも State 中心の発想になると思います。
そこで、ここでは下記のような Event 中心の発想に切り替えて、在庫管理(在庫数を把握するだけの単純なもの)を考えてみました。
State
は本質ではなく、Event
を解釈した結果にすぎない(解釈の仕方は様々)Event
を得たり、伝えたりするための手段としてState
を用いる
要するに、Event こそが重要で State(Entity とか)は取るに足らない存在だと(実験的に)考えてみたって事です。
従来のイベントソーシング | 本件 |
---|---|
State が目的、Event が手段 | Event が目的、State が手段 |
なお、ここでイメージしている Event は、特定のドメインに依存しないような本質的なものです。
在庫管理
本件の在庫管理は、以下を把握するだけの単純なものです。
- 何処に何の在庫(とりあえず現物のあるもの)がいくつあるか
実装コードは http://github.com/fits/try_samples/tree/master/blog/20201213/
1. 本質的なイベント
在庫管理で起こりそうなイベントを考えてみます。
在庫(数)は入庫と出庫の結果だと考えられるので、以下のようなイベントが考えられそうです。
- 入庫イベント
- 出庫イベント
また、シンプルに物が移動 ※ した結果が在庫なのだと考えると、(2地点間の)在庫の移動という形で抽象化できそうな気がします。
※ 概念的な移動も含める
そうすると、以下のようなイベントも考えられそうです。
- 在庫移動の開始イベント
- 在庫移動の終了(完了)イベント
ついでに、在庫の引当も区別して考えると ※、以下のようなイベントも考えられます。
- 引当イベント
※ 引当用の場所へ移動するという事にするのであれば区別しなくてもよさそう
まとめると、とりあえずは以下のようなイベントが考えられそうです。
- 在庫移動の開始イベント
- 在庫移動の完了イベント
- 在庫移動のキャンセルイベント
- 引当イベント
- 引当した場合の出庫イベント
- 引当しなかった場合の出庫イベント
- 入庫イベント
ついでに、引当や出庫などの成否をイベントとして明確に分けたい場合は、引当失敗イベント等の失敗イベントを別途設ければ良さそうな気がします。
2. イベント定義
これらのイベントを Rust と TypeScript で型定義してみました。
商品や在庫のロケーション(在庫の保管場所)の具体的な内容はどうでもよいので(ここで具体化する必要がない)、Generics の型変数で表現しておきます。
本質的に必要そうな最低限の情報のみを持たせ、余計な情報は取り除いておきます。※
※ 在庫移動を一意に限定する ID や日付のような メタデータ(と考えられるもの)に関しても除外しました
用語はとりあえず以下のようにしています。
- 引当: assign
- 出庫: ship
- 入庫: arrive
何(item
)をいくつ(qty
)、何処(from
)から何処(to
)へ移動する予定なのかという情報を持たせて在庫の移動を開始するようにしてみました。
入出庫等で予定とは異なる内容になっても不都合が生じないように、それぞれのイベントに必要な情報を持たせています。
また、全体的に ADT(代数的データ型)を意識した内容にしています。
Rust で型定義したイベント
models/events.rs
pub enum StockMoveEvent<Item, Location, Quantity> { // 開始 Started { item: Item, qty: Quantity, from: Location, to: Location, }, // 完了 Completed, // キャンセル Cancelled, // 引当 Assigned { item: Item, from: Location, assigned: Quantity, }, // 出庫 Shipped { item: Item, from: Location, outgoing: Quantity, }, // 引当に対する出庫 AssignShipped { item: Item, from: Location, outgoing: Quantity, assigned: Quantity, }, // 入庫 Arrived { item: Item, to: Location, incoming: Quantity, }, }
TypeScript で型定義したイベント
models/events.ts
export interface StockMoveEventStarted<Item, Location, Quantity> { tag: 'stock-move-event.started' item: Item qty: Quantity from: Location to: Location } export interface StockMoveEventCompleted { tag: 'stock-move-event.completed' } export interface StockMoveEventCancelled { tag: 'stock-move-event.cancelled' } export interface StockMoveEventAssigned<Item, Location, Quantity> { tag: 'stock-move-event.assigned' item: Item from: Location assigned: Quantity } export interface StockMoveEventShipped<Item, Location, Quantity> { tag: 'stock-move-event.shipped' item: Item from: Location outgoing: Quantity } export interface StockMoveEventAssignShipped<Item, Location, Quantity> { tag: 'stock-move-event.assign-shipped' item: Item from: Location outgoing: Quantity assigned: Quantity } export interface StockMoveEventArrived<Item, Location, Quantity> { tag: 'stock-move-event.arrived' item: Item to: Location incoming: Quantity } export type StockMoveEvent<Item, Location, Quantity> = StockMoveEventStarted<Item, Location, Quantity> | StockMoveEventCompleted | StockMoveEventCancelled | StockMoveEventAssigned<Item, Location, Quantity> | StockMoveEventShipped<Item, Location, Quantity> | StockMoveEventAssignShipped<Item, Location, Quantity> | StockMoveEventArrived<Item, Location, Quantity>
3. 在庫移動処理サンプル
上記で定義したイベントを以下のような(在庫移動の)ステートマシンで扱ってみます。※
※ 本件の考え方では、 (本質的な)イベントは特定の処理やルールになるべく依存していない事が重要なので、 このステートマシン(イベントを扱う手段の一つでしかない)に対して 特化しないように注意します
- 在庫移動の状態遷移の基本パターンは 3通り
- (a) 引当 -> 出庫 -> 入庫
- (b) 出庫 -> 入庫
- (c) 入庫
- 入庫の失敗状態は無し(0個の入庫で代用)
(c) は出庫側の状況が不明なケースで入庫の記録だけを残すような用途を想定しています。
3.1 ステートマシンの実装
Rust と TypeScript でそれぞれ実装してみます。
この辺のレイヤーまでは、外界の都合(※1)から隔離しておきたいと考え、関数言語的な作りにしています。
イベントと同様に在庫移動や在庫を ADT(代数的データ型) で表現し、下記のような関数(+ ユーティリティ関数)を提供するだけの作りにしてみました。(※2)
(※1)フレームワーク、UI、永続化、非同期処理、排他制御やその他諸々の都合 (※2)こうしておくと、WebAssembly 等でコンポーネント化して 再利用するなんて事も実現し易くなるかもしれませんし
- (1) 初期状態を返す関数
- (2) 現在の状態とアクションから次の状態とそれに伴って発生したイベントを返す関数(イメージとしては
State -> Action -> Container<(State, Event)>
) - (3) ある時点の状態とそれ以降に起きたイベントの内容から任意の状態を復元して返す関数
なお、ここでは (2) のアクションに相当する部分は関数(と引数の一部)として実装しています。
また、(2) で状態遷移が発生しなかった場合に undefined
を返すように実装していますが、実際は成功時と失敗時の両方を扱うようなコンテナ型(Rust の Result や Either モナドとか)で包むのが望ましいと思います。
ついでに、実装に際して以下のようなルールを加えています。
- 引当、入庫、出庫のロケーションは開始時に予定したものを使用
- 引当時にのみ在庫数を確認(残りの在庫をチェック)
- 在庫のタイプは 2種類
- 在庫数を管理するタイプ(引当分の在庫が余っている場合にのみ引当が成功、在庫数は入庫イベントと出庫イベントから算出)
- 在庫数を管理しないタイプ(引当は常に成功、在庫数は管理せず実質的に無限)
- 引当数や出庫数が 0 の場合は(引当や出庫の)失敗として扱う
引当はこの処理内における単なる数値上の予約であり、入出庫は実際の作業の結果を反映するような用途をとりあえず想定しています。
そのため、数値上の引当に成功しても実際の出庫が成功するとは限らず、数値上の在庫数以上の出庫が発生するようなケースも考えられるので、この処理ではそれらを許容するようにしています。※
※ 在庫の整合性等をどのように制御・調整するかは 使う側(外側のレイヤー)に任せる
Rust による実装
ここで、商品(以下の ItemCode)や在庫ロケーション(以下の LocationCode)の具体的な型を決めていますが、これより外側のレイヤーに具体型を決めさせるようにした方が望ましいかもしれません。
models/stockmove.rs
use std::slice; use super::events::StockMoveEvent; // 商品を識別するための型 pub type ItemCode = String; // 在庫ロケーションを識別するための型 pub type LocationCode = String; pub type Quantity = u32; pub trait Event<S> { type Output; fn apply_to(&self, state: S) -> Self::Output; } pub trait Restore<E> { fn restore(self, events: slice::Iter<E>) -> Self; } // 在庫の型定義 #[allow(dead_code)] #[derive(Debug, Clone)] pub enum Stock { Unmanaged { item: ItemCode, location: LocationCode }, Managed { item: ItemCode, location: LocationCode, qty: Quantity, assigned: Quantity }, } // 在庫に関する処理 #[allow(dead_code)] impl Stock { pub fn unmanaged_new(item: ItemCode, location: LocationCode) -> Self { Self::Unmanaged { item, location } } pub fn managed_new(item: ItemCode, location: LocationCode) -> Self { Self::Managed { item, location, qty: 0, assigned: 0 } } pub fn eq_id(&self, item: &ItemCode, location: &LocationCode) -> bool { match self { Self::Managed { item: it, location: loc, .. } | Self::Unmanaged { item: it, location: loc } => it == item && loc == location } } // 在庫数のチェック pub fn is_sufficient(&self, v: Quantity) -> bool { match self { Self::Managed { qty, assigned, .. } => v + assigned <= *qty, Self::Unmanaged { .. } => true, } } fn update(&self, qty: Quantity, assigned: Quantity) -> Self { match self { Self::Managed { item, location, .. } => { Self::Managed { item: item.clone(), location: location.clone(), qty, assigned, } }, Self::Unmanaged { .. } => self.clone(), } } fn update_qty(&self, qty: Quantity) -> Self { match self { Self::Managed { assigned, .. } => self.update(qty, *assigned), Self::Unmanaged { .. } => self.clone(), } } fn update_assigned(&self, assigned: Quantity) -> Self { match self { Self::Managed { qty, .. } => self.update(*qty, assigned), Self::Unmanaged { .. } => self.clone(), } } } // 在庫に対するイベントの適用 impl Event<Stock> for MoveEvent { type Output = Stock; fn apply_to(&self, state: Stock) -> Self::Output { match &state { Stock::Unmanaged { .. } => state, Stock::Managed { item: s_item, location: s_loc, qty: s_qty, assigned: s_assigned } => { match self { Self::Assigned { item, from, assigned } if s_item == item && s_loc == from => { state.update_assigned( s_assigned + assigned ) }, Self::Shipped { item, from, outgoing } if s_item == item && s_loc == from => { state.update_qty( s_qty.checked_sub(*outgoing).unwrap_or(0) ) }, Self::AssignShipped { item, from, outgoing, assigned } if s_item == item && s_loc == from => { state.update( s_qty.checked_sub(*outgoing).unwrap_or(0), s_assigned.checked_sub(*assigned).unwrap_or(0), ) }, Self::Arrived { item, to, incoming } if s_item == item && s_loc == to => { state.update_qty( s_qty + incoming ) }, _ => state, } }, } } } #[derive(Debug, Default, Clone, PartialEq)] pub struct StockMoveInfo { item: ItemCode, qty: Quantity, from: LocationCode, to: LocationCode, } // 在庫移動の型(状態)定義 #[allow(dead_code)] #[derive(Debug, Clone, PartialEq)] pub enum StockMove { Nothing, Draft { info: StockMoveInfo }, Completed { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity }, Cancelled { info: StockMoveInfo }, Assigned { info: StockMoveInfo, assigned: Quantity }, Shipped { info: StockMoveInfo, outgoing: Quantity }, Arrived { info: StockMoveInfo, outgoing: Quantity, incoming: Quantity }, AssignFailed { info: StockMoveInfo }, ShipmentFailed { info: StockMoveInfo }, } type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity>; type MoveResult = Option<(StockMove, MoveEvent)>; // 在庫移動に関する処理 #[allow(dead_code)] impl StockMove { // 初期状態の取得 pub fn initial_state() -> Self { Self::Nothing } // 開始 pub fn start(&self, item: ItemCode, qty: Quantity, from: LocationCode, to: LocationCode) -> MoveResult { if qty < 1 { return None } let event = StockMoveEvent::Started { item: item.clone(), qty: qty, from: from.clone(), to: to.clone() }; self.apply_event(event) } // 引当 pub fn assign(&self, stock: &Stock) -> MoveResult { if let Some(info) = self.info() { if stock.eq_id(&info.item, &info.from) { let assigned = if stock.is_sufficient(info.qty) { info.qty } else { 0 }; return self.apply_event( StockMoveEvent::Assigned { item: info.item.clone(), from: info.from.clone(), assigned, } ) } } None } // 出庫 pub fn ship(&self, outgoing: Quantity) -> MoveResult { let ev = match self { Self::Assigned { info, assigned } => { Some(StockMoveEvent::AssignShipped { item: info.item.clone(), from: info.from.clone(), outgoing, assigned: assigned.clone(), }) }, _ => { self.info() .map(|i| StockMoveEvent::Shipped { item: i.item.clone(), from: i.from.clone(), outgoing, } ) }, }; ev.and_then(|e| self.apply_event(e)) } // 入庫 pub fn arrive(&self, incoming: Quantity) -> MoveResult { self.info() .and_then(|i| self.apply_event(StockMoveEvent::Arrived { item: i.item.clone(), to: i.to.clone(), incoming, }) ) } pub fn complete(&self) -> MoveResult { self.apply_event(StockMoveEvent::Completed) } pub fn cancel(&self) -> MoveResult { self.apply_event(StockMoveEvent::Cancelled) } fn info(&self) -> Option<StockMoveInfo> { match self { Self::Draft { info } | Self::Completed { info, .. } | ・・・ Self::Arrived { info, .. } => { Some(info.clone()) }, Self::Nothing => None, } } fn apply_event(&self, event: MoveEvent) -> MoveResult { let new_state = event.apply_to(self.clone()); Some((new_state, event)) .filter(|r| r.0 != *self) } } // 在庫移動に対するイベントの適用 impl Event<StockMove> for MoveEvent { type Output = StockMove; fn apply_to(&self, state: StockMove) -> Self::Output { match self { Self::Started { item, qty, from, to } => { if state == StockMove::Nothing { StockMove::Draft { info: StockMoveInfo { item: item.clone(), qty: qty.clone(), from: from.clone(), to: to.clone(), } } } else { state } }, Self::Completed => { if let StockMove::Arrived { info, outgoing, incoming } = state { StockMove::Completed { info: info.clone(), outgoing, incoming } } else { state } }, Self::Cancelled => { if let StockMove::Draft { info } = state { StockMove::Cancelled { info: info.clone() } } else { state } }, Self::Assigned { item, from, assigned } => { match state { StockMove::Draft { info } if info.item == *item && info.from == *from => { if *assigned > 0 { StockMove::Assigned { info: info.clone(), assigned: assigned.clone(), } } else { StockMove::AssignFailed { info: info.clone() } } }, _ => state, } }, Self::Shipped { item, from, outgoing } => { match state { StockMove::Draft { info } if info.item == *item && info.from == *from => { if *outgoing > 0 { StockMove::Shipped { info: info.clone(), outgoing: outgoing.clone(), } } else { StockMove::ShipmentFailed { info: info.clone() } } }, _ => state, } }, Self::AssignShipped { item, from, outgoing, .. } => { match state { StockMove::Assigned { info, .. } if info.item == *item && info.from == *from => { if *outgoing > 0 { StockMove::Shipped { info: info.clone(), outgoing: outgoing.clone(), } } else { StockMove::ShipmentFailed { info: info.clone() } } }, _ => state, } }, Self::Arrived { item, to, incoming } => { match state { StockMove::Draft { info } if info.item == *item && info.to == *to => { StockMove::Arrived { info: info.clone(), outgoing: 0, incoming: *incoming, } }, StockMove::Shipped { info, outgoing } if info.item == *item && info.to == *to => { StockMove::Arrived { info: info.clone(), outgoing, incoming: *incoming, } }, _ => state, } }, } } } // 在庫や在庫移動の状態復元 impl<S, E> Restore<&E> for S where Self: Clone, E: Event<Self, Output = Self>, { fn restore(self, events: slice::Iter<&E>) -> Self { events.fold(self, |acc, ev| ev.apply_to(acc.clone())) } }
TypeScript による実装
実装の仕方が多少違っていますが、Rust 版の処理内容と概ね同じ(にしたつもり)です。
models/stockmove.ts
import { StockMoveEvent, StockMoveEventShipped, StockMoveEventAssignShipped } from './events' export type ItemCode = string export type LocationCode = string export type Quantity = number export type MoveEvent = StockMoveEvent<ItemCode, LocationCode, Quantity> type ShippedMoveEvent = StockMoveEventShipped<ItemCode, LocationCode, Quantity> type AssignShippedMoveEvent = StockMoveEventAssignShipped<ItemCode, LocationCode, Quantity> interface StockUnmanaged { tag: 'stock.unmanaged' item: ItemCode location: LocationCode } interface StockManaged { tag: 'stock.managed' item: ItemCode location: LocationCode qty: Quantity assigned: Quantity } // 在庫の型定義 export type Stock = StockUnmanaged | StockManaged // 在庫に関する処理 export class StockAction { static newUnmanaged(item: ItemCode, location: LocationCode): Stock { return { tag: 'stock.unmanaged', item, location } } static newManaged(item: ItemCode, location: LocationCode): Stock { return { tag: 'stock.managed', item, location, qty: 0, assigned: 0 } } // 在庫数のチェック static isSufficient(stock: Stock, qty: Quantity): boolean { switch (stock.tag) { case 'stock.unmanaged': return true case 'stock.managed': return qty + Math.max(0, stock.assigned) <= Math.max(0, stock.qty) } } } // 在庫の復元処理 export class StockRestore { static restore(state: Stock, events: MoveEvent[]): Stock { return events.reduce(StockRestore.applyTo, state) } // 在庫に対するイベントの適用 private static applyTo(state: Stock, event: MoveEvent): Stock { if (state.tag == 'stock.managed') { switch (event.tag) { case 'stock-move-event.assigned': if (state.item == event.item && state.location == event.from) { return StockRestore.updateAssigned( state, state.assigned + event.assigned ) } break case 'stock-move-event.assign-shipped': if (state.item == event.item && state.location == event.from) { return StockRestore.updateStock( state, state.qty - event.outgoing, state.assigned - event.assigned ) } break ・・・ } } return state } private static updateStock(stock: Stock, qty: Quantity, assigned: Quantity): Stock { switch (stock.tag) { case 'stock.unmanaged': return stock case 'stock.managed': return { tag: stock.tag, item: stock.item, location: stock.location, qty, assigned } } } ・・・ } interface StockMoveInfo { item: ItemCode qty: Quantity from: LocationCode to: LocationCode } interface StockMoveNothing { tag: 'stock-move.nothing' } interface StockMoveDraft { tag: 'stock-move.draft' info: StockMoveInfo } ・・・ // 在庫移動の型(状態)定義 export type StockMove = StockMoveNothing | StockMoveDraft | StockMoveCompleted | StockMoveCancelled | StockMoveAssigned | StockMoveShipped | StockMoveArrived | StockMoveAssignFailed | StockMoveShipmentFailed export type StockMoveResult = [StockMove, MoveEvent] | undefined // 在庫移動に関する処理 export class StockMoveAction { // 初期状態を取得 static initialState(): StockMove { return { tag: 'stock-move.nothing' } } // 開始 static start(state: StockMove, item: ItemCode, qty: Quantity, from: LocationCode, to: LocationCode): StockMoveResult { if (qty < 1) { return undefined } const event: MoveEvent = { tag: 'stock-move-event.started', item, qty, from, to } return StockMoveAction.applyTo(state, event) } // 引当 static assign(state: StockMove, stock: Stock): StockMoveResult { const info = StockMoveAction.info(state) if (info && info.item == stock.item && info.from == stock.location) { const assigned = (stock && StockAction.isSufficient(stock, info.qty)) ? info.qty : 0 const event: MoveEvent = { tag: 'stock-move-event.assigned', item: info.item, from: info.from, assigned } return StockMoveAction.applyTo(state, event) } return undefined } // 出庫 static ship(state: StockMove, outgoing: Quantity): StockMoveResult { if (outgoing < 0) { return undefined } const event = StockMoveAction.toShippedEvent(state, outgoing) return event ? StockMoveAction.applyTo(state, event) : undefined } // 入庫 static arrive(state: StockMove, incoming: Quantity): StockMoveResult { if (incoming < 0) { return undefined } const info = StockMoveAction.info(state) if (info) { const event: MoveEvent = { tag: 'stock-move-event.arrived', item: info.item, to: info.to, incoming } return StockMoveAction.applyTo(state, event) } return undefined } ・・・ static info(state: StockMove) { if (state.tag != 'stock-move.nothing') { return state.info } return undefined } private static applyTo(state: StockMove, event: MoveEvent): StockMoveResult { const nextState = StockMoveRestore.restore(state, [event]) return (nextState != state) ? [nextState, event] : undefined } private static toShippedEvent(state: StockMove, outgoing: number): MoveEvent | undefined { const info = StockMoveAction.info(state) if (info) { if (state.tag == 'stock-move.assigned') { return { tag: 'stock-move-event.assign-shipped', item: info.item, from: info.from, assigned: state.assigned, outgoing } } else { return { tag: 'stock-move-event.shipped', item: info.item, from: info.from, outgoing } } } return undefined } } // 在庫移動の復元処理 export class StockMoveRestore { static restore(state: StockMove, events: MoveEvent[]): StockMove { return events.reduce(StockMoveRestore.applyTo, state) } // 在庫移動に対するイベントの適用 private static applyTo(state: StockMove, event: MoveEvent): StockMove { switch (state.tag) { case 'stock-move.nothing': if (event.tag == 'stock-move-event.started') { return { tag: 'stock-move.draft', info: { item: event.item, qty: event.qty, from: event.from, to: event.to } } } break case 'stock-move.draft': return StockMoveRestore.applyEventToDraft(state, event) case 'stock-move.assigned': if (event.tag == 'stock-move-event.assign-shipped') { return StockMoveRestore.applyShipped(state, event) } break case 'stock-move.shipped': if (event.tag == 'stock-move-event.arrived' && state.info.item == event.item && state.info.to == event.to) { return { tag: 'stock-move.arrived', info: state.info, outgoing: state.outgoing, incoming: event.incoming } } break case 'stock-move.arrived': if (event.tag == 'stock-move-event.completed') { return { tag: 'stock-move.completed', info: state.info, outgoing: state.outgoing, incoming: state.incoming } } break case 'stock-move.completed': case 'stock-move.cancelled': case 'stock-move.assign-failed': case 'stock-move.shipment-failed': break } return state } private static applyShipped(state: StockMoveDraft | StockMoveAssigned, event: ShippedMoveEvent | AssignShippedMoveEvent): StockMove { if (state.info.item == event.item && state.info.from == event.from) { if (event.outgoing > 0) { return { tag: 'stock-move.shipped', info: state.info, outgoing: event.outgoing } } else { return { tag: 'stock-move.shipment-failed', info: state.info } } } return state } private static applyEventToDraft(state: StockMoveDraft, event: MoveEvent): StockMove { switch (event.tag) { case 'stock-move-event.cancelled': return { tag: 'stock-move.cancelled', info: state.info } case 'stock-move-event.assigned': if (state.info.item == event.item && state.info.from == event.from) { if (event.assigned > 0) { return { tag: 'stock-move.assigned', info: state.info, assigned: event.assigned } } else { return { tag: 'stock-move.assign-failed', info: state.info } } } break case 'stock-move-event.shipped': return StockMoveRestore.applyShipped(state, event) case 'stock-move-event.arrived': if (state.info.item == event.item && state.info.to == event.to) { return { tag: 'stock-move.arrived', info: state.info, outgoing: 0, incoming: Math.max(event.incoming, 0) } } break } return state } }
3.2 GraphQL 化 + MongoDB へ永続化
ついでに、前述のステートマシン(TypeScript 実装版)を Apollo Server で GraphQL 化し、MongoDB へ永続化するようにしてみました。
index.ts
import { ApolloServer, gql } from 'apollo-server' import { v4 as uuidv4 } from 'uuid' import { MongoClient, Collection } from 'mongodb' import { ItemCode, LocationCode, MoveEvent, StockMoveAction, StockMoveRestore, StockMove, StockMoveResult, StockAction, StockRestore, Stock } from './models' const mongoUrl = 'mongodb://localhost' const dbName = 'stockmoves' const colName = 'events' const stocksColName = 'stocks' type MoveId = string type Revision = number // MongoDB へ保存するイベント内容 interface StoredEvent { move_id: MoveId revision: Revision item: ItemCode from: LocationCode to: LocationCode event: MoveEvent } interface RestoredStockMove { state: StockMove revision: Revision } // MongoDB への永続化処理 class Store { ・・・ async loadStock(item: ItemCode, location: LocationCode): Promise<Stock | undefined> { const id = this.stockId(item, location) const stock = await this.stocksCol.findOne({ _id: id }) if (!stock) { return undefined } const query = { '$and': [ { item }, { '$or': [ { from: location }, { to: location } ]} ] } const events = await this.eventsCol .find(query) .map(r => r.event) .toArray() return StockRestore.restore(stock, events) } async saveStock(stock: Stock): Promise<void> { const id = this.stockId(stock.item, stock.location) const res = await this.stocksCol.updateOne( { _id: id }, { '$setOnInsert': stock }, { upsert: true } ) if (res.upsertedCount == 0) { return Promise.reject('conflict stock') } } async loadMove(moveId: MoveId): Promise<RestoredStockMove | undefined> { const events: StoredEvent[] = await this.eventsCol .find({ move_id: moveId }) .sort({ revision: 1 }) .toArray() const state = StockMoveAction.initialState() const revision = events.reduce((acc, e) => Math.max(acc, e.revision), 0) const res = StockMoveRestore.restore(state, events.map(e => e.event)) return (res == state) ? undefined : { state: res, revision } } async saveEvent(event: StoredEvent): Promise<void> { const res = await this.eventsCol.updateOne( { move_id: event.move_id, revision: event.revision }, { '$setOnInsert': event }, { upsert: true } ) if (res.upsertedCount == 0) { return Promise.reject(`conflict event revision=${event.revision}`) } } private stockId(item: ItemCode, location: LocationCode): string { return `${item}/${location}` } } // GraphQL スキーマ定義 const typeDefs = gql(` type StockMoveInfo { item: ID! qty: Int! from: ID! to: ID! } interface StockMove { id: ID! info: StockMoveInfo! } type DraftStockMove implements StockMove { id: ID! info: StockMoveInfo! } type CompletedStockMove implements StockMove { id: ID! info: StockMoveInfo! outgoing: Int! incoming: Int! } ・・・ interface Stock { item: ID! location: ID! } type UnmanagedStock implements Stock { item: ID! location: ID! } type ManagedStock implements Stock { item: ID! location: ID! qty: Int! assigned: Int! } input CreateStockInput { item: ID! location: ID! } input StartMoveInput { item: ID! qty: Int! from: ID! to: ID! } type Query { findStock(item: ID!, location: ID!): Stock findMove(id: ID!): StockMove } type Mutation { createManaged(input: CreateStockInput!): ManagedStock createUnmanaged(input: CreateStockInput!): UnmanagedStock start(input: StartMoveInput!): StockMove assign(id: ID!): StockMove ship(id: ID!, outgoing: Int!): StockMove arrive(id: ID!, incoming: Int!): StockMove complete(id: ID!): StockMove cancel(id: ID!): StockMove } `) const toStockMoveForGql = (id: MoveId, state: StockMove | undefined) => { if (state) { return { id, ...state } } return undefined } type MoveAction = (state: StockMove) => StockMoveResult const doMoveAction = async (store: Store, rs: RestoredStockMove | undefined, id: MoveId, action: MoveAction) => { if (rs) { const res = action(rs.state) if (res) { const [mv, ev] = res const info = StockMoveAction.info(mv) if (info) { const event = { move_id: id, revision: rs.revision + 1, item: info.item, from: info.from, to: info.to, event: ev } await store.saveEvent(event) return toStockMoveForGql(id, mv) } } } return undefined } // GraphQL 処理の実装 const resolvers = { Stock: { __resolveType: (obj, ctx, info) => { if (obj.tag == 'stock.managed') { return 'ManagedStock' } return 'UnmanagedStock' } }, StockMove: { __resolveType: (obj: StockMove, ctx, info) => { switch (obj.tag) { case 'stock-move.draft': return 'DraftStockMove' case 'stock-move.completed': return 'CompletedStockMove' ・・・ case 'stock-move.shipment-failed': return 'ShipmentFailedStockMove' } return undefined } }, Query: { findStock: async (parent, { item, location }, { store }, info) => { return store.loadStock(item, location) }, findMove: async (parent, { id }, { store }, info) => { const res = await store.loadMove(id) return toStockMoveForGql(id, res?.state) } }, Mutation: { createManaged: async (parent, { input: { item, location } }, { store }, info) => { const s = StockAction.newManaged(item, location) await store.saveStock(s) return s }, ・・・ start: async (parent, { input: { item, qty, from, to } }, { store }, info) => { const rs = { state: StockMoveAction.initialState(), revision: 0 } const id = `move-${uuidv4()}` return doMoveAction( store, rs, id, s => StockMoveAction.start(s, item, qty, from, to) ) }, assign: async(parent, { id }, { store }, info) => { const rs = await store.loadMove(id) if (rs) { const info = StockMoveAction.info(rs.state) if (info) { const stock = await store.loadStock(info.item, info.from) return doMoveAction( store, rs, id, s => StockMoveAction.assign(s, stock) ) } } return undefined }, ship: async(parent, { id, outgoing }, { store }, info) => { const rs = await store.loadMove(id) return doMoveAction( store, rs, id, s => StockMoveAction.ship(s, outgoing) ) }, ・・・ } } const run = async () => { const mongo = await MongoClient.connect(mongoUrl, { useUnifiedTopology: true }) const eventsCol = mongo.db(dbName).collection(colName) const stocksCol = mongo.db(dbName).collection(stocksColName) const store = new Store(eventsCol, stocksCol) const server = new ApolloServer({ typeDefs, resolvers, context: { store } }) const res = await server.listen() console.log(res.url) } run().catch(err => console.error(err))
クライアント実装例
以下のように GraphQL クエリを送信する事で操作できます。
client/create_stock.ts (在庫の作成)
import { request, gql } from 'graphql-request' const endpoint = 'http://localhost:4000' const item = process.argv[2] const location = process.argv[3] const q1 = gql` mutation CreateUnmanaged($item: ID!, $location: ID!) { createUnmanaged(input: { item: $item, location: $location }) { __typename item location } } ` const q2 = gql` mutation CreateManaged($item: ID!, $location: ID!) { createManaged(input: { item: $item, location: $location }) { __typename item location } } ` const query = process.argv.length > 4 ? q1 : q2 request(endpoint, query, { item, location }) .then(r => console.log(r)) .catch(err => console.error(err))
create_stock.ts 実行例
> ts-node create_stock.ts item-1 store-A { createManaged: { __typename: 'ManagedStock', item: 'item-1', location: 'store-A' } }
client/start_move.ts (在庫移動の開始)
・・・ const item = process.argv[2] const qty = parseInt(process.argv[3]) const from = process.argv[4] const to = process.argv[5] const query = gql` mutation { start(input: { item: "${item}", qty: ${qty}, from: "${from}", to: "${to}" }) { __typename id info { item qty from to } } } ` request(endpoint, query) .then(r => console.log(r)) .catch(err => console.error(err))
start_move.ts 実行例
> ts-node start_move.ts item-1 5 store-A store-B { start: { __typename: 'DraftStockMove', id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3', info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' } } }
client/find_move.ts (在庫移動の取得)
・・・ const id = process.argv[2] const query = gql` { findMove(id: "${id}") { __typename id info { item qty from to } ... on AssignedStockMove { assigned } ... on ShippedStockMove { outgoing } ... on ArrivedStockMove { outgoing incoming } ... on CompletedStockMove { outgoing incoming } } } ` request(endpoint, query) .then(r => console.log(r)) .catch(err => console.error(err))
find_move.ts 実行例
> ts-node find_move.ts move-cfa1fc9c-b599-4854-8385-207cbb77e8a3 { findMove: { __typename: 'CompletedStockMove', id: 'move-cfa1fc9c-b599-4854-8385-207cbb77e8a3', info: { item: 'item-1', qty: 5, from: 'store-A', to: 'store-B' }, outgoing: 5, incoming: 5 } }
Docker の containerd ランタイムからイベントを受信
Docker の containerd ランタイムからイベントを受信する処理を Go 言語で実装してみました。
今回のコードは http://github.com/fits/try_samples/tree/master/blog/20201206/
はじめに
ここでは、snap でインストールした Docker の containerd へ接続します。
バージョンは以下の通りです。
$ docker version ・・・ Server: Engine: Version: 19.03.11 API version: 1.40 (minimum version 1.12) Go version: go1.13.12 Git commit: 77e06fd Built: Mon Jun 8 20:24:59 2020 OS/Arch: linux/amd64 Experimental: false containerd: Version: v1.2.13 GitCommit: 7ad184331fa3e55e52b890ea95e65ba581ae3429 ・・・
containerd へ接続するために containerd.sock
を使いますが、この環境では以下のパスで参照できるようになっていました。
/var/snap/docker/current/run/docker/containerd/containerd.sock
更に、ネームスペースは docker ではなく moby
となっていました。
コンテナのリストアップ
まずは、ネームスペースの確認も兼ねて containerd で実行中のコンテナをリストアップしてみます。
listup.go
package main import ( "context" "log" "github.com/containerd/containerd" "github.com/containerd/containerd/namespaces" ) const ( address = "/var/snap/docker/current/run/docker/containerd/containerd.sock" ) func main() { client, err := containerd.New(address) if err != nil { log.Fatal(err) } defer client.Close() ctx := context.Background() ns, err := client.NamespaceService().List(ctx) if err != nil { log.Printf("ERROR %v", err) return } log.Printf("namespaces: %#v", ns) // ネームスペースの設定 ctx = namespaces.WithNamespace(ctx, ns[0]) cs, err := client.Containers(ctx) if err != nil { log.Printf("ERROR %v", err) return } log.Printf("containers: %#v", cs) }
実行
ビルド
$ go build listup.go
Docker で何も実行していない状態で実行した結果です。
実行結果1
$ sudo ./listup 2020/12/06 04:16:00 namespaces: []string{"moby"} 2020/12/06 04:16:00 containers: []containerd.Container(nil)
docker run した後の実行結果は次のようになりました。
docker run 実行
$ docker run --rm -it alpine /bin/sh / #
実行結果2
$ sudo ./listup 2020/12/06 04:20:34 namespaces: []string{"moby"} 2020/12/06 04:20:34 containers: []containerd.Container{(*containerd.container)(0xc00017e180)}
Subscribe 処理1
Subscribe
メソッドを呼び出して containerd からのイベントを受信してみます。
Subscribe は以下のようなシグネチャとなっています。
https://github.com/containerd/containerd/blob/master/client.go
func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) { ・・・ }
events.Envelope
はこのようになっています。
https://github.com/containerd/containerd/blob/master/events/events.go
type Envelope struct { Timestamp time.Time Namespace string Topic string Event *types.Any }
イベントの内容は Event
フィールドに格納されており、このようになっています。
https://github.com/gogo/protobuf/blob/master/types/any.pb.go
type Any struct { TypeUrl string `protobuf:"bytes,1,opt,name=type_url,json=typeUrl,proto3" json:"type_url,omitempty"` Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
TypeUrl
に型名、Value
に Protocol Buffers でシリアライズした結果(バイナリデータ)が設定されており、イベントの内容を参照するには typeurl.UnmarshalAny
を使ってデシリアライズする必要がありました。
なお、UnmarshalAny でイベントの内容を復元するには github.com/containerd/containerd/api/events
を import する必要がありました。※
※ import しておかないと type with url containerd.TaskExit: not found のようなエラーが返される事になります
また、listup.go では Context へネームスペースを設定しましたが、ここでは containerd.WithDefaultNamespace
を使ってデフォルトのネームスペースを containerd.New
時に指定するようにしています。
subscribe1.go
package main import ( "context" "log" "github.com/containerd/containerd" "github.com/containerd/containerd/events" "github.com/containerd/typeurl" // UnmarshalAny でイベントの内容を復元するために以下の import が必要 _ "github.com/containerd/containerd/api/events" ) const ( address = "/var/snap/docker/current/run/docker/containerd/containerd.sock" namespace = "moby" ) func printEnvelope(env *events.Envelope) { // イベント内容のデシリアライズ event, err := typeurl.UnmarshalAny(env.Event) if err != nil { log.Printf("ERROR unmarshal %v", err) } log.Printf( "topic = %s, namespace = %s, event.typeurl = %s, event = %v", env.Topic, env.Namespace, env.Event.TypeUrl, event, ) } func main() { client, err := containerd.New( address, containerd.WithDefaultNamespace(namespace), // デフォルトのネームスペースを指定 ) if err != nil { log.Fatal(err) } defer client.Close() ctx := context.Background() // Subscribe の実施 ch, errs := client.Subscribe(ctx) for { select { case env := <-ch: printEnvelope(env) case e := <-errs: // log.Fatal は defer を実行せずに終了してしまう log.Fatal(e) } } }
実行
ビルドと実行
$ go build subscribe1.go $ sudo ./subscribe1
docker run を実行した後、すぐに終了してみます。
docker run 実行と終了
$ docker run --rm -it alpine /bin/sh / # exit
出力結果は以下のようになりました。
subscribe1 出力結果
$ sudo ./subscribe1 2020/12/06 04:33:18 topic = /containers/create, namespace = moby, event.typeurl = containerd.events.ContainerCreate, event = &ContainerCreate{ID:c7bed・・・,Image:,Runtime:&ContainerCreate_Runtime{Name:io.containerd.runtime.v1.linux,Options:&types.Any{TypeUrl:containerd.linux.runc.RuncOptions,Value:[10 4 114 ・・・ 99],XXX_unrecognized:[],},XXX_unrecognized:[],},XXX_unrecognized:[],} 2020/12/06 04:33:18 topic = /tasks/create, namespace = moby, event.typeurl = containerd.events.TaskCreate, event = &TaskCreate{ContainerID:c7bed・・・,Bundle:/var/snap/docker/471/run/docker/containerd/daemon/io.containerd.runtime.v1.linux/moby/c7bed・・・,Rootfs:[]*Mount{},IO:&TaskIO{・・・,},Checkpoint:,Pid:5926,XXX_unrecognized:[],} 2020/12/06 04:33:18 topic = /tasks/start, namespace = moby, event.typeurl = containerd.events.TaskStart, event = &TaskStart{ContainerID:c7bed・・・,Pid:5926,XXX_unrecognized:[],} 2020/12/06 04:33:24 topic = /tasks/exit, namespace = moby, event.typeurl = containerd.events.TaskExit, event = &TaskExit{ContainerID:c7bed・・・,Pid:5926,ExitStatus:0,ExitedAt:2020-12-05 19:33:24.325472986 +0000 UTC,XXX_unrecognized:[],} 2020/12/06 04:33:24 topic = /tasks/delete, namespace = moby, event.typeurl = containerd.events.TaskDelete, event = &TaskDelete{ContainerID:c7bed・・・,Pid:5926,ExitStatus:0,ExitedAt:2020-12-05 19:33:24.325472986 +0000 UTC,ID:,XXX_unrecognized:[],} 2020/12/06 04:33:25 topic = /containers/delete, namespace = moby, event.typeurl = containerd.events.ContainerDelete, event = &ContainerDelete{ID:c7bed・・・,XXX_unrecognized:[],}
Subscribe 処理2
switch case を使ってイベントの型毎に出力内容を変更するようにしてみます。
subscribe1.go は kill したり Subscribe でエラーが発生した場合に defer で指定した client.Close()
を実行しないので、ここでは Context のキャンセル機能を使って client.Close()
を呼び出して終了するようにしてみました。
subscribe2.go
package main import ( "context" "fmt" "log" "os" "os/signal" "syscall" "github.com/containerd/containerd" "github.com/containerd/containerd/events" "github.com/containerd/typeurl" apievents "github.com/containerd/containerd/api/events" ) const ( address = "/var/snap/docker/current/run/docker/containerd/containerd.sock" namespace = "moby" ) func printEnvelope(env *events.Envelope) { event, err := typeurl.UnmarshalAny(env.Event) if err != nil { log.Printf("ERROR unmarshal %v", err) } var s string // 型毎の処理 switch ev := event.(type) { case *apievents.ContainerCreate: s = fmt.Sprintf("{ id = %s, image = %s }", ev.ID, ev.Image) case *apievents.ContainerDelete: s = fmt.Sprintf("{ id = %s }", ev.ID) case *apievents.TaskCreate: s = fmt.Sprintf( "{ container_id = %s, pid = %d, bundle = %s }", ev.ContainerID, ev.Pid, ev.Bundle, ) case *apievents.TaskStart: s = fmt.Sprintf( "{ container_id = %s, pid = %d }", ev.ContainerID, ev.Pid, ) case *apievents.TaskExit: s = fmt.Sprintf( "{ container_id = %s, pid = %d, exit_status = %d }", ev.ContainerID, ev.Pid, ev.ExitStatus, ) case *apievents.TaskDelete: s = fmt.Sprintf( "{ container_id = %s, pid = %d, exit_status = %d }", ev.ContainerID, ev.Pid, ev.ExitStatus, ) } log.Printf( "topic = %s, namespace = %s, event.typeurl = %s, event = %v", env.Topic, env.Namespace, env.Event.TypeUrl, s, ) } func main() { client, err := containerd.New( address, containerd.WithDefaultNamespace(namespace), ) if err != nil { log.Fatal(err) } //defer client.Close() defer func() { log.Print("close") // defer の実施を確認するためのログ出力 client.Close() }() // 終了シグナルなどの受信設定 sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) ctx, cancel := context.WithCancel(context.Background()) go func() { s := <-sig log.Printf("syscall: %v", s) // Context のキャンセルを実施 cancel() }() ch, errs := client.Subscribe(ctx) for { select { case env := <-ch: printEnvelope(env) case e := <-errs: log.Printf("ERROR %v", e) // defer を呼び出して終了するための措置 return } } }
実行
ビルドと実行
$ go build subscribe2.go $ sudo ./subscribe2
docker run 実行と終了
$ docker run --rm -it alpine /bin/sh / # exit
subscribe2 のプロセスを kill します。
subscribe2 プロセスの終了(kill)
$ sudo kill `pgrep subscribe2`
出力結果は以下のようになり、defer の実行を確認できました。
subscribe2 出力結果
$ sudo ./subscribe2 2020/12/06 04:47:13 topic = /containers/create, namespace = moby, event.typeurl = containerd.events.ContainerCreate, event = { id = 988b・・・, image = } 2020/12/06 04:47:14 topic = /tasks/create, namespace = moby, event.typeurl = containerd.events.TaskCreate, event = { container_id = 988b・・・, pid = 6136, bundle = /var/snap/docker/471/run/docker/containerd/daemon/io.containerd.runtime.v1.linux/moby/988b・・・ } 2020/12/06 04:47:14 topic = /tasks/start, namespace = moby, event.typeurl = containerd.events.TaskStart, event = { container_id = 988b・・・, pid = 6136 } 2020/12/06 04:47:19 topic = /tasks/exit, namespace = moby, event.typeurl = containerd.events.TaskExit, event = { container_id = 988b・・・, pid = 6136, exit_status = 0 } 2020/12/06 04:47:19 topic = /tasks/delete, namespace = moby, event.typeurl = containerd.events.TaskDelete, event = { container_id = 988b・・・, pid = 6136, exit_status = 0 } 2020/12/06 04:47:20 topic = /containers/delete, namespace = moby, event.typeurl = containerd.events.ContainerDelete, event = { id = 988b・・・ } 2020/12/06 04:48:18 syscall: terminated 2020/12/06 04:48:18 ERROR rpc error: code = Canceled desc = context canceled 2020/12/06 04:48:18 close
Node.js で GraphQL over gRPC 的な事をやってみる
gRPC 上で GraphQL を扱う GraphQL over gRPC 的な処理を Node.js で試しに実装してみました。
今回のコードは http://github.com/fits/try_samples/tree/master/blog/20201124/
はじめに
GraphQL はクエリ言語なので基本的に通信プロトコルには依存していません。
Web フロントエンドの用途では Apollo GraphQL が公開している GraphQL over WebSocket Protocol が有力そうですが、マイクロサービス等の用途で GraphQL を利用する事を考えると WebSocket よりも gRPC の方が適しているように思います。
- GraphQL の Query や Mutation は gRPC の Unary RPC で実現可能
- GraphQL の Subscription は gRPC の Server streaming RPC で実現可能 ※
そこで、とりあえず実装し確認してみたというのが本件の趣旨となっています。
※ GraphQL の Subscription を使わずに gRPC の streaming RPC で代用する事も考えられる
なお、GraphQL の処理に関しては「Deno で GraphQL」の内容をベースに、gRPC は「Node.js で gRPC を試す」の静的コード生成を用いて実装しています。
Query と Mutation - sample1
まずは Subscription を除いた Query と Mutation について実装してみます。
gRPC サービス定義
gRPC のサービス定義を下記のように定義してみました。
GraphQL のクエリは文字列で扱うので型は string
、結果は実質的に JSON となるので型は google.protobuf.Struct
としました。
ついでに、クエリの変数も渡せるようにして型は google.protobuf.Value
としています。
ここで、google.protobuf.Struct
は JSON Object を Protocol Buffers で表現するための型として定義されたもので、google.protobuf.Value
は JSON Value(null、文字列、数値、配列、JSON Object 等)を表現するための型です。(参照 google/protobuf/struct.proto)
proto/graphql.proto
syntax = "proto3"; import "google/protobuf/struct.proto"; package gql; message QueryRequest { string query = 1; google.protobuf.Value variables = 2; } service GraphQL { rpc Query(QueryRequest) returns (google.protobuf.Struct); }
この proto/graphql.proto から gRPC のコードを生成しておきます。
静的コード生成
> mkdir generated > npm run gen-grpc ・・・ grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto
package.json の内容は以下の通りです。
package.json
{ "name": "sample1", "version": "1.0.0", "description": "", "scripts": { "gen-grpc": "grpc_tools_node_protoc --grpc_out=grpc_js:generated --js_out=import_style=commonjs:generated proto/*.proto" }, "dependencies": { "@grpc/grpc-js": "^1.2.1", "google-protobuf": "^3.14.0", "graphql": "^15.4.0", "uuid": "^8.3.1" }, "devDependencies": { "grpc-tools": "^1.10.0" } }
サーバー実装
GraphQL を扱う gRPC サーバーを実装します。
gRPC のリクエストから GraphQL のクエリやその変数の内容を取得し、graphql
関数で処理した結果をレスポンスとして返すような処理となります。
google.protobuf.Struct や Value に該当する型は google-protobuf/google/protobuf/struct_pb.js
にて Struct
や Value
として定義されており、プレーンな JavaScript オブジェクトと相互変換するための fromJavaScript
や toJavaScript
メソッドが用意されています。
下記コードでは QueryRequest の variables の内容を JavaScript オブジェクトとして取得するために toJavaScript
を、graphql の処理結果を Struct で返すために fromJavaScript
をそれぞれ使用しています。
server.js
const grpc = require('@grpc/grpc-js') const { GraphQLService } = require('./generated/proto/graphql_grpc_pb') const { Struct } = require('google-protobuf/google/protobuf/struct_pb') const { graphql, buildSchema } = require('graphql') const { v4: uuidv4 } = require('uuid') // GraphQL スキーマ定義 const schema = buildSchema(` enum Category { Standard Extra } input CreateItem { category: Category! value: Int! } type Item { id: ID! category: Category! value: Int! } type Mutation { create(input: CreateItem!): Item } type Query { find(id: ID!): Item } `) const store = {} // スキーマ定義に応じた GraphQL 処理の実装 const root = { create: ({ input: { category, value } }) => { console.log(`*** call create: category = ${category}, value = ${value}`) const id = `item-${uuidv4()}` const item = { id, category, value } store[id] = item return item }, find: ({ id }) => { console.log(`*** call find: ${id}`) return store[id] } } const server = new grpc.Server() server.addService(GraphQLService, { async query(call, callback) { try { const query = call.request.getQuery() const variables = call.request.getVariables().toJavaScript() // GraphQL の処理 const r = await graphql(schema, query, root, {}, variables) callback(null, Struct.fromJavaScript(r)) } catch(e) { console.error(e) callback(e) } } }) server.bindAsync( '127.0.0.1:50051', grpc.ServerCredentials.createInsecure(), (err, port) => { if (err) { console.error(err) return } console.log(`start server: ${port}`) server.start() } )
クライアント実装
クライアント側は以下のようになります。
client.js
const grpc = require('@grpc/grpc-js') const { QueryRequest } = require('./generated/proto/graphql_pb') const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb') const { Value } = require('google-protobuf/google/protobuf/struct_pb') const client = new GraphQLClient( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const promisify = (obj, methodName) => args => new Promise((resolve, reject) => { obj[methodName](args, (err, res) => { if (err) { reject(err) } else { resolve(res) } }) }) const query = promisify(client, 'query') const createRequest = (q, v = null) => { const req = new QueryRequest() req.setQuery(q) req.setVariables(Value.fromJavaScript(v)) return req } const run = async () => { // Item の作成 const r1 = await query(createRequest(` mutation { create(input: { category: Extra, value: 123 }) { id } } `)) console.log(r1.toJavaScript()) // 存在しない Item の find const r2 = await query(createRequest(` { find(id: "a1") { id value } } `)) console.log(r2.toJavaScript()) const id = r1.toJavaScript().data.create.id // 作成した Item の find (クエリ変数の使用) const r3 = await query(createRequest( ` query findItem($id: ID!) { find(id: $id) { id category value } } `, { id } )) console.log(r3.toJavaScript()) } run().catch(err => console.error(err))
動作確認
server.js を実行しておきます。
server.js 実行
> node server.js start server: 50051
client.js を実行した結果は以下の通りで、特に問題無く動作しているようです。
client.js 実行
> node client.js { data: { create: { id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d' } } } { data: { find: null } } { data: { find: { category: 'Extra', id: 'item-63bb7704-27b6-44ae-b955-61cbad83248d', value: 123 } } }
Subscription - sample2
次は Subscription の機能を追加します。
gRPC サービス定義
gRPC のサービス定義に Subscription 用のメソッドを追加し、sample1 と同様にコードを生成しておきます。
proto/graphql.proto
syntax = "proto3"; import "google/protobuf/struct.proto"; package gql; message QueryRequest { string query = 1; google.protobuf.Value variables = 2; } service GraphQL { rpc Query(QueryRequest) returns (google.protobuf.Struct); rpc Subscription(QueryRequest) returns (stream google.protobuf.Struct); }
サーバー実装
「Deno で GraphQL」では単一の Subscription を処理するだけの実装だったので、複数クライアントからの Subscription を処理するために PubSub
というクラスを追加し、subscription の呼び出し毎に MessageBox を作成、(クライアントが接続中の)有効な全ての MessageBox へメッセージを配信するようにしています。
server.js
const grpc = require('@grpc/grpc-js') const { GraphQLService } = require('./generated/proto/graphql_grpc_pb') const { Struct } = require('google-protobuf/google/protobuf/struct_pb') const { graphql, buildSchema, subscribe, parse } = require('graphql') const { v4: uuidv4 } = require('uuid') // GraphQL スキーマ定義 const schema = buildSchema(` enum Category { Standard Extra } input CreateItem { category: Category! value: Int! } type Item { id: ID! category: Category! value: Int! } type Mutation { create(input: CreateItem!): Item } type Query { find(id: ID!): Item } type Subscription { created: Item } `) class MessageBox { #promises = [] #resolves = [] #appendPromise = () => this.#promises.push( new Promise(res => this.#resolves.push(res)) ) publish(msg) { if (this.#resolves.length == 0) { this.#appendPromise() } this.#resolves.shift()(msg) } [Symbol.asyncIterator]() { return { next: async () => { console.log('*** asyncIterator next') if (this.#promises.length == 0) { this.#appendPromise() } const value = await this.#promises.shift() return { value, done: false } } } } } // クライアント毎の MessageBox を管理 class PubSub { #subscribes = [] publish(msg) { this.#subscribes.forEach(s => s.publish(msg)) } subscribe() { const sub = new MessageBox() this.#subscribes.push(sub) return sub } unsubscribe(sub) { this.#subscribes = this.#subscribes.filter(s => s != sub) } } const store = {} const pubsub = new PubSub() const root = { create: ({ input: { category, value } }) => { ・・・ }, find: ({ id }) => { ・・・ } } const server = new grpc.Server() server.addService(GraphQLService, { async query(call, callback) { ・・・ }, async subscription(call) { console.log('*** subscribed') try { const query = call.request.getQuery() const variables = call.request.getVariables().toJavaScript() const sub = pubsub.subscribe() call.on('cancelled', () => { console.log('*** unsubscribed') pubsub.unsubscribe(sub) }) const subRoot = { created: () => sub } // GraphQL の Subscription 処理 const aiter = await subscribe(schema, parse(query), subRoot, {}, variables) for await (const r of aiter) { // メッセージの配信 call.write(Struct.fromJavaScript(r)) } } catch(e) { console.error(e) call.destroy(e) } } }) server.bindAsync( ・・・ )
Subscription 用クライアント実装
Subscription を呼び出すクライアントは以下のようになります。
client_subscribe.js
const grpc = require('@grpc/grpc-js') const { QueryRequest } = require('./generated/proto/graphql_pb') const { GraphQLClient } = require('./generated/proto/graphql_grpc_pb') const { Value } = require('google-protobuf/google/protobuf/struct_pb') const client = new GraphQLClient( '127.0.0.1:50051', grpc.credentials.createInsecure() ) const req = new QueryRequest() req.setQuery(` subscription { created { id category } } `) req.setVariables(Value.fromJavaScript(null)) const stream = client.subscription(req) stream.on('data', msg => { const event = msg.toJavaScript() console.log(`*** received event = ${JSON.stringify(event)}`) }) stream.on('end', () => console.log('*** stream end')) stream.on('error', err => console.log(`*** stream error: ${err}`))
動作確認
server.js を実行した後、client_subscribe.js を 2つ実行して sample1 で作成した client.js を実行すると以下のようになりました。
server.js の出力結果
> node server.js start server: 50051 *** subscribed *** asyncIterator next *** subscribed *** asyncIterator next *** call create: category = Extra, value = 123 *** asyncIterator next *** asyncIterator next *** call find: a1 *** call find: item-5e3f81ed-774a-4f7f-afc5-000a2db34859
client_subscribe.js の出力結果
> node client_subscribe.js *** received event = {"data":{"created":{"category":"Extra","id":"item-5e3f81ed-774a-4f7f-afc5-000a2db34859"}}}
Go言語と Rust で Mutex による排他制御
以下の 3通りを Go 言語と Rust でそれぞれ実装してみました。
サンプルコードは http://github.com/fits/try_samples/tree/master/blog/20201122/
Go 言語の場合
まずは Go 言語による実装です。 Go 言語では goroutine で軽量スレッドによる並行処理を実施できます。
ここでは、goroutine の終了を待機するために sync.WaitGroup
を使いました。
WaitGroup では Add
した数に相当する Done
が呼び出されるまで Wait
でブロックして待機する事が可能です。
go_mutex_sample.go
package main import ( "fmt" "sync" ) type Data struct { value int } // (a) func noLock() { var wg sync.WaitGroup var ds []Data for i := 0; i < 100; i++ { wg.Add(1) go func() { ds = append(ds, Data{i}) wg.Done() }() } // goroutine の終了を待機 wg.Wait() fmt.Println("(a) noLock length =", len(ds)) } // (b) func useMutex() { var wg sync.WaitGroup var mu sync.Mutex var ds []Data for i := 0; i < 100; i++ { wg.Add(1) go func() { mu.Lock() ds = append(ds, Data{i}) mu.Unlock() wg.Done() }() } wg.Wait() fmt.Println("(b) useMutex length =", len(ds)) } // (c) func useRWMutex() { var wg sync.WaitGroup var mu sync.RWMutex var ds []Data for i := 0; i < 100; i++ { wg.Add(1) go func() { mu.Lock() ds = append(ds, Data{i}) mu.Unlock() wg.Done() }() } for i := 0; i < 5; i++ { wg.Add(1) go func() { mu.RLock() fmt.Println("(c) progress length =", len(ds)) mu.RUnlock() wg.Done() }() } wg.Wait() fmt.Println("(c) useRWMutex length =", len(ds)) } func main() { noLock() println("-----") useMutex() println("-----") useRWMutex() }
実行結果は以下の通りです。
排他制御を行っていない (a) では、同じ状態の ds
に対して複数の goroutine が ds = append(ds, Data{i})
を実行してしまうケースが発生するため、基本的に結果が 100 にはなりません。
実行結果
> go build go_mutex_sample.go > go_mutex_sample (a) noLock length = 95 ----- (b) useMutex length = 100 ----- (c) progress length = 71 (c) progress length = 72 (c) progress length = 72 (c) progress length = 72 (c) progress length = 72 (c) useRWMutex length = 100
Rust の場合
次は Rust による実装です。
ここでは thread::spawn
で並行処理を実施し join
で待機しています。
基本的に Rust では、Go 言語で実装した noLock
のようなスレッドセーフでは無い処理はコンパイルエラーとなって実装できないように工夫されています。
スレッド間で安全に所有権を共有するには、スレッドセーフな参照カウントのポインタである Arc
を使用する事になります。
排他制御なしの (a) の処理(下記コードの no_lock
)をコンパイルが通るように一応は実装してみましたが、この Arc::get_mut(&mut ds)
は常に None
を返すので ds.push(Data(i))
の処理は実行されません。
rust_mutex_sample.rs
use std::thread; use std::sync::{Arc, Mutex, RwLock}; struct Data(i32); // (a) fn no_lock() { let mut hs = Vec::new(); let ds = Arc::new(Vec::new()); for i in 0..100 { let mut ds = ds.clone(); hs.push( thread::spawn(move || { if let Some(ds) = Arc::get_mut(&mut ds) { // 上記は常に None となるので下記処理は実行されない ds.push(Data(i)) } }) ); } for h in hs { let _ = h.join(); } println!("(a) no_lock length = {}", ds.len()); } // (b) fn use_mutex() { let mut hs = Vec::new(); let ds = Arc::new(Mutex::new(Vec::new())); for i in 0..100 { let ds = ds.clone(); hs.push( thread::spawn(move || { if let Ok(mut ds) = ds.lock() { ds.push(Data(i)); } }) ); } for h in hs { let _ = h.join(); } println!("(b) use_mutex length = {}", ds.lock().unwrap().len()); } // (c) fn use_rwlock() { let mut hs = Vec::new(); let ds = Arc::new(RwLock::new(Vec::new())); for i in 0..100 { let ds = ds.clone(); hs.push( thread::spawn(move || { if let Ok(mut ds) = ds.write() { ds.push(Data(i)); } }) ); } for _ in 0..5 { let ds = ds.clone(); hs.push( thread::spawn(move || { if let Ok(ds) = ds.read() { println!("(c) progress length = {}", ds.len()); } }) ); } for h in hs { let _ = h.join(); } println!("(c) use_rwlock length = {}", ds.read().unwrap().len()); } fn main() { no_lock(); println!("-----"); use_mutex(); println!("-----"); use_rwlock(); }
実行結果は以下の通りです。
no_lock
内の ds.push(Data(i))
は実行されないので結果は 0 となります。
実行結果
> rustc rust_mutex_sample.rs > rust_mutex_sample (a) no_lock length = 0 ----- (b) use_mutex length = 100 ----- (c) progress length = 99 (c) progress length = 99 (c) progress length = 99 (c) progress length = 99 (c) progress length = 99 (c) use_rwlock length = 100