From f09f89269056ef380a8b3936834de80bc1e9f4a6 Mon Sep 17 00:00:00 2001
From: Cyrill Troxler
Date: Sun, 30 Jun 2024 15:09:14 +0200
Subject: [PATCH] fix: race condition with process exits

This gets rid of an older workaround where we set the container to
scaled down before checkpointing, so that the exit caused by the
checkpoint would not be processed. Instead we now keep track of the
PIDs we expect to be checkpointed and look up the PID when processing
an exit, to tell whether the exit was caused by a checkpoint or by
anything else.
---
 runc/task/service_zeropod.go |  8 +++++
 zeropod/checkpoint.go        | 35 +++++++++++++---------
 zeropod/container.go         | 58 +++++++++++++++++++++++++-----------
 3 files changed, 70 insertions(+), 31 deletions(-)

diff --git a/runc/task/service_zeropod.go b/runc/task/service_zeropod.go
index ebe7ce7..849d253 100644
--- a/runc/task/service_zeropod.go
+++ b/runc/task/service_zeropod.go
@@ -266,7 +266,9 @@ func (w *wrapper) Kill(ctx context.Context, r *taskAPI.KillRequest) (*emptypb.Em
 
 func (w *wrapper) processExits() {
 	for e := range w.ec {
+		w.lifecycleMu.Lock()
 		cps := w.running[e.Pid]
+		w.lifecycleMu.Unlock()
 		preventExit := false
 		for _, cp := range cps {
 			if w.preventExit(cp) {
@@ -312,6 +314,12 @@ func (w *wrapper) preventExit(cp containerProcess) bool {
 		return true
 	}
 
+	if zeropodContainer.CheckpointedPID(cp.Process.Pid()) {
+		log.G(w.context).Infof("not setting exited because process has been checkpointed: %v", cp.Process.Pid())
+		zeropodContainer.DeleteCheckpointedPID(cp.Process.Pid())
+		return true
+	}
+
 	// we need to set the original process as being exited so we can exit cleanly
 	if zeropodContainer.InitialProcess() != nil &&
 		cp.Process.ID() == zeropodContainer.InitialProcess().ID() ||
diff --git a/zeropod/checkpoint.go b/zeropod/checkpoint.go
index 2f77ac2..4d772e1 100644
--- a/zeropod/checkpoint.go
+++ b/zeropod/checkpoint.go
@@ -41,21 +41,32 @@ func (c *Container) scaleDown(ctx context.Context) error {
 	}
 
 	if c.cfg.DisableCheckpointing {
-		log.G(ctx).Infof("checkpointing is disabled, scaling down by killing")
-
-		c.SetScaledDown(true)
-		if err := c.process.Kill(ctx, 9, false); err != nil {
-			return err
-		}
-	} else {
-		if err := c.checkpoint(ctx); err != nil {
+		if err := c.kill(ctx); err != nil {
 			return err
 		}
+		return nil
+	}
+
+	if err := c.checkpoint(ctx); err != nil {
+		return err
 	}
 
 	return nil
 }
 
+func (c *Container) kill(ctx context.Context) error {
+	c.checkpointRestore.Lock()
+	defer c.checkpointRestore.Unlock()
+	log.G(ctx).Infof("checkpointing is disabled, scaling down by killing")
+	c.AddCheckpointedPID(c.Pid())
+
+	if err := c.process.Kill(ctx, 9, false); err != nil {
+		return err
+	}
+	c.SetScaledDown(true)
+	return nil
+}
+
 func (c *Container) checkpoint(ctx context.Context) error {
 	c.checkpointRestore.Lock()
 	defer c.checkpointRestore.Unlock()
@@ -89,8 +100,6 @@ func (c *Container) checkpoint(ctx context.Context) error {
 
 		beforePreDump := time.Now()
 		if err := initProcess.Runtime().Checkpoint(ctx, c.ID(), opts, runcC.PreDump); err != nil {
-			c.SetScaledDown(false)
-
 			log.G(ctx).Errorf("error pre-dumping container: %s", err)
 			b, err := os.ReadFile(path.Join(workDir, "dump.log"))
 			if err != nil {
@@ -103,20 +112,17 @@ func (c *Container) checkpoint(ctx context.Context) error {
 		log.G(ctx).Infof("pre-dumping done in %s", time.Since(beforePreDump))
 	}
 
-	c.SetScaledDown(true)
-
 	if c.cfg.PreDump {
 		// ParentPath is the relative path from the ImagePath to the pre-dump dir.
 		opts.ParentPath = relativePreDumpDir()
 	}
 
+	c.AddCheckpointedPID(c.Pid())
 	// ImagePath is always the same, regardless of pre-dump
 	opts.ImagePath = containerDir(c.Bundle)
 
 	beforeCheckpoint := time.Now()
 	if err := initProcess.Runtime().Checkpoint(ctx, c.ID(), opts); err != nil {
-		c.SetScaledDown(false)
-
 		log.G(ctx).Errorf("error checkpointing container: %s", err)
 		b, err := os.ReadFile(path.Join(workDir, "dump.log"))
 		if err != nil {
@@ -126,6 +132,7 @@ func (c *Container) checkpoint(ctx context.Context) error {
 		return err
 	}
 
+	c.SetScaledDown(true)
 	checkpointDuration.With(c.labels()).Observe(time.Since(beforeCheckpoint).Seconds())
 	log.G(ctx).Infof("checkpointing done in %s", time.Since(beforeCheckpoint))
 
diff --git a/zeropod/container.go b/zeropod/container.go
index 8de2655..6600314 100644
--- a/zeropod/container.go
+++ b/zeropod/container.go
@@ -25,22 +25,23 @@ type HandleStartedFunc func(*runc.Container, process.Process, bool)
 
 type Container struct {
 	*runc.Container
-	context        context.Context
-	activator      *activator.Server
-	cfg            *Config
-	initialProcess process.Process
-	process        process.Process
-	cgroup         any
-	logPath        string
-	scaledDown     bool
-	netNS          ns.NetNS
-	scaleDownTimer *time.Timer
-	platform       stdio.Platform
-	tracker        socket.Tracker
-	preRestore     func() HandleStartedFunc
-	postRestore    func(*runc.Container, HandleStartedFunc)
-	events         chan *v1.ContainerStatus
-
+	context          context.Context
+	activator        *activator.Server
+	cfg              *Config
+	initialProcess   process.Process
+	process          process.Process
+	cgroup           any
+	logPath          string
+	scaledDown       bool
+	netNS            ns.NetNS
+	scaleDownTimer   *time.Timer
+	platform         stdio.Platform
+	tracker          socket.Tracker
+	preRestore       func() HandleStartedFunc
+	postRestore      func(*runc.Container, HandleStartedFunc)
+	events           chan *v1.ContainerStatus
+	checkpointedPIDs map[int]struct{}
+	pidsMu           sync.Mutex
 	// mutex to lock during checkpoint/restore operations since concurrent
 	// restores can cause cgroup confusion. This mutex is shared between all
 	// containers.
@@ -92,6 +93,7 @@ func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Conta
 		tracker:           tracker,
 		checkpointRestore: cr,
 		events:            events,
+		checkpointedPIDs:  map[int]struct{}{},
 	}
 
 	running.With(c.labels()).Set(1)
@@ -204,6 +206,28 @@ func (c *Container) StopActivator(ctx context.Context) {
 	c.activator.Stop(ctx)
 }
 
+// CheckpointedPID indicates if the pid has been checkpointed before.
+func (c *Container) CheckpointedPID(pid int) bool {
+	c.pidsMu.Lock()
+	defer c.pidsMu.Unlock()
+	_, ok := c.checkpointedPIDs[pid]
+	return ok
+}
+
+// AddCheckpointedPID registers a new pid that should be considered checkpointed.
+func (c *Container) AddCheckpointedPID(pid int) {
+	c.pidsMu.Lock()
+	defer c.pidsMu.Unlock()
+	c.checkpointedPIDs[pid] = struct{}{}
+}
+
+// DeleteCheckpointedPID deletes a pid from the map of checkpointed pids.
+func (c *Container) DeleteCheckpointedPID(pid int) {
+	c.pidsMu.Lock()
+	defer c.pidsMu.Unlock()
+	delete(c.checkpointedPIDs, pid)
+}
+
 func (c *Container) Stop(ctx context.Context) {
 	c.CancelScaleDown()
 	if err := c.tracker.Close(); err != nil {
@@ -302,7 +326,7 @@ func (c *Container) restoreHandler(ctx context.Context) activator.OnAccept {
 		c.Container = restoredContainer
 
 		if err := c.tracker.TrackPid(uint32(p.Pid())); err != nil {
-			return fmt.Errorf("unable to track pid %d: %w", p.Pid(), err)
+			log.G(ctx).Errorf("unable to track pid %d: %s", p.Pid(), err)
 		}
 
 		log.G(ctx).Printf("restored process: %d in %s", p.Pid(), time.Since(beforeRestore))
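
The following is a minimal, standalone Go sketch of the idea described in the
commit message. It is not part of the patch, and the names (expectedExits,
add, consume) are illustrative rather than taken from the zeropod code: a
mutex-guarded set records PIDs that are expected to exit because of a
checkpoint, and the exit handler consults and clears that set to decide
whether an exit should be ignored or handled normally.

	// Sketch: distinguishing checkpoint-induced exits from real exits.
	package main

	import (
		"fmt"
		"sync"
	)

	// expectedExits is a mutex-guarded set of PIDs whose exits are expected
	// because a checkpoint (or a deliberate kill) is about to stop them.
	type expectedExits struct {
		mu   sync.Mutex
		pids map[int]struct{}
	}

	func newExpectedExits() *expectedExits {
		return &expectedExits{pids: map[int]struct{}{}}
	}

	// add registers a PID before the checkpoint or kill is issued.
	func (e *expectedExits) add(pid int) {
		e.mu.Lock()
		defer e.mu.Unlock()
		e.pids[pid] = struct{}{}
	}

	// consume reports whether the PID was expected to exit and removes it,
	// so a later, unrelated exit of a reused PID is not swallowed.
	func (e *expectedExits) consume(pid int) bool {
		e.mu.Lock()
		defer e.mu.Unlock()
		_, ok := e.pids[pid]
		delete(e.pids, pid)
		return ok
	}

	func main() {
		exits := newExpectedExits()

		// Before checkpointing, record the PID we are about to stop.
		exits.add(1234)

		// When exit events arrive, an expected PID is ignored; any other
		// PID is handled as a regular process exit.
		for _, pid := range []int{1234, 5678} {
			if exits.consume(pid) {
				fmt.Printf("pid %d exited due to checkpoint, ignoring\n", pid)
				continue
			}
			fmt.Printf("pid %d exited for another reason, handling exit\n", pid)
		}
	}

Removing the PID on lookup mirrors DeleteCheckpointedPID in the patch: once an
expected exit has been matched, later exits of the same PID are treated as
ordinary exits again.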