Skip to content

Commit

Permalink
feat(experimental): add grouping of multiple pods on one shim
Browse files Browse the repository at this point in the history
Multiple pods can be grouped into one shim by setting the annotation
`io.containerd.runc.v2.group: "zeropod"`. The value defines the group
id for each of which one shim process is started. This commit ensures
that annotation is passed to the shim and also fixes the metrics to
properly cleanup when grouping and pods are removed.

This is currently marked as experimental since not much testing has been
done and new issues might surface when using grouping.
  • Loading branch information
ctrox committed May 9, 2024
1 parent 72bc23f commit 13a64fb
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 30 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ zeropod.ctrox.dev/pre-dump: "true"
# killed on scale-down and all state is lost. This might be useful for some
# use-cases where the application is stateless and super fast to startup.
zeropod.ctrox.dev/disable-checkpointing: "true"

# Experimental:
# It's possible to reduce the resource usage further by grouping multiple pods
# into one shim process. The value of the annotation specifies the group id,
# each of which will result in a shim process. This is currently marked as
# experimental since not much testing has been done and new issues might
# surface when using grouping.
io.containerd.runc.v2.group: "zeropod"
```
## zeropod-node
Expand Down
3 changes: 2 additions & 1 deletion cmd/installer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ network-lock skip
"zeropod.ctrox.dev/container-names",
"zeropod.ctrox.dev/scaledown-duration",
"zeropod.ctrox.dev/disable-checkpointing",
"zeropod.ctrox.dev/pre-dump"
"zeropod.ctrox.dev/pre-dump",
"io.containerd.runc.v2.group"
]
`
)
Expand Down
32 changes: 20 additions & 12 deletions config/examples/nginx.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
apiVersion: v1
kind: Pod
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
labels:
app: nginx
annotations:
zeropod.ctrox.dev/scaledown-duration: 10s
spec:
runtimeClassName: zeropod
containers:
- name: nginx
image: nginx
ports:
- containerPort: 80
replicas: 3
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
annotations:
io.containerd.runc.v2.group: "zeropod"
zeropod.ctrox.dev/scaledown-duration: 10s
spec:
runtimeClassName: zeropod
containers:
- image: nginx
name: nginx
ports:
- containerPort: 80
16 changes: 11 additions & 5 deletions runc/task/service_zeropod.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"

Expand Down Expand Up @@ -70,13 +71,17 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow
return nil
})

if address, err := shim.ReadAddress("address"); err == nil {
sd.RegisterCallback(func(context.Context) error {
return shim.RemoveSocket(address)
})
address, err := shim.ReadAddress("address")
if err != nil {
return nil, err
}
sd.RegisterCallback(func(context.Context) error {
return shim.RemoveSocket(address)
})

go zeropod.StartMetricsServer(ctx, filepath.Base(address))

return w, err
return w, nil
}

type wrapper struct {
Expand Down Expand Up @@ -226,6 +231,7 @@ func (w *wrapper) Kill(ctx context.Context, r *taskAPI.KillRequest) (*emptypb.Em
log.G(ctx).Infof("requested scaled down process %d to be killed", zeropodContainer.Process().Pid())
zeropodContainer.Process().SetExited(0)
zeropodContainer.InitialProcess().SetExited(0)
zeropodContainer.Stop(ctx)

return w.service.Kill(ctx, r)
}
Expand Down
7 changes: 1 addition & 6 deletions zeropod/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Container struct {
scaleDownTimer *time.Timer
platform stdio.Platform
tracker socket.Tracker
stopMetrics context.CancelFunc
preRestore func() HandleStartedFunc
postRestore func(*runc.Container, HandleStartedFunc)

Expand Down Expand Up @@ -78,9 +77,6 @@ func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Conta
return nil, err
}

metricsCtx, stopMetrics := context.WithCancel(ctx)
go startMetricsServer(metricsCtx, container.ID)

c := &Container{
Container: container,
context: ctx,
Expand All @@ -92,7 +88,6 @@ func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Conta
logPath: logPath,
netNS: targetNS,
tracker: tracker,
stopMetrics: stopMetrics,
checkpointRestore: cr,
}

Expand Down Expand Up @@ -188,7 +183,7 @@ func (c *Container) Stop(ctx context.Context) {
log.G(ctx).Errorf("unable to close tracker: %s", err)
}
c.StopActivator(ctx)
c.stopMetrics()
c.deleteMetrics()
}

func (c *Container) Process() process.Process {
Expand Down
35 changes: 29 additions & 6 deletions zeropod/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"os"
"path/filepath"

"github.com/containerd/log"
"github.com/containerd/containerd/runtime/v2/shim"
"github.com/containerd/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand Down Expand Up @@ -75,16 +75,31 @@ func metricsSocketAddress(containerID string) string {
return fmt.Sprintf("unix://%s.sock", filepath.Join(MetricsSocketPath, containerID))
}

func startMetricsServer(ctx context.Context, containerID string) {
func StartMetricsServer(ctx context.Context, containerID string) {
metricsAddress := metricsSocketAddress(containerID)
log.G(ctx).Infof("starting metrics server at %s", metricsAddress)

listener, err := shim.NewSocket(metricsAddress)
if err != nil {
log.G(ctx).WithError(err).Error("failed to create metrics listener")
return
if !shim.SocketEaddrinuse(err) {
log.G(ctx).WithError(err)
return
}

if shim.CanConnect(metricsAddress) {
log.G(ctx).Debug("metrics socket already exists, skipping server start")
return
}

if err := shim.RemoveSocket(metricsAddress); err != nil {
log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err))
}

listener, err = shim.NewSocket(metricsAddress)
if err != nil {
log.G(ctx).WithError(err).Error("failed to create metrics listener")
}
}

log.G(ctx).Infof("starting metrics server at %s", metricsAddress)
// write metrics address to filesystem
if err := shim.WriteAddress("metrics_address", metricsAddress); err != nil {
log.G(ctx).WithError(err).Errorf("failed to write metrics address")
Expand Down Expand Up @@ -131,3 +146,11 @@ func (c *Container) labels() map[string]string {
LabelPodNamespace: c.cfg.PodNamespace,
}
}

func (c *Container) deleteMetrics() {
checkpointDuration.Delete(c.labels())
restoreDuration.Delete(c.labels())
lastCheckpointTime.Delete(c.labels())
lastRestoreTime.Delete(c.labels())
running.Delete(c.labels())
}

0 comments on commit 13a64fb

Please sign in to comment.