Skip to content

Commit

Permalink
Merge pull request #23 from ctrox/accept-idempotent
Browse files Browse the repository at this point in the history
fix: make restore idempotent
  • Loading branch information
ctrox authored Jun 30, 2024
2 parents b599a95 + 8751be9 commit 6bd9a42
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 29 deletions.
13 changes: 4 additions & 9 deletions activator/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type Server struct {
proxyTimeout time.Duration
proxyCancel context.CancelFunc
ns ns.NetNS
firstAccept sync.Once
maps bpfMaps
sandboxPid int
started bool
Expand Down Expand Up @@ -95,7 +94,6 @@ func (s *Server) Started() bool {
}

func (s *Server) Reset() error {
s.firstAccept = sync.Once{}
for _, port := range s.ports {
if err := s.enableRedirect(port); err != nil {
return err
Expand Down Expand Up @@ -134,7 +132,6 @@ func (s *Server) listen(ctx context.Context, port uint16, onAccept OnAccept) (in

log.G(ctx).Debugf("listening on %s in ns %s", listener.Addr(), s.ns.Path())

s.firstAccept = sync.Once{}
s.onAccept = onAccept

s.wg.Add(1)
Expand Down Expand Up @@ -213,12 +210,10 @@ func (s *Server) handleConection(ctx context.Context, conn net.Conn, port uint16
return
}

s.firstAccept.Do(func() {
if err := s.onAccept(); err != nil {
log.G(ctx).Errorf("accept function: %s", err)
return
}
})
if err := s.onAccept(); err != nil {
log.G(ctx).Errorf("accept function: %s", err)
return
}

backendConn, err := s.connect(ctx, port)
if err != nil {
Expand Down
38 changes: 20 additions & 18 deletions activator/activator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,28 @@ func TestActivator(t *testing.T) {
fmt.Fprint(w, response)
}))

once := sync.Once{}
err = s.Start(ctx, []uint16{uint16(port)}, func() error {
// simulate a delay until our server is started
time.Sleep(time.Millisecond * 200)
l, err := net.Listen("tcp4", fmt.Sprintf(":%d", port))
require.NoError(t, err)

if err := s.DisableRedirects(); err != nil {
return fmt.Errorf("could not disable redirects: %w", err)
}

// replace listener of server
ts.Listener.Close()
ts.Listener = l
ts.Start()
t.Logf("listening on :%d", port)

t.Cleanup(func() {
ts.Close()
once.Do(func() {
// simulate a delay until our server is started
time.Sleep(time.Millisecond * 200)
l, err := net.Listen("tcp4", fmt.Sprintf(":%d", port))
require.NoError(t, err)

if err := s.DisableRedirects(); err != nil {
t.Errorf("could not disable redirects: %s", err)
}

// replace listener of server
ts.Listener.Close()
ts.Listener = l
ts.Start()
t.Logf("listening on :%d", port)

t.Cleanup(func() {
ts.Close()
})
})

return nil
})
require.NoError(t, err)
Expand Down
1 change: 0 additions & 1 deletion runc/task/service_zeropod.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (w *wrapper) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*emp
os.Exit(1)
}

zeropodContainer.SetScaledDown(false)
log.G(ctx).Printf("restored process for exec: %d in %s", p.Pid(), time.Since(beforeRestore))
}

Expand Down
5 changes: 4 additions & 1 deletion zeropod/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ func (c *Container) restoreHandler(ctx context.Context) activator.OnAccept {
beforeRestore := time.Now()
restoredContainer, p, err := c.Restore(ctx)
if err != nil {
if errors.Is(err, ErrAlreadyRestored) {
log.G(ctx).Info("container is already restored, ignoring request")
return nil
}
// restore failed, this is currently unrecoverable, so we shutdown
// our shim and let containerd recreate it.
log.G(ctx).Fatalf("error restoring container, exiting shim: %s", err)
Expand All @@ -301,7 +305,6 @@ func (c *Container) restoreHandler(ctx context.Context) activator.OnAccept {
return fmt.Errorf("unable to track pid %d: %w", p.Pid(), err)
}

c.SetScaledDown(false)
log.G(ctx).Printf("restored process: %d in %s", p.Pid(), time.Since(beforeRestore))

return c.ScheduleScaleDown()
Expand Down
7 changes: 7 additions & 0 deletions zeropod/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zeropod

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -19,9 +20,14 @@ import (
"github.com/containerd/log"
)

var ErrAlreadyRestored = errors.New("container is already restored")

func (c *Container) Restore(ctx context.Context) (*runc.Container, process.Process, error) {
c.checkpointRestore.Lock()
defer c.checkpointRestore.Unlock()
if !c.ScaledDown() {
return nil, nil, ErrAlreadyRestored
}

beforeRestore := time.Now()
go func() {
Expand Down Expand Up @@ -80,6 +86,7 @@ func (c *Container) Restore(ctx context.Context) (*runc.Container, process.Proce

c.Container = container
c.process = p
c.SetScaledDown(false)

if c.postRestore != nil {
c.postRestore(container, handleStarted)
Expand Down

0 comments on commit 6bd9a42

Please sign in to comment.