Skip to content

Commit

Permalink
feat: add status subscription service
Browse files Browse the repository at this point in the history
the shim ttrpc service now exposes a SubscribeStatus function which the
manager subscribes to for receiving zeropod status updates, such as when
checkpointing and restoring.
  • Loading branch information
ctrox committed Jun 1, 2024
1 parent 5a18258 commit 9b6091c
Show file tree
Hide file tree
Showing 8 changed files with 631 additions and 68 deletions.
409 changes: 354 additions & 55 deletions api/shim/v1/shim.pb.go

Large diffs are not rendered by default.

27 changes: 25 additions & 2 deletions api/shim/v1/shim.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,35 @@ import "io/prometheus/client/metrics.proto";

service Shim {
rpc Metrics(MetricsRequest) returns (MetricsResponse);
rpc GetStatus(ContainerRequest) returns (ContainerStatus);
rpc SubscribeStatus(SubscribeStatusRequest) returns (stream ContainerStatus);
}

message MetricsRequest {
google.protobuf.Empty empty = 1;
}

message SubscribeStatusRequest {
google.protobuf.Empty empty = 1;
}

message MetricsResponse {
repeated io.prometheus.client.MetricFamily metrics = 1;
}

message MetricsRequest {
google.protobuf.Empty empty = 1;
message ContainerRequest {
string id = 1;
}

enum ContainerPhase {
SCALED_DOWN = 0;
RUNNING = 1;
}

message ContainerStatus {
string id = 1;
string name = 2;
string pod_name = 3;
string pod_namespace = 4;
ContainerPhase phase = 5;
}
80 changes: 79 additions & 1 deletion api/shim/v1/shim_ttrpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func main() {
os.Exit(1)
}

if err := manager.StartSubscribers(ctx); err != nil {
slog.Error("starting subscribers", "err", err)
os.Exit(1)
}

server := &http.Server{Addr: *metricsAddr}
http.HandleFunc("/metrics", manager.Handler)

Expand Down
101 changes: 101 additions & 0 deletions manager/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package manager

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"net"
"os"
"path/filepath"

"github.com/containerd/ttrpc"
v1 "github.com/ctrox/zeropod/api/shim/v1"
"github.com/ctrox/zeropod/runc/task"
"github.com/fsnotify/fsnotify"
"google.golang.org/protobuf/types/known/emptypb"
)

func StartSubscribers(ctx context.Context) error {
socks, err := os.ReadDir(task.ShimSocketPath)
if err != nil {
return fmt.Errorf("error listing file in shim socket path: %s", err)
}

for _, sock := range socks {
sock := sock
go func() {
if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name())); err != nil {
slog.Error("error subscribing", "sock", sock.Name(), "err", err)
}
}()
}

go watchForShims(ctx)

return nil
}

func subscribe(ctx context.Context, sock string) error {
log := slog.With("sock", sock)
log.Info("subscribing to status events")

conn, err := net.Dial("unix", sock)
if err != nil {
return err
}

shimClient := v1.NewShimClient(ttrpc.NewClient(conn))
// not sure why but the emptypb needs to be set in order for the subscribe
// to be received
client, err := shimClient.SubscribeStatus(ctx, &v1.SubscribeStatusRequest{Empty: &emptypb.Empty{}})
if err != nil {
return err
}

for {
status, err := client.Recv()
if err != nil {
if err == io.EOF || errors.Is(err, ttrpc.ErrClosed) {
log.Info("subscribe closed")
} else {
log.Error("subscribe closed", "err", err)
}
break
}
slog.Info("received status",
"container", status.Name, "pod", status.PodName,
"namespace", status.PodNamespace, "phase", status.Phase)
}

return nil
}

func watchForShims(ctx context.Context) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer watcher.Close()

if err := watcher.Add(task.ShimSocketPath); err != nil {
return err
}

for {
select {
case event := <-watcher.Events:
switch event.Op {
case fsnotify.Create:
if err := subscribe(ctx, event.Name); err != nil {
slog.Error("error subscribing", "sock", event.Name, "err", err)
}
}
case err := <-watcher.Errors:
slog.Error("watch error", "err", err)
case <-ctx.Done():
return nil
}
}
}
9 changes: 6 additions & 3 deletions runc/task/service_zeropod.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

v1 "github.com/ctrox/zeropod/api/shim/v1"
"github.com/ctrox/zeropod/zeropod"
"google.golang.org/protobuf/types/known/emptypb"

