fix: handling process exits after restore
When deleting the container while it is in the restored state, our shim
would not receive the exit signal. To fix this we need to set up the
process handlers on restore as if it were a new container. We do this by
hooking into the lifecycle with the preRestore and postRestore
functions, which allows us to call the relevant task service functions
to set up the exit handlers after every restore.
ctrox committed May 4, 2024
1 parent 4a34cfd commit 0d5e39a
Showing 5 changed files with 75 additions and 13 deletions.
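
In outline, the fix threads a handleStarted closure through the restore lifecycle: preRestore obtains it from the task service before the restore, and postRestore arms it for the restored process afterwards. Below is a condensed, self-contained sketch of that flow with stand-in types only; the real code in the diffs uses runc.Container, process.Process, and the task service's preStart.

package main

import "fmt"

// HandleStartedFunc stands in for the closure the task service returns
// from preStart; arming it is what wires up exit monitoring for a process.
type HandleStartedFunc func(containerID string)

// Container carries the two lifecycle hooks the commit introduces.
type Container struct {
	preRestore  func() HandleStartedFunc
	postRestore func(containerID string, handleStarted HandleStartedFunc)
}

// Restore shows the ordering: obtain the exit handler before restoring,
// then hand it the freshly restored container afterwards.
func (c *Container) Restore(containerID string) {
	var handleStarted HandleStartedFunc
	if c.preRestore != nil {
		handleStarted = c.preRestore()
	}
	// ... the CRIU restore of the checkpointed process would happen here ...
	if c.postRestore != nil {
		c.postRestore(containerID, handleStarted)
	}
}

func main() {
	c := &Container{
		preRestore: func() HandleStartedFunc {
			// in the shim this calls w.preStart(nil) to get handleStarted
			return func(id string) { fmt.Println("exit handler armed for", id) }
		},
		postRestore: func(id string, handleStarted HandleStartedFunc) {
			// in the shim this re-registers the container, then calls the handler
			if handleStarted != nil {
				handleStarted(id)
			}
		},
	}
	c.Restore("example")
}

The diffs below show where the real hooks are registered (service_zeropod.go) and invoked (restore.go).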
17 changes: 17 additions & 0 deletions e2e/e2e_test.go
@@ -179,6 +179,23 @@ func TestE2E(t *testing.T) {
 		assert.GreaterOrEqual(t, restoreCount(t, client, cfg, pod), 2, "pod should have been restored 2 times")
 	})
 
+	t.Run("delete in restored state", func(t *testing.T) {
+		// as we want to delete the pod when it is in a restored state, we
+		// first need to make sure it has checkpointed at least once. We give
+		// it 2 seconds to checkpoint initially and wait 5 seconds to ensure
+		// it has finished checkpointing.
+		pod := testPod(scaleDownAfter(time.Second * 2))
+		cleanupPod := createPodAndWait(t, ctx, client, pod)
+		defer cleanupPod()
+
+		time.Sleep(time.Second * 5)
+		stdout, stderr, err := podExec(cfg, pod, "date")
+		require.NoError(t, err)
+		t.Log(stdout, stderr)
+		// since the cleanup has been deferred it's called right after the
+		// exec and should test the deletion in the restored state.
+	})
+
 	t.Run("metrics", func(t *testing.T) {
 		// create two pods to test metric merging
 		runningPod := testPod(scaleDownAfter(time.Hour))
16 changes: 15 additions & 1 deletion e2e/setup_test.go
@@ -143,7 +143,7 @@ func startKind(t testing.TB, name string, port int) (c *rest.Config, err error)
 				},
 			}},
 		}),
-		cluster.CreateWithNodeImage("kindest/node:v1.26.3"),
+		cluster.CreateWithNodeImage("kindest/node:v1.29.2"),
 		cluster.CreateWithRetain(false),
 		cluster.CreateWithKubeconfigPath(f.Name()),
 		cluster.CreateWithWaitForReady(time.Minute*2),
@@ -275,6 +275,20 @@ func deployNode(t testing.TB, ctx context.Context, c client.Client) error {
 		return fmt.Errorf("runtimeClass not found")
 	}
 
+	// wait for node pod to be running
+	nodePods := &corev1.PodList{}
+	require.NoError(t, c.List(ctx, nodePods, client.MatchingLabels{"app.kubernetes.io/name": "zeropod-node"}))
+	require.Equal(t, 1, len(nodePods.Items))
+
+	pod := &nodePods.Items[0]
+	require.Eventually(t, func() bool {
+		if err := c.Get(ctx, objectName(pod), pod); err != nil {
+			return false
+		}
+
+		return pod.Status.Phase == corev1.PodRunning
+	}, time.Minute, time.Second, "waiting for node pod to be running")
+
 	return nil
 }
 
33 changes: 26 additions & 7 deletions runc/task/service_zeropod.go
@@ -135,8 +135,12 @@ func (w *wrapper) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
 		return nil, fmt.Errorf("error creating scaled container: %w", err)
 	}
 
-	zeropodContainer.RegisterSetContainer(func(c *runc.Container) {
-		w.setContainer(c)
+	zeropodContainer.RegisterPreRestore(func() zeropod.HandleStartedFunc {
+		return w.preRestore()
+	})
+
+	zeropodContainer.RegisterPostRestore(func(c *runc.Container, handleStarted zeropod.HandleStartedFunc) {
+		w.postRestore(c, handleStarted)
 	})
 
 	w.zeropodContainers[r.ID] = zeropodContainer
@@ -299,10 +303,25 @@ func (w *wrapper) preventExit(cp containerProcess) bool {
 	return false
 }
 
-// setContainer replaces the container in the task service. This is important
+// preRestore should be called before restoring as it calls preStart in the
+// task service to get the handleStarted closure.
+func (w *wrapper) preRestore() zeropod.HandleStartedFunc {
+	handleStarted, cleanup := w.preStart(nil)
+	defer cleanup()
+	return handleStarted
+}
+
+// postRestore replaces the container in the task service. This is important
 // to call after restore since the container object will have changed.
-func (s *service) setContainer(container *runc.Container) {
-	s.mu.Lock()
-	s.containers[container.ID] = container
-	s.mu.Unlock()
+// Additionally, this also calls the passed in handleStarted to make sure we
+// monitor the process exits of the newly restored process.
+func (w *wrapper) postRestore(container *runc.Container, handleStarted zeropod.HandleStartedFunc) {
+	w.mu.Lock()
+	p, _ := container.Process("")
+	w.containers[container.ID] = container
+	w.mu.Unlock()
+
+	if handleStarted != nil {
+		handleStarted(container, p, false)
+	}
 }
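
The ordering matters: the handleStarted closure has to exist before the restored process can possibly exit, otherwise the shim never observes the exit, which is the bug described in the commit message. A toy model of that failure mode, with a hypothetical reaper in place of the shim's actual exit handling:

package main

import "fmt"

// reaper is a toy stand-in for the shim's exit handling: exits of
// unregistered processes are simply dropped.
type reaper struct {
	handlers map[int]func()
}

func (r *reaper) register(pid int, onExit func()) { r.handlers[pid] = onExit }

func (r *reaper) reap(pid int) {
	if onExit, ok := r.handlers[pid]; ok {
		onExit()
		return
	}
	fmt.Println("exit of pid", pid, "dropped: no handler registered")
}

func main() {
	r := &reaper{handlers: map[int]func(){}}

	// before the fix: the restored pid was unknown, so its exit was dropped
	r.reap(42)

	// after the fix: postRestore re-arms the handler for the restored process
	r.register(42, func() { fmt.Println("pid 42 exited, shim notified") })
	r.reap(42)
}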
13 changes: 10 additions & 3 deletions zeropod/container.go
@@ -19,6 +19,8 @@ import (
"github.com/ctrox/zeropod/socket"
)

type HandleStartedFunc func(*runc.Container, process.Process, bool)

type Container struct {
*runc.Container

@@ -35,7 +37,8 @@ type Container struct {
 	platform    stdio.Platform
 	tracker     socket.Tracker
 	stopMetrics context.CancelFunc
-	setContainer func(container *runc.Container)
+	preRestore  func() HandleStartedFunc
+	postRestore func(*runc.Container, HandleStartedFunc)
 
 	// mutex to lock during checkpoint/restore operations since concurrent
 	// restores can cause cgroup confusion. This mutex is shared between all
@@ -192,8 +195,12 @@ func (c *Container) Process() process.Process {
 	return c.process
 }
 
-func (c *Container) RegisterSetContainer(f func(*runc.Container)) {
-	c.setContainer = f
+func (c *Container) RegisterPreRestore(f func() HandleStartedFunc) {
+	c.preRestore = f
+}
+
+func (c *Container) RegisterPostRestore(f func(*runc.Container, HandleStartedFunc)) {
+	c.postRestore = f
 }
 
 var errNoPortsDetected = errors.New("no listening ports detected")
9 changes: 7 additions & 2 deletions zeropod/restore.go
@@ -56,6 +56,11 @@ func (c *Container) Restore(ctx context.Context) (*runc.Container, process.Process, error) {
 	// the process is not yet restored.
 	container.CgroupSet(c.cgroup)
 
+	var handleStarted HandleStartedFunc
+	if c.preRestore != nil {
+		handleStarted = c.preRestore()
+	}
+
 	p, err := container.Process("")
 	if err != nil {
 		return nil, nil, err
@@ -76,8 +81,8 @@ func (c *Container) Restore(ctx context.Context) (*runc.Container, process.Process, error) {
 	c.Container = container
 	c.process = p
 
-	if c.setContainer != nil {
-		c.setContainer(container)
+	if c.postRestore != nil {
+		c.postRestore(container, handleStarted)
 	}
 
 	// process is running again, we don't need to redirect traffic anymore
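
The map swap in postRestore is needed because a restore produces a brand-new container object: the task service looks containers up by ID, so a stale entry would point at the pre-checkpoint state. A toy illustration of the ID-keyed replacement, using hypothetical types rather than the real task service:

package main

import "fmt"

type container struct {
	id  string
	pid int
}

// taskService keeps containers indexed by ID, like the shim's service map.
type taskService struct {
	containers map[string]*container
}

// replace mimics the postRestore swap: later lookups must resolve to the
// restored object, not the pre-checkpoint one.
func (s *taskService) replace(c *container) {
	s.containers[c.id] = c
}

func main() {
	s := &taskService{containers: map[string]*container{}}
	s.containers["web"] = &container{id: "web", pid: 100} // pre-checkpoint entry

	restored := &container{id: "web", pid: 200} // new object after restore
	s.replace(restored)

	// a delete/kill request now acts on the restored process
	fmt.Println("delete targets pid", s.containers["web"].pid)
}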
