From 13a64fb72a207c7ded307060111d921f7675f5af Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Thu, 9 May 2024 17:24:46 +0200 Subject: [PATCH] feat(experimental): add grouping of multiple pods on one shim Multiple pods can be grouped into one shim by setting the annotation `io.containerd.runc.v2.group: "zeropod"`. The value defines the group id; one shim process is started for each group id. This commit ensures that the annotation is passed to the shim and also fixes the metrics so that they are properly cleaned up when grouping is used and pods are removed. This is currently marked as experimental since not much testing has been done and new issues might surface when using grouping. --- README.md | 8 ++++++++ cmd/installer/main.go | 3 ++- config/examples/nginx.yaml | 32 ++++++++++++++++++++------------ runc/task/service_zeropod.go | 16 +++++++++++----- zeropod/container.go | 7 +------ zeropod/metrics.go | 35 +++++++++++++++++++++++++++++------ 6 files changed, 71 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 16c6c1b..c46c452 100644 --- a/README.md +++ b/README.md @@ -255,6 +255,14 @@ zeropod.ctrox.dev/pre-dump: "true" # killed on scale-down and all state is lost. This might be useful for some # use-cases where the application is stateless and super fast to startup. zeropod.ctrox.dev/disable-checkpointing: "true" + +# Experimental: +# It's possible to reduce the resource usage further by grouping multiple pods +# into one shim process. The value of the annotation specifies the group id, +# each of which will result in a shim process. This is currently marked as +# experimental since not much testing has been done and new issues might +# surface when using grouping. 
+io.containerd.runc.v2.group: "zeropod" ``` ## zeropod-node diff --git a/cmd/installer/main.go b/cmd/installer/main.go index 523e7f7..544f81b 100644 --- a/cmd/installer/main.go +++ b/cmd/installer/main.go @@ -68,7 +68,8 @@ network-lock skip "zeropod.ctrox.dev/container-names", "zeropod.ctrox.dev/scaledown-duration", "zeropod.ctrox.dev/disable-checkpointing", - "zeropod.ctrox.dev/pre-dump" + "zeropod.ctrox.dev/pre-dump", + "io.containerd.runc.v2.group" ] ` ) diff --git a/config/examples/nginx.yaml b/config/examples/nginx.yaml index 8293b81..25bd61f 100644 --- a/config/examples/nginx.yaml +++ b/config/examples/nginx.yaml @@ -1,15 +1,23 @@ -apiVersion: v1 -kind: Pod +apiVersion: apps/v1 +kind: Deployment metadata: name: nginx - labels: - app: nginx - annotations: - zeropod.ctrox.dev/scaledown-duration: 10s spec: - runtimeClassName: zeropod - containers: - - name: nginx - image: nginx - ports: - - containerPort: 80 + replicas: 3 + selector: + matchLabels: + app: nginx + template: + metadata: + labels: + app: nginx + annotations: + io.containerd.runc.v2.group: "zeropod" + zeropod.ctrox.dev/scaledown-duration: 10s + spec: + runtimeClassName: zeropod + containers: + - image: nginx + name: nginx + ports: + - containerPort: 80 diff --git a/runc/task/service_zeropod.go b/runc/task/service_zeropod.go index 1cdfd16..4a7718a 100644 --- a/runc/task/service_zeropod.go +++ b/runc/task/service_zeropod.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "path/filepath" "sync" "time" @@ -70,13 +71,17 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow return nil }) - if address, err := shim.ReadAddress("address"); err == nil { - sd.RegisterCallback(func(context.Context) error { - return shim.RemoveSocket(address) - }) + address, err := shim.ReadAddress("address") + if err != nil { + return nil, err } + sd.RegisterCallback(func(context.Context) error { + return shim.RemoveSocket(address) + }) + + go zeropod.StartMetricsServer(ctx, filepath.Base(address)) 
- return w, err + return w, nil } type wrapper struct { @@ -226,6 +231,7 @@ func (w *wrapper) Kill(ctx context.Context, r *taskAPI.KillRequest) (*emptypb.Em log.G(ctx).Infof("requested scaled down process %d to be killed", zeropodContainer.Process().Pid()) zeropodContainer.Process().SetExited(0) zeropodContainer.InitialProcess().SetExited(0) + zeropodContainer.Stop(ctx) return w.service.Kill(ctx, r) } diff --git a/zeropod/container.go b/zeropod/container.go index 64bb030..e5c3d1b 100644 --- a/zeropod/container.go +++ b/zeropod/container.go @@ -36,7 +36,6 @@ type Container struct { scaleDownTimer *time.Timer platform stdio.Platform tracker socket.Tracker - stopMetrics context.CancelFunc preRestore func() HandleStartedFunc postRestore func(*runc.Container, HandleStartedFunc) @@ -78,9 +77,6 @@ func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Conta return nil, err } - metricsCtx, stopMetrics := context.WithCancel(ctx) - go startMetricsServer(metricsCtx, container.ID) - c := &Container{ Container: container, context: ctx, @@ -92,7 +88,6 @@ func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Conta logPath: logPath, netNS: targetNS, tracker: tracker, - stopMetrics: stopMetrics, checkpointRestore: cr, } @@ -188,7 +183,7 @@ func (c *Container) Stop(ctx context.Context) { log.G(ctx).Errorf("unable to close tracker: %s", err) } c.StopActivator(ctx) - c.stopMetrics() + c.deleteMetrics() } func (c *Container) Process() process.Process { diff --git a/zeropod/metrics.go b/zeropod/metrics.go index f647765..3c27341 100644 --- a/zeropod/metrics.go +++ b/zeropod/metrics.go @@ -7,8 +7,8 @@ import ( "os" "path/filepath" - "github.com/containerd/log" "github.com/containerd/containerd/runtime/v2/shim" + "github.com/containerd/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -75,16 +75,31 @@ func metricsSocketAddress(containerID string) string { return 
fmt.Sprintf("unix://%s.sock", filepath.Join(MetricsSocketPath, containerID)) } -func startMetricsServer(ctx context.Context, containerID string) { +func StartMetricsServer(ctx context.Context, containerID string) { metricsAddress := metricsSocketAddress(containerID) - log.G(ctx).Infof("starting metrics server at %s", metricsAddress) - listener, err := shim.NewSocket(metricsAddress) if err != nil { - log.G(ctx).WithError(err).Error("failed to create metrics listener") - return + if !shim.SocketEaddrinuse(err) { + log.G(ctx).WithError(err) + return + } + + if shim.CanConnect(metricsAddress) { + log.G(ctx).Debug("metrics socket already exists, skipping server start") + return + } + + if err := shim.RemoveSocket(metricsAddress); err != nil { + log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err)) + } + + listener, err = shim.NewSocket(metricsAddress) + if err != nil { + log.G(ctx).WithError(err).Error("failed to create metrics listener") + } } + log.G(ctx).Infof("starting metrics server at %s", metricsAddress) // write metrics address to filesystem if err := shim.WriteAddress("metrics_address", metricsAddress); err != nil { log.G(ctx).WithError(err).Errorf("failed to write metrics address") @@ -131,3 +146,11 @@ func (c *Container) labels() map[string]string { LabelPodNamespace: c.cfg.PodNamespace, } } + +func (c *Container) deleteMetrics() { + checkpointDuration.Delete(c.labels()) + restoreDuration.Delete(c.labels()) + lastCheckpointTime.Delete(c.labels()) + lastRestoreTime.Delete(c.labels()) + running.Delete(c.labels()) +}