diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go
index ddcb1e1..22066e7 100644
--- a/e2e/e2e_test.go
+++ b/e2e/e2e_test.go
@@ -179,6 +179,23 @@ func TestE2E(t *testing.T) {
 		assert.GreaterOrEqual(t, restoreCount(t, client, cfg, pod), 2, "pod should have been restored 2 times")
 	})
 
+	t.Run("delete in restored state", func(t *testing.T) {
+		// as we want to delete the pod when it is in a restored state, we
+		// first need to make sure it has checkpointed at least once. We give
+		// it 2 seconds to checkpoint initially and wait 5 seconds to ensure
+		// it has finished checkpointing.
+		pod := testPod(scaleDownAfter(time.Second * 2))
+		cleanupPod := createPodAndWait(t, ctx, client, pod)
+		defer cleanupPod()
+
+		time.Sleep(time.Second * 5)
+		stdout, stderr, err := podExec(cfg, pod, "date")
+		require.NoError(t, err)
+		t.Log(stdout, stderr)
+		// since the cleanup has been deferred it's called right after the
+		// exec and should test the deletion in the restored state.
+	})
+
 	t.Run("metrics", func(t *testing.T) {
 		// create two pods to test metric merging
 		runningPod := testPod(scaleDownAfter(time.Hour))
diff --git a/e2e/setup_test.go b/e2e/setup_test.go
index a7e3aa1..d9b113a 100644
--- a/e2e/setup_test.go
+++ b/e2e/setup_test.go
@@ -143,7 +143,7 @@ func startKind(t testing.TB, name string, port int) (c *rest.Config, err error)
 				},
 			}},
 		}),
-		cluster.CreateWithNodeImage("kindest/node:v1.26.3"),
+		cluster.CreateWithNodeImage("kindest/node:v1.29.2"),
 		cluster.CreateWithRetain(false),
 		cluster.CreateWithKubeconfigPath(f.Name()),
 		cluster.CreateWithWaitForReady(time.Minute*2),
@@ -275,6 +275,20 @@ func deployNode(t testing.TB, ctx context.Context, c client.Client) error {
 		return fmt.Errorf("runtimeClass not found")
 	}
 
+	// wait for node pod to be running
+	nodePods := &corev1.PodList{}
+	require.NoError(t, c.List(ctx, nodePods, client.MatchingLabels{"app.kubernetes.io/name": "zeropod-node"}))
+	require.Equal(t, 1, len(nodePods.Items))
+
+	pod := &nodePods.Items[0]
+	require.Eventually(t, func() bool {
+		if err := c.Get(ctx, objectName(pod), pod); err != nil {
+			return false
+		}
+
+		return pod.Status.Phase == corev1.PodRunning
+	}, time.Minute, time.Second, "waiting for node pod to be running")
+
 	return nil
 }
 
diff --git a/runc/task/service_zeropod.go b/runc/task/service_zeropod.go
index 5d3a7a0..1cdfd16 100644
--- a/runc/task/service_zeropod.go
+++ b/runc/task/service_zeropod.go
@@ -135,8 +135,12 @@ func (w *wrapper) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 		return nil, fmt.Errorf("error creating scaled container: %w", err)
 	}
 
-	zeropodContainer.RegisterSetContainer(func(c *runc.Container) {
-		w.setContainer(c)
+	zeropodContainer.RegisterPreRestore(func() zeropod.HandleStartedFunc {
+		return w.preRestore()
+	})
+
+	zeropodContainer.RegisterPostRestore(func(c *runc.Container, handleStarted zeropod.HandleStartedFunc) {
+		w.postRestore(c, handleStarted)
 	})
 
 	w.zeropodContainers[r.ID] = zeropodContainer
@@ -299,10 +303,25 @@ func (w *wrapper) preventExit(cp containerProcess) bool {
 	return false
 }
 
-// setContainer replaces the container in the task service. This is important
+// preRestore should be called before restoring as it calls preStart in the
+// task service to get the handleStarted closure.
+func (w *wrapper) preRestore() zeropod.HandleStartedFunc {
+	handleStarted, cleanup := w.preStart(nil)
+	defer cleanup()
+	return handleStarted
+}
+
+// postRestore replaces the container in the task service. This is important
 // to call after restore since the container object will have changed.
-func (s *service) setContainer(container *runc.Container) {
-	s.mu.Lock()
-	s.containers[container.ID] = container
-	s.mu.Unlock()
+// Additionally, this also calls the passed in handleStarted to make sure we
+// monitor the process exits of the newly restored process.
+func (w *wrapper) postRestore(container *runc.Container, handleStarted zeropod.HandleStartedFunc) {
+	w.mu.Lock()
+	p, _ := container.Process("")
+	w.containers[container.ID] = container
+	w.mu.Unlock()
+
+	if handleStarted != nil {
+		handleStarted(container, p, false)
+	}
 }
diff --git a/zeropod/container.go b/zeropod/container.go
index 39d2ed6..64bb030 100644
--- a/zeropod/container.go
+++ b/zeropod/container.go
@@ -19,6 +19,8 @@ import (
 	"github.com/ctrox/zeropod/socket"
 )
 
+type HandleStartedFunc func(*runc.Container, process.Process, bool)
+
 type Container struct {
 	*runc.Container
 
@@ -35,7 +37,8 @@ type Container struct {
 	platform     stdio.Platform
 	tracker      socket.Tracker
 	stopMetrics  context.CancelFunc
-	setContainer func(container *runc.Container)
+	preRestore   func() HandleStartedFunc
+	postRestore  func(*runc.Container, HandleStartedFunc)
 
 	// mutex to lock during checkpoint/restore operations since concurrent
 	// restores can cause cgroup confusion. This mutex is shared between all
@@ -192,8 +195,12 @@ func (c *Container) Process() process.Process {
 	return c.process
 }
 
-func (c *Container) RegisterSetContainer(f func(*runc.Container)) {
-	c.setContainer = f
+func (c *Container) RegisterPreRestore(f func() HandleStartedFunc) {
+	c.preRestore = f
+}
+
+func (c *Container) RegisterPostRestore(f func(*runc.Container, HandleStartedFunc)) {
+	c.postRestore = f
 }
 
 var errNoPortsDetected = errors.New("no listening ports detected")
diff --git a/zeropod/restore.go b/zeropod/restore.go
index 5b5e242..54954b5 100644
--- a/zeropod/restore.go
+++ b/zeropod/restore.go
@@ -56,6 +56,11 @@ func (c *Container) Restore(ctx context.Context) (*runc.Container, process.Proce
 	// the process is not yet restored.
 	container.CgroupSet(c.cgroup)
 
+	var handleStarted HandleStartedFunc
+	if c.preRestore != nil {
+		handleStarted = c.preRestore()
+	}
+
 	p, err := container.Process("")
 	if err != nil {
 		return nil, nil, err
@@ -76,8 +81,8 @@ func (c *Container) Restore(ctx context.Context) (*runc.Container, process.Proce
 	c.Container = container
 	c.process = p
 
-	if c.setContainer != nil {
-		c.setContainer(container)
+	if c.postRestore != nil {
+		c.postRestore(container, handleStarted)
 	}
 
 	// process is running again, we don't need to redirect traffic anymore
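
Note on the hook ordering introduced above: the task service hands the container a handleStarted closure via preRestore before the checkpoint is restored, and gets it back together with the new container object via postRestore once the restore has finished. Below is a minimal, self-contained sketch of that flow under simplified assumptions: the container, proc and restorer types (and the whole package) are hypothetical stand-ins invented for illustration and are not part of the zeropod codebase; only the shape of the closure and the pre/post call order mirror the diff.

// A minimal, runnable sketch of the pre/post restore hook pattern. The
// container, proc and restorer types are hypothetical stand-ins for
// runc.Container, process.Process and zeropod.Container; only the hook
// shapes and the call order mirror the change above.
package main

import "fmt"

type container struct{ id string }
type proc struct{ pid int }

// handleStartedFunc plays the role of zeropod.HandleStartedFunc: obtained
// from the task service before the restore, called again afterwards so the
// exits of the freshly restored process are monitored.
type handleStartedFunc func(c *container, p *proc, muxStdio bool)

type restorer struct {
	preRestore  func() handleStartedFunc
	postRestore func(*container, handleStartedFunc)
}

func (r *restorer) restore() {
	// 1. Fetch the handleStarted closure before restoring.
	var handleStarted handleStartedFunc
	if r.preRestore != nil {
		handleStarted = r.preRestore()
	}

	// 2. ...the actual checkpoint restore would happen here...
	restored := &container{id: "restored-container"}

	// 3. Hand the new container object and the closure back to the task
	// service so it can swap its bookkeeping and track process exits again.
	if r.postRestore != nil {
		r.postRestore(restored, handleStarted)
	}
}

func main() {
	r := &restorer{
		preRestore: func() handleStartedFunc {
			return func(c *container, p *proc, _ bool) {
				fmt.Printf("monitoring exits of %s (pid %d)\n", c.id, p.pid)
			}
		},
		postRestore: func(c *container, handleStarted handleStartedFunc) {
			// mirrors postRestore above: look up the restored process and
			// pass it to handleStarted.
			handleStarted(c, &proc{pid: 42}, false)
		},
	}
	r.restore()
}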