Docker の containerd ランタイムからイベントを受信
Docker の containerd ランタイムからイベントを受信する処理を Go 言語で実装してみました。
今回のコードは http://github.com/fits/try_samples/tree/master/blog/20201206/
はじめに
ここでは、snap でインストールした Docker の containerd へ接続します。
バージョンは以下の通りです。
$ docker version ・・・ Server: Engine: Version: 19.03.11 API version: 1.40 (minimum version 1.12) Go version: go1.13.12 Git commit: 77e06fd Built: Mon Jun 8 20:24:59 2020 OS/Arch: linux/amd64 Experimental: false containerd: Version: v1.2.13 GitCommit: 7ad184331fa3e55e52b890ea95e65ba581ae3429 ・・・
containerd へ接続するために containerd.sock
を使いますが、この環境では以下のパスで参照できるようになっていました。
/var/snap/docker/current/run/docker/containerd/containerd.sock
更に、ネームスペースは docker ではなく moby
となっていました。
コンテナのリストアップ
まずは、ネームスペースの確認も兼ねて containerd で実行中のコンテナをリストアップしてみます。
listup.go
package main import ( "context" "log" "github.com/containerd/containerd" "github.com/containerd/containerd/namespaces" ) const ( address = "/var/snap/docker/current/run/docker/containerd/containerd.sock" ) func main() { client, err := containerd.New(address) if err != nil { log.Fatal(err) } defer client.Close() ctx := context.Background() ns, err := client.NamespaceService().List(ctx) if err != nil { log.Printf("ERROR %v", err) return } log.Printf("namespaces: %#v", ns) // ネームスペースの設定 ctx = namespaces.WithNamespace(ctx, ns[0]) cs, err := client.Containers(ctx) if err != nil { log.Printf("ERROR %v", err) return } log.Printf("containers: %#v", cs) }
実行
ビルド
$ go build listup.go
Docker で何も実行していない状態で実行した結果です。
実行結果1
$ sudo ./listup 2020/12/06 04:16:00 namespaces: []string{"moby"} 2020/12/06 04:16:00 containers: []containerd.Container(nil)
docker run した後の実行結果は次のようになりました。
docker run 実行
$ docker run --rm -it alpine /bin/sh / #
実行結果2
$ sudo ./listup 2020/12/06 04:20:34 namespaces: []string{"moby"} 2020/12/06 04:20:34 containers: []containerd.Container{(*containerd.container)(0xc00017e180)}
Subscribe 処理1
Subscribe
メソッドを呼び出して containerd からのイベントを受信してみます。
Subscribe は以下のようなシグネチャとなっています。
https://github.com/containerd/containerd/blob/master/client.go
func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) { ・・・ }
events.Envelope
はこのようになっています。
https://github.com/containerd/containerd/blob/master/events/events.go
type Envelope struct { Timestamp time.Time Namespace string Topic string Event *types.Any }
イベントの内容は Event
フィールドに格納されており、このようになっています。
https://github.com/gogo/protobuf/blob/master/types/any.pb.go
type Any struct { TypeUrl string `protobuf:"bytes,1,opt,name=type_url,json=typeUrl,proto3" json:"type_url,omitempty"` Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
TypeUrl
に型名、Value
に Protocol Buffers でシリアライズした結果(バイナリデータ)が設定されており、イベントの内容を参照するには typeurl.UnmarshalAny
を使ってデシリアライズする必要がありました。
なお、UnmarshalAny でイベントの内容を復元するには github.com/containerd/containerd/api/events
を import する必要がありました。※
※ import しておかないと type with url containerd.TaskExit: not found のようなエラーが返される事になります
また、listup.go では Context へネームスペースを設定しましたが、ここでは containerd.WithDefaultNamespace
を使ってデフォルトのネームスペースを containerd.New
時に指定するようにしています。
subscribe1.go
package main import ( "context" "log" "github.com/containerd/containerd" "github.com/containerd/containerd/events" "github.com/containerd/typeurl" // UnmarshalAny でイベントの内容を復元するために以下の import が必要 _ "github.com/containerd/containerd/api/events" ) const ( address = "/var/snap/docker/current/run/docker/containerd/containerd.sock" namespace = "moby" ) func printEnvelope(env *events.Envelope) { // イベント内容のデシリアライズ event, err := typeurl.UnmarshalAny(env.Event) if err != nil { log.Printf("ERROR unmarshal %v", err) } log.Printf( "topic = %s, namespace = %s, event.typeurl = %s, event = %v", env.Topic, env.Namespace, env.Event.TypeUrl, event, ) } func main() { client, err := containerd.New( address, containerd.WithDefaultNamespace(namespace), // デフォルトのネームスペースを指定 ) if err != nil { log.Fatal(err) } defer client.Close() ctx := context.Background() // Subscribe の実施 ch, errs := client.Subscribe(ctx) for { select { case env := <-ch: printEnvelope(env) case e := <-errs: // log.Fatal は defer を実行せずに終了してしまう log.Fatal(e) } } }
実行
ビルドと実行
$ go build subscribe1.go $ sudo ./subscribe1
docker run を実行した後、すぐに終了してみます。
docker run 実行と終了
$ docker run --rm -it alpine /bin/sh / # exit
出力結果は以下のようになりました。
subscribe1 出力結果
$ sudo ./subscribe1 2020/12/06 04:33:18 topic = /containers/create, namespace = moby, event.typeurl = containerd.events.ContainerCreate, event = &ContainerCreate{ID:c7bed・・・,Image:,Runtime:&ContainerCreate_Runtime{Name:io.containerd.runtime.v1.linux,Options:&types.Any{TypeUrl:containerd.linux.runc.RuncOptions,Value:[10 4 114 ・・・ 99],XXX_unrecognized:[],},XXX_unrecognized:[],},XXX_unrecognized:[],} 2020/12/06 04:33:18 topic = /tasks/create, namespace = moby, event.typeurl = containerd.events.TaskCreate, event = &TaskCreate{ContainerID:c7bed・・・,Bundle:/var/snap/docker/471/run/docker/containerd/daemon/io.containerd.runtime.v1.linux/moby/c7bed・・・,Rootfs:[]*Mount{},IO:&TaskIO{・・・,},Checkpoint:,Pid:5926,XXX_unrecognized:[],} 2020/12/06 04:33:18 topic = /tasks/start, namespace = moby, event.typeurl = containerd.events.TaskStart, event = &TaskStart{ContainerID:c7bed・・・,Pid:5926,XXX_unrecognized:[],} 2020/12/06 04:33:24 topic = /tasks/exit, namespace = moby, event.typeurl = containerd.events.TaskExit, event = &TaskExit{ContainerID:c7bed・・・,Pid:5926,ExitStatus:0,ExitedAt:2020-12-05 19:33:24.325472986 +0000 UTC,XXX_unrecognized:[],} 2020/12/06 04:33:24 topic = /tasks/delete, namespace = moby, event.typeurl = containerd.events.TaskDelete, event = &TaskDelete{ContainerID:c7bed・・・,Pid:5926,ExitStatus:0,ExitedAt:2020-12-05 19:33:24.325472986 +0000 UTC,ID:,XXX_unrecognized:[],} 2020/12/06 04:33:25 topic = /containers/delete, namespace = moby, event.typeurl = containerd.events.ContainerDelete, event = &ContainerDelete{ID:c7bed・・・,XXX_unrecognized:[],}
Subscribe 処理2
switch case を使ってイベントの型毎に出力内容を変更するようにしてみます。
subscribe1.go は kill したり Subscribe でエラーが発生した場合に defer で指定した client.Close()
を実行しないので、ここでは Context のキャンセル機能を使って client.Close()
を呼び出して終了するようにしてみました。
subscribe2.go
package main import ( "context" "fmt" "log" "os" "os/signal" "syscall" "github.com/containerd/containerd" "github.com/containerd/containerd/events" "github.com/containerd/typeurl" apievents "github.com/containerd/containerd/api/events" ) const ( address = "/var/snap/docker/current/run/docker/containerd/containerd.sock" namespace = "moby" ) func printEnvelope(env *events.Envelope) { event, err := typeurl.UnmarshalAny(env.Event) if err != nil { log.Printf("ERROR unmarshal %v", err) } var s string // 型毎の処理 switch ev := event.(type) { case *apievents.ContainerCreate: s = fmt.Sprintf("{ id = %s, image = %s }", ev.ID, ev.Image) case *apievents.ContainerDelete: s = fmt.Sprintf("{ id = %s }", ev.ID) case *apievents.TaskCreate: s = fmt.Sprintf( "{ container_id = %s, pid = %d, bundle = %s }", ev.ContainerID, ev.Pid, ev.Bundle, ) case *apievents.TaskStart: s = fmt.Sprintf( "{ container_id = %s, pid = %d }", ev.ContainerID, ev.Pid, ) case *apievents.TaskExit: s = fmt.Sprintf( "{ container_id = %s, pid = %d, exit_status = %d }", ev.ContainerID, ev.Pid, ev.ExitStatus, ) case *apievents.TaskDelete: s = fmt.Sprintf( "{ container_id = %s, pid = %d, exit_status = %d }", ev.ContainerID, ev.Pid, ev.ExitStatus, ) } log.Printf( "topic = %s, namespace = %s, event.typeurl = %s, event = %v", env.Topic, env.Namespace, env.Event.TypeUrl, s, ) } func main() { client, err := containerd.New( address, containerd.WithDefaultNamespace(namespace), ) if err != nil { log.Fatal(err) } //defer client.Close() defer func() { log.Print("close") // defer の実施を確認するためのログ出力 client.Close() }() // 終了シグナルなどの受信設定 sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) ctx, cancel := context.WithCancel(context.Background()) go func() { s := <-sig log.Printf("syscall: %v", s) // Context のキャンセルを実施 cancel() }() ch, errs := client.Subscribe(ctx) for { select { case env := <-ch: printEnvelope(env) case e := <-errs: log.Printf("ERROR %v", e) // defer を呼び出して終了するための措置 return } } }
実行
ビルドと実行
$ go build subscribe2.go $ sudo ./subscribe2
docker run 実行と終了
$ docker run --rm -it alpine /bin/sh / # exit
subscribe2 のプロセスを kill します。
subscribe2 プロセスの終了(kill)
$ sudo kill `pgrep subscribe2`
出力結果は以下のようになり、defer の実行を確認できました。
subscribe2 出力結果
$ sudo ./subscribe2 2020/12/06 04:47:13 topic = /containers/create, namespace = moby, event.typeurl = containerd.events.ContainerCreate, event = { id = 988b・・・, image = } 2020/12/06 04:47:14 topic = /tasks/create, namespace = moby, event.typeurl = containerd.events.TaskCreate, event = { container_id = 988b・・・, pid = 6136, bundle = /var/snap/docker/471/run/docker/containerd/daemon/io.containerd.runtime.v1.linux/moby/988b・・・ } 2020/12/06 04:47:14 topic = /tasks/start, namespace = moby, event.typeurl = containerd.events.TaskStart, event = { container_id = 988b・・・, pid = 6136 } 2020/12/06 04:47:19 topic = /tasks/exit, namespace = moby, event.typeurl = containerd.events.TaskExit, event = { container_id = 988b・・・, pid = 6136, exit_status = 0 } 2020/12/06 04:47:19 topic = /tasks/delete, namespace = moby, event.typeurl = containerd.events.TaskDelete, event = { container_id = 988b・・・, pid = 6136, exit_status = 0 } 2020/12/06 04:47:20 topic = /containers/delete, namespace = moby, event.typeurl = containerd.events.ContainerDelete, event = { id = 988b・・・ } 2020/12/06 04:48:18 syscall: terminated 2020/12/06 04:48:18 ERROR rpc error: code = Canceled desc = context canceled 2020/12/06 04:48:18 close