Docker の containerd ランタイムからイベントを受信

Docker の containerd ランタイムからイベントを受信する処理を Go 言語で実装してみました。

今回のコードは http://github.com/fits/try_samples/tree/master/blog/20201206/

はじめに

ここでは、snap でインストールした Docker の containerd へ接続します。

バージョンは以下の通りです。

$ docker version
・・・
Server:
 Engine:
  Version:          19.03.11
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.12
  Git commit:       77e06fd
  Built:            Mon Jun  8 20:24:59 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          v1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
・・・

containerd へ接続するために containerd.sock を使いますが、この環境では以下のパスで参照できるようになっていました。

/var/snap/docker/current/run/docker/containerd/containerd.sock

更に、ネームスペースは docker ではなく moby となっていました。

コンテナのリストアップ

まずは、ネームスペースの確認も兼ねて containerd で実行中のコンテナをリストアップしてみます。

listup.go
package main

import (
    "context"
    "log"

    "github.com/containerd/containerd"
    "github.com/containerd/containerd/namespaces"
)

const (
    address = "/var/snap/docker/current/run/docker/containerd/containerd.sock"
)

func main() {
    client, err := containerd.New(address)

    if err != nil {
        log.Fatal(err)
    }

    defer client.Close()

    ctx := context.Background()

    ns, err := client.NamespaceService().List(ctx)

    if err != nil {
        log.Printf("ERROR %v", err)
        return
    }

    log.Printf("namespaces: %#v", ns)
    // ネームスペースの設定
    ctx = namespaces.WithNamespace(ctx, ns[0])

    cs, err := client.Containers(ctx)

    if err != nil {
        log.Printf("ERROR %v", err)
        return
    }

    log.Printf("containers: %#v", cs)
}

実行

ビルド
$ go build listup.go

Docker で何も実行していない状態で実行した結果です。

実行結果1
$ sudo ./listup
2020/12/06 04:16:00 namespaces: []string{"moby"}
2020/12/06 04:16:00 containers: []containerd.Container(nil)

docker run した後の実行結果は次のようになりました。

docker run 実行
$ docker run --rm -it alpine /bin/sh
/ # 
実行結果2
$ sudo ./listup
2020/12/06 04:20:34 namespaces: []string{"moby"}
2020/12/06 04:20:34 containers: []containerd.Container{(*containerd.container)(0xc00017e180)}

Subscribe 処理1

Subscribe メソッドを呼び出して containerd からのイベントを受信してみます。

Subscribe は以下のようなシグネチャとなっています。

https://github.com/containerd/containerd/blob/master/client.go
func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
    ・・・
}

events.Envelope はこのようになっています。

https://github.com/containerd/containerd/blob/master/events/events.go
type Envelope struct {
    Timestamp time.Time
    Namespace string
    Topic     string
    Event     *types.Any
}

イベントの内容は Event フィールドに格納されており、このようになっています。

