From 0d5e39aa813fa80e9ec508f872dee4c9090c8c5a Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sat, 4 May 2024 10:05:21 +0200 Subject: [PATCH] fix: handling process exits after restore When deleting the container at a point in time where the process is restored our shim would not get the exit signal. To fix this we need to setup the process handlers on restore as if it was a new container. We do this by hooking into the lifecycle with the preRestore and postRestore functions. This allows us to call the relevant task service functions to setup the exit handlers after every restore. --- e2e/e2e_test.go | 17 +++++++++++++++++ e2e/setup_test.go | 16 +++++++++++++++- runc/task/service_zeropod.go | 33 ++++++++++++++++++++++++++------- zeropod/container.go | 13 ++++++++++--- zeropod/restore.go | 9 +++++++-- 5 files changed, 75 insertions(+), 13 deletions(-) diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index ddcb1e1..22066e7 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -179,6 +179,23 @@ func TestE2E(t *testing.T) { assert.GreaterOrEqual(t, restoreCount(t, client, cfg, pod), 2, "pod should have been restored 2 times") }) + t.Run("delete in restored state", func(t *testing.T) { + // as we want to delete the pod when it is in a restored state, we + // first need to make sure it has checkpointed at least once. We give + // it 2 seconds to checkpoint initially and wait 5 seconds to ensure + // it has finished checkpointing. + pod := testPod(scaleDownAfter(time.Second * 2)) + cleanupPod := createPodAndWait(t, ctx, client, pod) + defer cleanupPod() + + time.Sleep(time.Second * 5) + stdout, stderr, err := podExec(cfg, pod, "date") + require.NoError(t, err) + t.Log(stdout, stderr) + // since the cleanup has been deferred it's called right after the + // exec and should test the deletion in the restored state. + }) + t.Run("metrics", func(t *testing.T) { // create two pods to test metric merging runningPod := testPod(scaleDownAfter(time.Hour)) diff --git a/e2e/setup_test.go b/e2e/setup_test.go index a7e3aa1..d9b113a 100644 --- a/e2e/setup_test.go +++ b/e2e/setup_test.go @@ -143,7 +143,7 @@ func startKind(t testing.TB, name string, port int) (c *rest.Config, err error) }, }}, }), - cluster.CreateWithNodeImage("kindest/node:v1.26.3"), + cluster.CreateWithNodeImage("kindest/node:v1.29.2"), cluster.CreateWithRetain(false), cluster.CreateWithKubeconfigPath(f.Name()), cluster.CreateWithWaitForReady(time.Minute*2), @@ -275,6 +275,20 @@ func deployNode(t testing.TB, ctx context.Context, c client.Client) error { return fmt.Errorf("runtimeClass not found") } + // wait for node pod to be running + nodePods := &corev1.PodList{} + require.NoError(t, c.List(ctx, nodePods, client.MatchingLabels{"app.kubernetes.io/name": "zeropod-node"})) + require.Equal(t, 1, len(nodePods.Items)) + + pod := &nodePods.Items[0] + require.Eventually(t, func() bool { + if err := c.Get(ctx, objectName(pod), pod); err != nil { + return false + } + + return pod.Status.Phase == corev1.PodRunning + }, time.Minute, time.Second, "waiting for node pod to be running") + return nil } diff --git a/runc/task/service_zeropod.go b/runc/task/service_zeropod.go index 5d3a7a0..1cdfd16 100644 --- a/runc/task/service_zeropod.go +++ b/runc/task/service_zeropod.go @@ -135,8 +135,12 @@ func (w *wrapper) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. return nil, fmt.Errorf("error creating scaled container: %w", err) } - zeropodContainer.RegisterSetContainer(func(c *runc.Container) { - w.setContainer(c) + zeropodContainer.RegisterPreRestore(func() zeropod.HandleStartedFunc { + return w.preRestore() + }) + + zeropodContainer.RegisterPostRestore(func(c *runc.Container, handleStarted zeropod.HandleStartedFunc) { + w.postRestore(c, handleStarted) }) w.zeropodContainers[r.ID] = zeropodContainer @@ -299,10 +303,25 @@ func (w *wrapper) preventExit(cp containerProcess) bool { return false } -// setContainer replaces the container in the task service. This is important +// preRestore should be called before restoring as it calls preStart in the +// task service to get the handleStarted closure. +func (w *wrapper) preRestore() zeropod.HandleStartedFunc { + handleStarted, cleanup := w.preStart(nil) + defer cleanup() + return handleStarted +} + +// postRestore replaces the container in the task service. This is important // to call after restore since the container object will have changed. -func (s *service) setContainer(container *runc.Container) { - s.mu.Lock() - s.containers[container.ID] = container - s.mu.Unlock() +// Additionally, this also calls the passed in handleStarted to make sure we +// monitor the process exits of the newly restored process. +func (w *wrapper) postRestore(container *runc.Container, handleStarted zeropod.HandleStartedFunc) { + w.mu.Lock() + p, _ := container.Process("") + w.containers[container.ID] = container + w.mu.Unlock() + + if handleStarted != nil { + handleStarted(container, p, false) + } } diff --git a/zeropod/container.go b/zeropod/container.go index 39d2ed6..64bb030 100644 --- a/zeropod/container.go +++ b/zeropod/container.go @@ -19,6 +19,8 @@ import ( "github.com/ctrox/zeropod/socket" ) +type HandleStartedFunc func(*runc.Container, process.Process, bool) + type Container struct { *runc.Container @@ -35,7 +37,8 @@ type Container struct { platform stdio.Platform tracker socket.Tracker stopMetrics context.CancelFunc - setContainer func(container *runc.Container) + preRestore func() HandleStartedFunc + postRestore func(*runc.Container, HandleStartedFunc) // mutex to lock during checkpoint/restore operations since concurrent // restores can cause cgroup confusion. This mutex is shared between all @@ -192,8 +195,12 @@ func (c *Container) Process() process.Process { return c.process } -func (c *Container) RegisterSetContainer(f func(*runc.Container)) { - c.setContainer = f +func (c *Container) RegisterPreRestore(f func() HandleStartedFunc) { + c.preRestore = f +} + +func (c *Container) RegisterPostRestore(f func(*runc.Container, HandleStartedFunc)) { + c.postRestore = f } var errNoPortsDetected = errors.New("no listening ports detected") diff --git a/zeropod/restore.go b/zeropod/restore.go index 5b5e242..54954b5 100644 --- a/zeropod/restore.go +++ b/zeropod/restore.go @@ -56,6 +56,11 @@ func (c *Container) Restore(ctx context.Context) (*runc.Container, process.Proce // the process is not yet restored. container.CgroupSet(c.cgroup) + var handleStarted HandleStartedFunc + if c.preRestore != nil { + handleStarted = c.preRestore() + } + p, err := container.Process("") if err != nil { return nil, nil, err @@ -76,8 +81,8 @@ func (c *Container) Restore(ctx context.Context) (*runc.Container, process.Proce c.Container = container c.process = p - if c.setContainer != nil { - c.setContainer(container) + if c.postRestore != nil { + c.postRestore(container, handleStarted) } // process is running again, we don't need to redirect traffic anymore