feat: implement bpf-steered activator

The old activator had several problems:

* it was listening on the same port as the container, which made things
  way more difficult than they needed to be. Even with the "network
  locking" there were cases where clients would get a connection refused,
  as at some point the socket needs to be closed and reopened.
* the network lock had another side effect: it triggered TCP
  retransmits, which delayed some requests by a whole second.

While the new activator is not fully realised in eBPF, it's way more
reliable, as we can steer traffic without any interruptions using just a
few maps. Essentially, activation now works like this:

1. container is in checkpointed state.
2. incoming packet destined for the container arrives.
3. eBPF program redirects packet to userspace TCP proxy listening on
   random free port.
4. proxy accepts TCP session and triggers restore of container.
5. proxy connects to container as soon as it's running.
6. proxy shuffles data back and forth for this TCP session and all other
   connections that were established while the container was restoring.
7. write to the eBPF map to indicate that traffic no longer needs to be
   redirected to the proxy (see the sketch after this list).
8. traffic flows to container directly as usual without going through the
   proxy for as long as it's alive.
9. on checkpoint the redirect is enabled again.
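
A minimal sketch of what the map write in steps 7/9 could look like from
userspace with cilium/ebpf. The pin path, map layout and function name are
assumptions for illustration only; the actual objects are generated by
bpf2go and wrapped by the activator's RedirectPort/enableRedirect/
disableRedirect helpers shown in the diff below:

    package main

    import (
        "fmt"

        "github.com/cilium/ebpf"
    )

    // disableRedirect removes the entry for containerPort so the TC program
    // stops steering traffic for that port to the proxy (steps 7 and 8).
    func disableRedirect(containerPort uint16) error {
        // Hypothetical pin path; the real maps are the bpf2go-generated objects.
        m, err := ebpf.LoadPinnedMap("/sys/fs/bpf/zeropod/redirects", nil)
        if err != nil {
            return fmt.Errorf("loading pinned map: %w", err)
        }
        defer m.Close()

        // Deleting the port entry means packets are no longer rewritten and
        // flow to the container directly again.
        if err := m.Delete(containerPort); err != nil {
            return fmt.Errorf("deleting redirect entry: %w", err)
        }
        return nil
    }

Re-enabling the redirect on checkpoint (step 9) would then just be a Put of
container port -> proxy port into the same map.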

It still only needs to proxy requests during restore, while being a more
reliable activator that never drops a packet. The current implementation
uses TC, as it allows modifying both ingress and egress packets. A full
eBPF solution has been experimented with, but the main issue is that we
need to "hold back" packets while the container is being restored without
dropping them. As soon as the initial TCP SYN is dropped, the client waits
one second before retransmitting, which makes everything quite slow. I was
unable to find a solution for this as of now, so the userspace proxy is
still required.
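
For reference, attaching such a TC classifier from Go is typically done by
creating a clsact qdisc and adding the program as a direct-action filter.
This sketch uses github.com/vishvananda/netlink; the interface, program and
helper names are placeholders, not necessarily what initBPF in this commit
does:

    package main

    import (
        "github.com/cilium/ebpf"
        "github.com/vishvananda/netlink"
        "golang.org/x/sys/unix"
    )

    // attachIngress attaches an already-loaded classifier program to the
    // ingress hook of the given interface inside the container netns.
    func attachIngress(ifaceName string, prog *ebpf.Program) error {
        link, err := netlink.LinkByName(ifaceName)
        if err != nil {
            return err
        }

        // clsact provides both ingress and egress hook points for TC programs.
        qdisc := &netlink.GenericQdisc{
            QdiscAttrs: netlink.QdiscAttrs{
                LinkIndex: link.Attrs().Index,
                Handle:    netlink.MakeHandle(0xffff, 0),
                Parent:    netlink.HANDLE_CLSACT,
            },
            QdiscType: "clsact",
        }
        if err := netlink.QdiscReplace(qdisc); err != nil {
            return err
        }

        // Direct-action mode lets the BPF program return TC verdicts itself.
        filter := &netlink.BpfFilter{
            FilterAttrs: netlink.FilterAttrs{
                LinkIndex: link.Attrs().Index,
                Parent:    netlink.HANDLE_MIN_INGRESS,
                Handle:    1,
                Protocol:  unix.ETH_P_ALL,
            },
            Fd:           prog.FD(),
            Name:         "redirect_ingress",
            DirectAction: true,
        }
        return netlink.FilterReplace(filter)
    }

The egress direction works the same way with netlink.HANDLE_MIN_EGRESS,
which is what makes it possible to rewrite ports in both directions.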
ctrox committed Jan 2, 2024
1 parent 7ddf3c7 commit cf2c247
Showing 20 changed files with 704 additions and 581 deletions.
182 changes: 87 additions & 95 deletions activator/activator.go
@@ -10,7 +10,6 @@ import (
"syscall"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/containerd/containerd/log"
"github.com/containernetworking/plugins/pkg/ns"
)
@@ -21,76 +20,113 @@ type Server struct {
quit chan interface{}
wg sync.WaitGroup
onAccept OnAccept
onClosed OnClosed
connectTimeout time.Duration
proxyTimeout time.Duration
proxyCancel context.CancelFunc
ns ns.NetNS
once sync.Once
Network NetworkLocker
firstAccept sync.Once
bpfCloseFunc func()
bpfObjs *bpfObjects
}

type OnAccept func() error
type OnClosed func() error

func NewServer(ctx context.Context, ports []uint16, ns ns.NetNS, locker NetworkLocker) (*Server, error) {
func NewServer(ctx context.Context, ports []uint16, nn ns.NetNS) (*Server, error) {
s := &Server{
quit: make(chan interface{}),
ports: ports,
connectTimeout: time.Second * 5,
proxyTimeout: time.Second * 5,
ns: ns,
Network: locker,
ns: nn,
}

if err := nn.Do(func(_ ns.NetNS) error {
// TODO: is this really always eth0?
// we need loopback for port-forwarding to work
objs, close, err := initBPF("lo", "eth0")
if err != nil {
return err
}
s.bpfObjs = objs
s.bpfCloseFunc = close
return nil
}); err != nil {
return nil, err
}

return s, nil
}

func (s *Server) Start(ctx context.Context, onAccept OnAccept, onClosed OnClosed) error {
func (s *Server) Start(ctx context.Context, onAccept OnAccept) error {
for _, port := range s.ports {
proxyPort, err := s.listen(ctx, port, onAccept)
if err != nil {
return err
}

log.G(ctx).Infof("redirecting port %d -> %d", port, proxyPort)
if err := s.RedirectPort(port, uint16(proxyPort)); err != nil {
return fmt.Errorf("redirecting port: %w", err)
}
}

return nil
}

func (s *Server) Reset() error {
s.firstAccept = sync.Once{}
for _, port := range s.ports {
if err := s.listen(ctx, port, onAccept, onClosed); err != nil {
if err := s.enableRedirect(port); err != nil {
return err
}
}
return nil
}

func (s *Server) DisableRedirects() error {
for _, port := range s.ports {
if err := s.disableRedirect(port); err != nil {
return err
}
}
return nil
}

func (s *Server) listen(ctx context.Context, port uint16, onAccept OnAccept, onClosed OnClosed) error {
addr := fmt.Sprintf("0.0.0.0:%d", port)
func (s *Server) listen(ctx context.Context, port uint16, onAccept OnAccept) (int, error) {
// use a random free port for our proxy
addr := "0.0.0.0:0"
cfg := net.ListenConfig{}

// for some reason, sometimes the address will still be in use after
// checkpointing, so we wrap the listen in a retry.
var listener net.Listener
if err := backoff.Retry(
func() error {
// make sure to run the listener in our target namespace
return s.ns.Do(func(_ ns.NetNS) error {
l, err := cfg.Listen(ctx, "tcp4", addr)
if err != nil {
return fmt.Errorf("unable to listen: %w", err)
}
if err := s.ns.Do(func(_ ns.NetNS) error {
l, err := cfg.Listen(ctx, "tcp4", addr)
if err != nil {
return fmt.Errorf("unable to listen: %w", err)
}

listener = l
s.listeners = append(s.listeners, l)
return nil
})
},
newBackOff(),
); err != nil {
return err
listener = l
s.listeners = append(s.listeners, l)
return nil
}); err != nil {
return 0, err
}

log.G(ctx).Infof("listening on %s in ns %s", addr, s.ns.Path())
log.G(ctx).Infof("listening on %s in ns %s", listener.Addr(), s.ns.Path())

s.once = sync.Once{}
s.firstAccept = sync.Once{}
s.onAccept = onAccept
s.onClosed = onClosed

s.wg.Add(1)
go s.serve(ctx, listener, port)
return nil

tcpAddr, ok := listener.Addr().(*net.TCPAddr)
if !ok {
return 0, fmt.Errorf("unable to get TCP addr from listener addr: %T", listener.Addr())
}

return tcpAddr.Port, nil
}

func (s *Server) Stop(ctx context.Context) {
@@ -104,6 +140,8 @@ func (s *Server) Stop(ctx context.Context) {
l.Close()
}

s.bpfCloseFunc()

s.wg.Wait()
log.G(ctx).Info("activator stopped")
}
@@ -140,61 +178,35 @@ func (s *Server) serve(ctx context.Context, listener net.Listener, port uint16)
}

func (s *Server) handleConection(ctx context.Context, conn net.Conn, port uint16) {
// we close our listener after accepting the first connection so it's free
// to use for the to-be-activated program.

// TODO: there is still a small chance a TCP connection ends up timing out
// as there might be a backlog of already established connections that are
// killed when we close the listener. Not sure if it's even possible to
// fix that. It's reproducible by running TestActivator a bunch of times
// (something like -count=50).
defer conn.Close()

var err error
listenerClosed := false
s.once.Do(func() {
// we lock the network, close the listener, call onAccept and unlock only for
// the first connection we get.
beforeLock := time.Now()
if err := s.Network.Lock([]uint16{port}); err != nil {
log.G(ctx).Errorf("error locking network: %s", err)
return
}
log.G(ctx).Printf("took %s to lock network", time.Since(beforeLock))

var closeErr error
for _, listener := range s.listeners {
closeErr = listener.Close()
}
if closeErr != nil {
log.G(ctx).Errorf("error during listener close: %s", closeErr)
}
listenerClosed = true

s.firstAccept.Do(func() {
if err := s.onAccept(); err != nil {
log.G(ctx).Errorf("error during onAccept: %s", err)
log.G(ctx).Errorf("accept function: %s", err)
return
}

beforeUnlock := time.Now()
if err := s.Network.Unlock([]uint16{port}); err != nil {
log.G(ctx).Errorf("error unlocking network: %s", err)
return
}
log.G(ctx).Printf("took %s to unlock network", time.Since(beforeUnlock))
})

log.G(ctx).Println("proxying initial connection to program", conn.RemoteAddr().String())
tcpAddr, ok := conn.RemoteAddr().(*net.TCPAddr)
if !ok {
log.G(ctx).Errorf("unable to get TCP Addr from remote addr: %T", conn.RemoteAddr())
return
}

log.G(ctx).Infof("registering connection on port %d", tcpAddr.Port)
if err := s.registerConnection(uint16(tcpAddr.Port)); err != nil {
log.G(ctx).Errorf("error registering fade out port: %s", err)
}

log.G(ctx).Printf("proxying connection to program at localhost:%d", port)

// fork is done but we need to finish up the initial connection. We do
// this by connecting to our forked process and piping the tcpConn that we
// initially accepted.
initialConn, err := s.connect(ctx, port)
if err != nil {
log.G(ctx).Errorf("error establishing initial connection: %s", err)
log.G(ctx).Errorf("error establishing connection: %s", err)
return
}
defer initialConn.Close()

log.G(ctx).Println("dial succeeded", initialConn.RemoteAddr().String())

requestContext, cancel := context.WithTimeout(ctx, s.proxyTimeout)
@@ -204,13 +216,7 @@ func (s *Server) handleConection(ctx context.Context, conn net.Conn, port uint16
log.G(ctx).Errorf("error proxying request: %s", err)
}

log.G(ctx).Println("initial connection closed", conn.RemoteAddr().String())

if listenerClosed {
if err := s.onClosed(); err != nil {
log.G(ctx).Error(err)
}
}
log.G(ctx).Println("connection closed", conn.RemoteAddr().String())
}

func (s *Server) connect(ctx context.Context, port uint16) (net.Conn, error) {
@@ -275,17 +281,3 @@ func copy(done chan struct{}, errors chan error, dst io.Writer, src io.Reader) {
errors <- err
}
}

func newBackOff() *backoff.ExponentialBackOff {
b := &backoff.ExponentialBackOff{
InitialInterval: time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 100 * time.Millisecond,
MaxElapsedTime: 300 * time.Millisecond,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b.Reset()
return b
}