https://github.com/gogo/protobuf/blob/master/types/any.pb.go
type Any struct {
    TypeUrl string `protobuf:"bytes,1,opt,name=type_url,json=typeUrl,proto3" json:"type_url,omitempty"`
    Value                []byte   `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

TypeUrl に型名、Value に Protocol Buffers でシリアライズした結果(バイナリデータ)が設定されており、イベントの内容を参照するには typeurl.UnmarshalAny を使ってデシリアライズする必要がありました。

なお、UnmarshalAny でイベントの内容を復元するには github.com/containerd/containerd/api/events を import する必要がありました。※

 ※ import しておかないと
    type with url containerd.TaskExit: not found のようなエラーが返される事になります

また、listup.go では Context へネームスペースを設定しましたが、ここでは containerd.WithDefaultNamespace を使ってデフォルトのネームスペースを containerd.New 時に指定するようにしています。

subscribe1.go
package main

import (
    "context"
    "log"

    "github.com/containerd/containerd"
    "github.com/containerd/containerd/events"
    "github.com/containerd/typeurl"
    // UnmarshalAny でイベントの内容を復元するために以下の import が必要
    _ "github.com/containerd/containerd/api/events" 
)

const (
    address = "/var/snap/docker/current/run/docker/containerd/containerd.sock"
    namespace = "moby"
)

func printEnvelope(env *events.Envelope) {
    // イベント内容のデシリアライズ
    event, err := typeurl.UnmarshalAny(env.Event)

    if err != nil {
        log.Printf("ERROR unmarshal %v", err)
    }

    log.Printf(
        "topic = %s, namespace = %s, event.typeurl = %s, event = %v", 
        env.Topic, env.Namespace, env.Event.TypeUrl, event,
    )
}

func main() {
    client, err := containerd.New(
        address, 
        containerd.WithDefaultNamespace(namespace), // デフォルトのネームスペースを指定
    )

    if err != nil {
        log.Fatal(err)
    }

    defer client.Close()

    ctx := context.Background()
    // Subscribe の実施
    ch, errs := client.Subscribe(ctx)

    for {
        select {
        case env := <-ch:
            printEnvelope(env)
        case e := <-errs:
            // log.Fatal は defer を実行せずに終了してしまう
            log.Fatal(e)
        }
    }
}

実行

ビルドと実行
$ go build subscribe1.go
$ sudo ./subscribe1

docker run を実行した後、すぐに終了してみます。

docker run 実行と終了
$ docker run --rm -it alpine /bin/sh
/ # exit

出力結果は以下のようになりました。

subscribe1 出力結果
$ sudo ./subscribe1
2020/12/06 04:33:18 topic = /containers/create, namespace = moby, event.typeurl = containerd.events.ContainerCreate, event = &ContainerCreate{ID:c7bed・・・,Image:,Runtime:&ContainerCreate_Runtime{Name:io.containerd.runtime.v1.linux,Options:&types.Any{TypeUrl:containerd.linux.runc.RuncOptions,Value:[10 4 114 ・・・ 99],XXX_unrecognized:[],},XXX_unrecognized:[],},XXX_unrecognized:[],}
2020/12/06 04:33:18 topic = /tasks/create, namespace = moby, event.typeurl = containerd.events.TaskCreate, event = &TaskCreate{ContainerID:c7bed・・・,Bundle:/var/snap/docker/471/run/docker/containerd/daemon/io.containerd.runtime.v1.linux/moby/c7bed・・・,Rootfs:[]*Mount{},IO:&TaskIO{・・・,},Checkpoint:,Pid:5926,XXX_unrecognized:[],}
2020/12/06 04:33:18 topic = /tasks/start, namespace = moby, event.typeurl = containerd.events.TaskStart, event = &TaskStart{ContainerID:c7bed・・・,Pid:5926,XXX_unrecognized:[],}
2020/12/06 04:33:24 topic = /tasks/exit, namespace = moby, event.typeurl = containerd.events.TaskExit, event = &TaskExit{ContainerID:c7bed・・・,Pid:5926,ExitStatus:0,ExitedAt:2020-12-05 19:33:24.325472986 +0000 UTC,XXX_unrecognized:[],}
2020/12/06 04:33:24 topic = /tasks/delete, namespace = moby, event.typeurl = containerd.events.TaskDelete, event = &TaskDelete{ContainerID:c7bed・・・,Pid:5926,ExitStatus:0,ExitedAt:2020-12-05 19:33:24.325472986 +0000 UTC,ID:,XXX_unrecognized:[],}
2020/12/06 04:33:25 topic = /containers/delete, namespace = moby, event.typeurl = containerd.events.ContainerDelete, event = &ContainerDelete{ID:c7bed・・・,XXX_unrecognized:[],}

Subscribe 処理2

switch case を使ってイベントの型毎に出力内容を変更するようにしてみます。

subscribe1.go は kill したり Subscribe でエラーが発生した場合に defer で指定した client.Close() を実行しないので、ここでは Context のキャンセル機能を使って client.Close() を呼び出して終了するようにしてみました。

subscribe2.go
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/containerd/containerd"
    "github.com/containerd/containerd/events"
    "github.com/containerd/typeurl"
    apievents "github.com/containerd/containerd/api/events"
)

const (
    address = "/var/snap/docker/current/run/docker/containerd/containerd.sock"
    namespace = "moby"
)

func printEnvelope(env *events.Envelope) {
    event, err := typeurl.UnmarshalAny(env.Event)

    if err != nil {
        log.Printf("ERROR unmarshal %v", err)
    }

    var s string
    // 型毎の処理
    switch ev := event.(type) {
    case *apievents.ContainerCreate:
        s = fmt.Sprintf("{ id = %s, image = %s }", ev.ID, ev.Image)
    case *apievents.ContainerDelete:
        s = fmt.Sprintf("{ id = %s }", ev.ID)
    case *apievents.TaskCreate:
        s = fmt.Sprintf(
            "{ container_id = %s, pid = %d, bundle = %s }", 
            ev.ContainerID, ev.Pid, ev.Bundle,
        )
    case *apievents.TaskStart:
        s = fmt.Sprintf(
            "{ container_id = %s, pid = %d }", 
            ev.ContainerID, ev.Pid,
        )
    case *apievents.TaskExit:
        s = fmt.Sprintf(
            "{ container_id = %s, pid = %d, exit_status = %d }", 
            ev.ContainerID, ev.Pid, ev.ExitStatus,
        )
    case *apievents.TaskDelete:
        s = fmt.Sprintf(
            "{ container_id = %s, pid = %d, exit_status = %d }", 
            ev.ContainerID, ev.Pid, ev.ExitStatus,
        )
    }

    log.Printf(
        "topic = %s, namespace = %s, event.typeurl = %s, event = %v", 
        env.Topic, env.Namespace, env.Event.TypeUrl, s,
    )
}

func main() {
    client, err := containerd.New(
        address, 
        containerd.WithDefaultNamespace(namespace),
    )

    if err != nil {
        log.Fatal(err)
    }

    //defer client.Close()
    defer func() {
        log.Print("close") // defer の実施を確認するためのログ出力
        client.Close()
    }()

    // 終了シグナルなどの受信設定
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        s := <-sig
        log.Printf("syscall: %v", s)
        // Context のキャンセルを実施
        cancel()
    }()

    ch, errs := client.Subscribe(ctx)

    for {
        select {
        case env := <-ch:
            printEnvelope(env)
        case e := <-errs:
            log.Printf("ERROR %v", e)
            // defer を呼び出して終了するための措置
            return
        }
    }
}

実行

ビルドと実行
$ go build subscribe2.go
$ sudo ./subscribe2
docker run 実行と終了
$ docker run --rm -it alpine /bin/sh
/ # exit

subscribe2 のプロセスを kill します。

subscribe2 プロセスの終了(kill)
$ sudo kill `pgrep subscribe2`

出力結果は以下のようになり、defer の実行を確認できました。

subscribe2 出力結果
$ sudo ./subscribe2
2020/12/06 04:47:13 topic = /containers/create, namespace = moby, event.typeurl = containerd.events.ContainerCreate, event = { id = 988b・・・, image =  }
2020/12/06 04:47:14 topic = /tasks/create, namespace = moby, event.typeurl = containerd.events.TaskCreate, event = { container_id = 988b・・・, pid = 6136, bundle = /var/snap/docker/471/run/docker/containerd/daemon/io.containerd.runtime.v1.linux/moby/988b・・・ }
2020/12/06 04:47:14 topic = /tasks/start, namespace = moby, event.typeurl = containerd.events.TaskStart, event = { container_id = 988b・・・, pid = 6136 }
2020/12/06 04:47:19 topic = /tasks/exit, namespace = moby, event.typeurl = containerd.events.TaskExit, event = { container_id = 988b・・・, pid = 6136, exit_status = 0 }
2020/12/06 04:47:19 topic = /tasks/delete, namespace = moby, event.typeurl = containerd.events.TaskDelete, event = { container_id = 988b・・・, pid = 6136, exit_status = 0 }
2020/12/06 04:47:20 topic = /containers/delete, namespace = moby, event.typeurl = containerd.events.ContainerDelete, event = { id = 988b・・・ }
2020/12/06 04:48:18 syscall: terminated
2020/12/06 04:48:18 ERROR rpc error: code = Canceled desc = context canceled
2020/12/06 04:48:18 close