Expand Down Expand Up @@ -58,8 +59,9 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow
}
w := &wrapper{
service: s,
zeropodContainers: make(map[string]*zeropod.Container),
checkpointRestore: sync.Mutex{},
zeropodContainers: make(map[string]*zeropod.Container),
zeropodEvents: make(chan *v1.ContainerStatus, 128),
}
go w.processExits()
runcC.Monitor = reaper.Default
Expand All @@ -80,7 +82,7 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow
return shim.RemoveSocket(address)
})

go startShimServer(ctx, filepath.Base(address))
go startShimServer(ctx, filepath.Base(address), w.zeropodEvents)

return w, nil
}
Expand All @@ -91,6 +93,7 @@ type wrapper struct {
mut sync.Mutex
checkpointRestore sync.Mutex
zeropodContainers map[string]*zeropod.Container
zeropodEvents chan *v1.ContainerStatus
}

func (w *wrapper) RegisterTTRPC(server *ttrpc.Server) error {
Expand Down Expand Up @@ -136,7 +139,7 @@ func (w *wrapper) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.

log.G(ctx).Infof("creating zeropod container: %s", cfg.ContainerName)

zeropodContainer, err := zeropod.New(w.context, cfg, &w.checkpointRestore, container, w.platform)
zeropodContainer, err := zeropod.New(w.context, cfg, &w.checkpointRestore, container, w.platform, w.zeropodEvents)
if err != nil {
return nil, fmt.Errorf("error creating scaled container: %w", err)
}
Expand Down
39 changes: 33 additions & 6 deletions runc/task/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ func shimSocketAddress(id string) string {
return fmt.Sprintf("unix://%s.sock", filepath.Join(ShimSocketPath, id))
}

func startShimServer(ctx context.Context, id string) {
func startShimServer(ctx context.Context, id string, events chan *v1.ContainerStatus) {
socket := shimSocketAddress(id)
listener, err := shim.NewSocket(socket)
if err != nil {
if !shim.SocketEaddrinuse(err) {
log.G(ctx).WithError(err)
log.G(ctx).WithError(err).Error("listening to socket")
return
}

Expand All @@ -35,7 +35,7 @@ func startShimServer(ctx context.Context, id string) {
}

if err := shim.RemoveSocket(socket); err != nil {
log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err))
log.G(ctx).WithError(err).Error("remove pre-existing socket")
}

listener, err = shim.NewSocket(socket)
Expand All @@ -57,7 +57,8 @@ func startShimServer(ctx context.Context, id string) {
return
}
defer s.Close()
v1.RegisterShimService(s, &shimService{metrics: zeropod.NewRegistry()})

v1.RegisterShimService(s, &shimService{metrics: zeropod.NewRegistry(), events: events})

defer func() {
listener.Close()
Expand All @@ -67,7 +68,7 @@ func startShimServer(ctx context.Context, id string) {

<-ctx.Done()

log.G(ctx).Info("stopping metrics server")
log.G(ctx).Info("stopping shim server")
listener.Close()
s.Close()
_ = os.RemoveAll(socket)
Expand All @@ -77,9 +78,35 @@ func startShimServer(ctx context.Context, id string) {
// zeropod-specific functions like metrics.
type shimService struct {
metrics *prometheus.Registry
task wrapper
events chan *v1.ContainerStatus
}

// SubscribeStatus watches for shim events.
func (s *shimService) SubscribeStatus(ctx context.Context, _ *v1.SubscribeStatusRequest, srv v1.Shim_SubscribeStatusServer) error {
for {
select {
case msg := <-s.events:
if err := srv.Send(msg); err != nil {
log.G(ctx).Errorf("unable to send event message: %s", err)
}
case <-ctx.Done():
return nil
}
}
}

// GetStatus returns the status of a zeropod container.
func (s *shimService) GetStatus(ctx context.Context, req *v1.ContainerRequest) (*v1.ContainerStatus, error) {
container, ok := s.task.zeropodContainers[req.Id]
if !ok {
return nil, fmt.Errorf("could not find zeropod container with id: %s", req.Id)
}

return container.Status(), nil
}

// Metrics implements v1.ShimService.
// Metrics returns metrics of the zeropod shim instance.
func (s *shimService) Metrics(context.Context, *v1.MetricsRequest) (*v1.MetricsResponse, error) {
mfs, err := s.metrics.Gather()
return &v1.MetricsResponse{Metrics: mfs}, err
Expand Down
Loading

0 comments on commit 9b6091c

Please sign in to comment.