
Merge pull request #24 from ctrox/process-exits-race
fix: race condition with process exits
ctrox authored Jun 30, 2024
2 parents 6bd9a42 + f09f892 commit 86ff9ed
Showing 3 changed files with 70 additions and 31 deletions.
8 changes: 8 additions & 0 deletions runc/task/service_zeropod.go
@@ -266,7 +266,9 @@ func (w *wrapper) Kill(ctx context.Context, r *taskAPI.KillRequest) (*emptypb.Em

func (w *wrapper) processExits() {
for e := range w.ec {
w.lifecycleMu.Lock()
cps := w.running[e.Pid]
w.lifecycleMu.Unlock()
preventExit := false
for _, cp := range cps {
if w.preventExit(cp) {
@@ -312,6 +314,12 @@ func (w *wrapper) preventExit(cp containerProcess) bool {
return true
}

if zeropodContainer.CheckpointedPID(cp.Process.Pid()) {
log.G(w.context).Infof("not setting exited because process has been checkpointed: %v", cp.Process.Pid())
zeropodContainer.DeleteCheckpointedPID(cp.Process.Pid())
return true
}

// we need to set the original process as being exited so we can exit cleanly
if zeropodContainer.InitialProcess() != nil &&
cp.Process.ID() == zeropodContainer.InitialProcess().ID() ||
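The hunks above make two changes to the exit loop: the w.running lookup is now snapshotted while holding lifecycleMu, so it can no longer race with goroutines that mutate the map, and preventExit treats the exit of a pid that was just checkpointed as expected. A self-contained sketch of the snapshot-under-lock pattern (the exitEvent and tracker types here are invented for illustration, not part of the zeropod code):

package main

import (
	"fmt"
	"sync"
)

type exitEvent struct{ Pid int }

type tracker struct {
	mu      sync.Mutex
	running map[int][]string // pid -> processes, mutated by other goroutines
}

// processExits mirrors the fixed pattern: snapshot the slice for the pid
// while holding the lock, then work on the snapshot without the lock.
func (t *tracker) processExits(ec <-chan exitEvent) {
	for e := range ec {
		t.mu.Lock()
		cps := t.running[e.Pid]
		t.mu.Unlock()

		for _, cp := range cps {
			fmt.Printf("pid %d: handling exit of %s\n", e.Pid, cp)
		}
	}
}

func main() {
	t := &tracker{running: map[int][]string{42: {"init"}}}
	ec := make(chan exitEvent, 1)
	ec <- exitEvent{Pid: 42}
	close(ec)
	t.processExits(ec)
}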
35 changes: 21 additions & 14 deletions zeropod/checkpoint.go
@@ -41,21 +41,32 @@ func (c *Container) scaleDown(ctx context.Context) error {
}

if c.cfg.DisableCheckpointing {
log.G(ctx).Infof("checkpointing is disabled, scaling down by killing")

c.SetScaledDown(true)
if err := c.process.Kill(ctx, 9, false); err != nil {
return err
}
} else {
if err := c.checkpoint(ctx); err != nil {
if err := c.kill(ctx); err != nil {
return err
}
return nil
}

if err := c.checkpoint(ctx); err != nil {
return err
}

return nil
}

func (c *Container) kill(ctx context.Context) error {
c.checkpointRestore.Lock()
defer c.checkpointRestore.Unlock()
log.G(ctx).Infof("checkpointing is disabled, scaling down by killing")
c.AddCheckpointedPID(c.Pid())

if err := c.process.Kill(ctx, 9, false); err != nil {
return err
}
c.SetScaledDown(true)
return nil
}

func (c *Container) checkpoint(ctx context.Context) error {
c.checkpointRestore.Lock()
defer c.checkpointRestore.Unlock()
@@ -89,8 +100,6 @@ func (c *Container) checkpoint(ctx context.Context) error {

beforePreDump := time.Now()
if err := initProcess.Runtime().Checkpoint(ctx, c.ID(), opts, runcC.PreDump); err != nil {
c.SetScaledDown(false)

log.G(ctx).Errorf("error pre-dumping container: %s", err)
b, err := os.ReadFile(path.Join(workDir, "dump.log"))
if err != nil {
@@ -103,20 +112,17 @@ func (c *Container) checkpoint(ctx context.Context) error {
log.G(ctx).Infof("pre-dumping done in %s", time.Since(beforePreDump))
}

c.SetScaledDown(true)

if c.cfg.PreDump {
// ParentPath is the relative path from the ImagePath to the pre-dump dir.
opts.ParentPath = relativePreDumpDir()
}

c.AddCheckpointedPID(c.Pid())
// ImagePath is always the same, regardless of pre-dump
opts.ImagePath = containerDir(c.Bundle)

beforeCheckpoint := time.Now()
if err := initProcess.Runtime().Checkpoint(ctx, c.ID(), opts); err != nil {
c.SetScaledDown(false)

log.G(ctx).Errorf("error checkpointing container: %s", err)
b, err := os.ReadFile(path.Join(workDir, "dump.log"))
if err != nil {
@@ -126,6 +132,7 @@ func (c *Container) checkpoint(ctx context.Context) error {
return err
}

c.SetScaledDown(true)
checkpointDuration.With(c.labels()).Observe(time.Since(beforeCheckpoint).Seconds())
log.G(ctx).Infof("checkpointing done in %s", time.Since(beforeCheckpoint))

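The checkpoint.go changes are mostly a reordering of state updates: the pid is registered via AddCheckpointedPID before the process is killed or checkpointed, and SetScaledDown(true) is only called once the kill or final checkpoint has succeeded, which is why the SetScaledDown(false) rollbacks disappear from the error paths. A small stand-alone illustration of that ordering, using invented container and checkpoint stand-ins rather than the real types:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type container struct {
	mu         sync.Mutex
	scaledDown bool
	expected   map[int]struct{} // pids whose exit is expected
}

func (c *container) markExpected(pid int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.expected[pid] = struct{}{}
}

// scaleDown registers the pid first, runs the (possibly failing) checkpoint,
// and only flips scaledDown once it has succeeded, so no rollback is needed.
func (c *container) scaleDown(pid int, checkpoint func() error) error {
	c.markExpected(pid)
	if err := checkpoint(); err != nil {
		return err // scaledDown was never set, nothing to undo
	}
	c.mu.Lock()
	c.scaledDown = true
	c.mu.Unlock()
	return nil
}

func main() {
	c := &container{expected: map[int]struct{}{}}
	err := c.scaleDown(42, func() error { return errors.New("criu failed") })
	fmt.Println(err, c.scaledDown) // criu failed false
}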
58 changes: 41 additions & 17 deletions zeropod/container.go
@@ -25,22 +25,23 @@ type HandleStartedFunc func(*runc.Container, process.Process, bool)
type Container struct {
*runc.Container

context context.Context
activator *activator.Server
cfg *Config
initialProcess process.Process
process process.Process
cgroup any
logPath string
scaledDown bool
netNS ns.NetNS
scaleDownTimer *time.Timer
platform stdio.Platform
tracker socket.Tracker
preRestore func() HandleStartedFunc
postRestore func(*runc.Container, HandleStartedFunc)
events chan *v1.ContainerStatus

context context.Context
activator *activator.Server
cfg *Config
initialProcess process.Process
process process.Process
cgroup any
logPath string
scaledDown bool
netNS ns.NetNS
scaleDownTimer *time.Timer
platform stdio.Platform
tracker socket.Tracker
preRestore func() HandleStartedFunc
postRestore func(*runc.Container, HandleStartedFunc)
events chan *v1.ContainerStatus
checkpointedPIDs map[int]struct{}
pidsMu sync.Mutex
// mutex to lock during checkpoint/restore operations since concurrent
// restores can cause cgroup confusion. This mutex is shared between all
// containers.
@@ -92,6 +93,7 @@ func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Conta
tracker: tracker,
checkpointRestore: cr,
events: events,
checkpointedPIDs: map[int]struct{}{},
}

running.With(c.labels()).Set(1)
@@ -204,6 +206,28 @@ func (c *Container) StopActivator(ctx context.Context) {
c.activator.Stop(ctx)
}

// CheckpointedPID indicates if the pid has been checkpointed before.
func (c *Container) CheckpointedPID(pid int) bool {
c.pidsMu.Lock()
defer c.pidsMu.Unlock()
_, ok := c.checkpointedPIDs[pid]
return ok
}

// AddCheckpointedPID registers a new pid that should be considered checkpointed.
func (c *Container) AddCheckpointedPID(pid int) {
c.pidsMu.Lock()
defer c.pidsMu.Unlock()
c.checkpointedPIDs[pid] = struct{}{}
}

// DeleteCheckpointedPID deletes a pid from the map of checkpointed pids.
func (c *Container) DeleteCheckpointedPID(pid int) {
c.pidsMu.Lock()
defer c.pidsMu.Unlock()
delete(c.checkpointedPIDs, pid)
}

func (c *Container) Stop(ctx context.Context) {
c.CancelScaleDown()
if err := c.tracker.Close(); err != nil {
@@ -302,7 +326,7 @@ func (c *Container) restoreHandler(ctx context.Context) activator.OnAccept {
c.Container = restoredContainer

if err := c.tracker.TrackPid(uint32(p.Pid())); err != nil {
return fmt.Errorf("unable to track pid %d: %w", p.Pid(), err)
log.G(ctx).Errorf("unable to track pid %d: %s", p.Pid(), err)
}

log.G(ctx).Printf("restored process: %d in %s", p.Pid(), time.Since(beforeRestore))
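container.go adds the bookkeeping that ties the two files together: a map of checkpointed pids guarded by pidsMu, exposed as CheckpointedPID, AddCheckpointedPID and DeleteCheckpointedPID, written just before a checkpoint or kill and consumed once by preventExit. A self-contained sketch of that consume-once pattern (pidSet and its methods are invented names for the example):

package main

import (
	"fmt"
	"sync"
)

// pidSet is a minimal stand-in for the checkpointedPIDs map on Container.
type pidSet struct {
	mu   sync.Mutex
	pids map[int]struct{}
}

func newPidSet() *pidSet { return &pidSet{pids: map[int]struct{}{}} }

func (s *pidSet) add(pid int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pids[pid] = struct{}{}
}

// consume reports whether pid was marked and removes the marker, so a later,
// real exit of a reused pid is not swallowed.
func (s *pidSet) consume(pid int) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.pids[pid]; ok {
		delete(s.pids, pid)
		return true
	}
	return false
}

func main() {
	s := newPidSet()
	s.add(1234)                  // just before checkpointing pid 1234
	fmt.Println(s.consume(1234)) // exit handler: true, exit was expected
	fmt.Println(s.consume(1234)) // a later exit of the same pid: false
}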
