From 9b6091c696c3daf69978cf98e1c648eb9adb9245 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Mon, 13 May 2024 20:50:26 +0200 Subject: [PATCH] feat: add status subscription service the shim ttrpc service now exposes a SubscribeStatus function which the manager subscribes to for receiving zeropod status updates, such as when checkpointing and restoring. --- api/shim/v1/shim.pb.go | 409 ++++++++++++++++++++++++++++++----- api/shim/v1/shim.proto | 27 ++- api/shim/v1/shim_ttrpc.pb.go | 80 ++++++- cmd/manager/main.go | 5 + manager/status.go | 101 +++++++++ runc/task/service_zeropod.go | 9 +- runc/task/shim.go | 39 +++- zeropod/container.go | 29 ++- 8 files changed, 631 insertions(+), 68 deletions(-) create mode 100644 manager/status.go diff --git a/api/shim/v1/shim.pb.go b/api/shim/v1/shim.pb.go index c6ffd17..a60b8af 100644 --- a/api/shim/v1/shim.pb.go +++ b/api/shim/v1/shim.pb.go @@ -22,6 +22,146 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type ContainerPhase int32 + +const ( + ContainerPhase_SCALED_DOWN ContainerPhase = 0 + ContainerPhase_RUNNING ContainerPhase = 1 +) + +// Enum value maps for ContainerPhase. +var ( + ContainerPhase_name = map[int32]string{ + 0: "SCALED_DOWN", + 1: "RUNNING", + } + ContainerPhase_value = map[string]int32{ + "SCALED_DOWN": 0, + "RUNNING": 1, + } +) + +func (x ContainerPhase) Enum() *ContainerPhase { + p := new(ContainerPhase) + *p = x + return p +} + +func (x ContainerPhase) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ContainerPhase) Descriptor() protoreflect.EnumDescriptor { + return file_shim_proto_enumTypes[0].Descriptor() +} + +func (ContainerPhase) Type() protoreflect.EnumType { + return &file_shim_proto_enumTypes[0] +} + +func (x ContainerPhase) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ContainerPhase.Descriptor instead. 
+func (ContainerPhase) EnumDescriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{0} +} + +type MetricsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` +} + +func (x *MetricsRequest) Reset() { + *x = MetricsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsRequest) ProtoMessage() {} + +func (x *MetricsRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. 
+func (*MetricsRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{0} +} + +func (x *MetricsRequest) GetEmpty() *emptypb.Empty { + if x != nil { + return x.Empty + } + return nil +} + +type SubscribeStatusRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` +} + +func (x *SubscribeStatusRequest) Reset() { + *x = SubscribeStatusRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeStatusRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeStatusRequest) ProtoMessage() {} + +func (x *SubscribeStatusRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeStatusRequest.ProtoReflect.Descriptor instead. 
+func (*SubscribeStatusRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{1} +} + +func (x *SubscribeStatusRequest) GetEmpty() *emptypb.Empty { + if x != nil { + return x.Empty + } + return nil +} + type MetricsResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -33,7 +173,7 @@ type MetricsResponse struct { func (x *MetricsResponse) Reset() { *x = MetricsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_shim_proto_msgTypes[0] + mi := &file_shim_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -46,7 +186,7 @@ func (x *MetricsResponse) String() string { func (*MetricsResponse) ProtoMessage() {} func (x *MetricsResponse) ProtoReflect() protoreflect.Message { - mi := &file_shim_proto_msgTypes[0] + mi := &file_shim_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -59,7 +199,7 @@ func (x *MetricsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MetricsResponse.ProtoReflect.Descriptor instead. 
func (*MetricsResponse) Descriptor() ([]byte, []int) { - return file_shim_proto_rawDescGZIP(), []int{0} + return file_shim_proto_rawDescGZIP(), []int{2} } func (x *MetricsResponse) GetMetrics() []*_go.MetricFamily { @@ -69,31 +209,31 @@ func (x *MetricsResponse) GetMetrics() []*_go.MetricFamily { return nil } -type MetricsRequest struct { +type ContainerRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` } -func (x *MetricsRequest) Reset() { - *x = MetricsRequest{} +func (x *ContainerRequest) Reset() { + *x = ContainerRequest{} if protoimpl.UnsafeEnabled { - mi := &file_shim_proto_msgTypes[1] + mi := &file_shim_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } -func (x *MetricsRequest) String() string { +func (x *ContainerRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*MetricsRequest) ProtoMessage() {} +func (*ContainerRequest) ProtoMessage() {} -func (x *MetricsRequest) ProtoReflect() protoreflect.Message { - mi := &file_shim_proto_msgTypes[1] +func (x *ContainerRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -104,16 +244,95 @@ func (x *MetricsRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. -func (*MetricsRequest) Descriptor() ([]byte, []int) { - return file_shim_proto_rawDescGZIP(), []int{1} +// Deprecated: Use ContainerRequest.ProtoReflect.Descriptor instead. 
+func (*ContainerRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{3} } -func (x *MetricsRequest) GetEmpty() *emptypb.Empty { +func (x *ContainerRequest) GetId() string { if x != nil { - return x.Empty + return x.Id } - return nil + return "" +} + +type ContainerStatus struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + PodName string `protobuf:"bytes,3,opt,name=pod_name,json=podName,proto3" json:"pod_name,omitempty"` + PodNamespace string `protobuf:"bytes,4,opt,name=pod_namespace,json=podNamespace,proto3" json:"pod_namespace,omitempty"` + Phase ContainerPhase `protobuf:"varint,5,opt,name=phase,proto3,enum=zeropod.shim.v1.ContainerPhase" json:"phase,omitempty"` +} + +func (x *ContainerStatus) Reset() { + *x = ContainerStatus{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ContainerStatus) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ContainerStatus) ProtoMessage() {} + +func (x *ContainerStatus) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ContainerStatus.ProtoReflect.Descriptor instead. 
+func (*ContainerStatus) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{4} +} + +func (x *ContainerStatus) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *ContainerStatus) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ContainerStatus) GetPodName() string { + if x != nil { + return x.PodName + } + return "" +} + +func (x *ContainerStatus) GetPodNamespace() string { + if x != nil { + return x.PodNamespace + } + return "" +} + +func (x *ContainerStatus) GetPhase() ContainerPhase { + if x != nil { + return x.Phase + } + return ContainerPhase_SCALED_DOWN } var File_shim_proto protoreflect.FileDescriptor @@ -124,25 +343,57 @@ var file_shim_proto_rawDesc = []byte{ 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x22, 0x69, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, - 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4f, - 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x3c, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x69, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, - 0x75, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, - 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, - 0x3e, 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x05, 0x65, 
0x6d, 0x70, 0x74, 0x79, 0x32, - 0x54, 0x0a, 0x04, 0x53, 0x68, 0x69, 0x6d, 0x12, 0x4c, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, - 0x63, 0x73, 0x12, 0x1f, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, - 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, - 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x74, 0x72, 0x6f, 0x78, 0x2f, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, - 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x69, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x3b, 0x76, - 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x3e, + 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x46, + 0x0a, 0x16, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, + 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x4f, 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x07, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, 
0x28, 0x0b, 0x32, 0x22, 0x2e, 0x69, 0x6f, 0x2e, + 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, + 0x74, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x07, + 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, 0x22, 0x0a, 0x10, 0x43, 0x6f, 0x6e, 0x74, 0x61, + 0x69, 0x6e, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xac, 0x01, 0x0a, 0x0f, + 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, + 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, + 0x0a, 0x0d, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, + 0x61, 0x63, 0x65, 0x12, 0x35, 0x0a, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x1f, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, + 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x50, 0x68, + 0x61, 0x73, 0x65, 0x52, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x2a, 0x2e, 0x0a, 0x0e, 0x43, 0x6f, + 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x50, 0x68, 0x61, 0x73, 0x65, 0x12, 0x0f, 0x0a, 0x0b, + 0x53, 0x43, 0x41, 0x4c, 0x45, 0x44, 0x5f, 0x44, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, + 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x32, 0x86, 0x02, 0x0a, 0x04, 0x53, + 0x68, 0x69, 0x6d, 0x12, 0x4c, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 
0x12, 0x1f, + 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, + 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, + 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x50, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, + 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, + 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, + 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x5e, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, + 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, + 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x63, 0x74, 0x72, 0x6f, 0x78, 0x2f, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2f, + 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x69, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x3b, 0x76, 0x31, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -157,23 +408,34 @@ func file_shim_proto_rawDescGZIP() []byte { return file_shim_proto_rawDescData } -var file_shim_proto_msgTypes = make([]protoimpl.MessageInfo, 2) 
+var file_shim_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_shim_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_shim_proto_goTypes = []interface{}{ - (*MetricsResponse)(nil), // 0: zeropod.shim.v1.MetricsResponse - (*MetricsRequest)(nil), // 1: zeropod.shim.v1.MetricsRequest - (*_go.MetricFamily)(nil), // 2: io.prometheus.client.MetricFamily - (*emptypb.Empty)(nil), // 3: google.protobuf.Empty + (ContainerPhase)(0), // 0: zeropod.shim.v1.ContainerPhase + (*MetricsRequest)(nil), // 1: zeropod.shim.v1.MetricsRequest + (*SubscribeStatusRequest)(nil), // 2: zeropod.shim.v1.SubscribeStatusRequest + (*MetricsResponse)(nil), // 3: zeropod.shim.v1.MetricsResponse + (*ContainerRequest)(nil), // 4: zeropod.shim.v1.ContainerRequest + (*ContainerStatus)(nil), // 5: zeropod.shim.v1.ContainerStatus + (*emptypb.Empty)(nil), // 6: google.protobuf.Empty + (*_go.MetricFamily)(nil), // 7: io.prometheus.client.MetricFamily } var file_shim_proto_depIdxs = []int32{ - 2, // 0: zeropod.shim.v1.MetricsResponse.metrics:type_name -> io.prometheus.client.MetricFamily - 3, // 1: zeropod.shim.v1.MetricsRequest.empty:type_name -> google.protobuf.Empty - 1, // 2: zeropod.shim.v1.Shim.Metrics:input_type -> zeropod.shim.v1.MetricsRequest - 0, // 3: zeropod.shim.v1.Shim.Metrics:output_type -> zeropod.shim.v1.MetricsResponse - 3, // [3:4] is the sub-list for method output_type - 2, // [2:3] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 6, // 0: zeropod.shim.v1.MetricsRequest.empty:type_name -> google.protobuf.Empty + 6, // 1: zeropod.shim.v1.SubscribeStatusRequest.empty:type_name -> google.protobuf.Empty + 7, // 2: zeropod.shim.v1.MetricsResponse.metrics:type_name -> io.prometheus.client.MetricFamily + 0, // 3: zeropod.shim.v1.ContainerStatus.phase:type_name -> zeropod.shim.v1.ContainerPhase + 1, // 4: 
zeropod.shim.v1.Shim.Metrics:input_type -> zeropod.shim.v1.MetricsRequest + 4, // 5: zeropod.shim.v1.Shim.GetStatus:input_type -> zeropod.shim.v1.ContainerRequest + 2, // 6: zeropod.shim.v1.Shim.SubscribeStatus:input_type -> zeropod.shim.v1.SubscribeStatusRequest + 3, // 7: zeropod.shim.v1.Shim.Metrics:output_type -> zeropod.shim.v1.MetricsResponse + 5, // 8: zeropod.shim.v1.Shim.GetStatus:output_type -> zeropod.shim.v1.ContainerStatus + 5, // 9: zeropod.shim.v1.Shim.SubscribeStatus:output_type -> zeropod.shim.v1.ContainerStatus + 7, // [7:10] is the sub-list for method output_type + 4, // [4:7] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_shim_proto_init() } @@ -183,7 +445,7 @@ func file_shim_proto_init() { } if !protoimpl.UnsafeEnabled { file_shim_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MetricsResponse); i { + switch v := v.(*MetricsRequest); i { case 0: return &v.state case 1: @@ -195,7 +457,43 @@ func file_shim_proto_init() { } } file_shim_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MetricsRequest); i { + switch v := v.(*SubscribeStatusRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ContainerRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + 
file_shim_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ContainerStatus); i { case 0: return &v.state case 1: @@ -212,13 +510,14 @@ func file_shim_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_shim_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, + NumEnums: 1, + NumMessages: 5, NumExtensions: 0, NumServices: 1, }, GoTypes: file_shim_proto_goTypes, DependencyIndexes: file_shim_proto_depIdxs, + EnumInfos: file_shim_proto_enumTypes, MessageInfos: file_shim_proto_msgTypes, }.Build() File_shim_proto = out.File diff --git a/api/shim/v1/shim.proto b/api/shim/v1/shim.proto index f46989f..3c2cc33 100644 --- a/api/shim/v1/shim.proto +++ b/api/shim/v1/shim.proto @@ -8,12 +8,35 @@ import "io/prometheus/client/metrics.proto"; service Shim { rpc Metrics(MetricsRequest) returns (MetricsResponse); + rpc GetStatus(ContainerRequest) returns (ContainerStatus); + rpc SubscribeStatus(SubscribeStatusRequest) returns (stream ContainerStatus); +} + +message MetricsRequest { + google.protobuf.Empty empty = 1; +} + +message SubscribeStatusRequest { + google.protobuf.Empty empty = 1; } message MetricsResponse { repeated io.prometheus.client.MetricFamily metrics = 1; } -message MetricsRequest { - google.protobuf.Empty empty = 1; +message ContainerRequest { + string id = 1; +} + +enum ContainerPhase { + SCALED_DOWN = 0; + RUNNING = 1; +} + +message ContainerStatus { + string id = 1; + string name = 2; + string pod_name = 3; + string pod_namespace = 4; + ContainerPhase phase = 5; } diff --git a/api/shim/v1/shim_ttrpc.pb.go b/api/shim/v1/shim_ttrpc.pb.go index 76a8f7e..de3f60d 100644 --- a/api/shim/v1/shim_ttrpc.pb.go +++ b/api/shim/v1/shim_ttrpc.pb.go @@ -9,6 +9,21 @@ import ( type ShimService interface { Metrics(context.Context, *MetricsRequest) (*MetricsResponse, error) + GetStatus(context.Context, *ContainerRequest) (*ContainerStatus, error) + SubscribeStatus(context.Context, 
*SubscribeStatusRequest, Shim_SubscribeStatusServer) error +} + +type Shim_SubscribeStatusServer interface { + Send(*ContainerStatus) error + ttrpc.StreamServer +} + +type shimSubscribeStatusServer struct { + ttrpc.StreamServer +} + +func (x *shimSubscribeStatusServer) Send(m *ContainerStatus) error { + return x.StreamServer.SendMsg(m) } func RegisterShimService(srv *ttrpc.Server, svc ShimService) { @@ -21,15 +36,41 @@ func RegisterShimService(srv *ttrpc.Server, svc ShimService) { } return svc.Metrics(ctx, &req) }, + "GetStatus": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req ContainerRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.GetStatus(ctx, &req) + }, + }, + Streams: map[string]ttrpc.Stream{ + "SubscribeStatus": { + Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) { + m := new(SubscribeStatusRequest) + if err := stream.RecvMsg(m); err != nil { + return nil, err + } + return nil, svc.SubscribeStatus(ctx, m, &shimSubscribeStatusServer{stream}) + }, + StreamingClient: false, + StreamingServer: true, + }, }, }) } +type ShimClient interface { + Metrics(context.Context, *MetricsRequest) (*MetricsResponse, error) + GetStatus(context.Context, *ContainerRequest) (*ContainerStatus, error) + SubscribeStatus(context.Context, *SubscribeStatusRequest) (Shim_SubscribeStatusClient, error) +} + type shimClient struct { client *ttrpc.Client } -func NewShimClient(client *ttrpc.Client) ShimService { +func NewShimClient(client *ttrpc.Client) ShimClient { return &shimClient{ client: client, } @@ -42,3 +83,40 @@ func (c *shimClient) Metrics(ctx context.Context, req *MetricsRequest) (*Metrics } return &resp, nil } + +func (c *shimClient) GetStatus(ctx context.Context, req *ContainerRequest) (*ContainerStatus, error) { + var resp ContainerStatus + if err := c.client.Call(ctx, "zeropod.shim.v1.Shim", "GetStatus", req, &resp); err != nil { + return nil, err + } + return 
&resp, nil
+}
+
+func (c *shimClient) SubscribeStatus(ctx context.Context, req *SubscribeStatusRequest) (Shim_SubscribeStatusClient, error) {
+	stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
+		StreamingClient: false,
+		StreamingServer: true,
+	}, "zeropod.shim.v1.Shim", "SubscribeStatus", req)
+	if err != nil {
+		return nil, err
+	}
+	x := &shimSubscribeStatusClient{stream}
+	return x, nil
+}
+
+type Shim_SubscribeStatusClient interface {
+	Recv() (*ContainerStatus, error)
+	ttrpc.ClientStream
+}
+
+type shimSubscribeStatusClient struct {
+	ttrpc.ClientStream
+}
+
+func (x *shimSubscribeStatusClient) Recv() (*ContainerStatus, error) {
+	m := new(ContainerStatus)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
diff --git a/cmd/manager/main.go b/cmd/manager/main.go
index bec987d..ae36861 100644
--- a/cmd/manager/main.go
+++ b/cmd/manager/main.go
@@ -36,6 +36,11 @@ func main() {
 		os.Exit(1)
 	}
 
+	if err := manager.StartSubscribers(ctx); err != nil {
+		slog.Error("starting subscribers", "err", err)
+		os.Exit(1)
+	}
+
 	server := &http.Server{Addr: *metricsAddr}
 	http.HandleFunc("/metrics", manager.Handler)
 
diff --git a/manager/status.go b/manager/status.go
new file mode 100644
index 0000000..88248af
--- /dev/null
+++ b/manager/status.go
@@ -0,0 +1,130 @@
+package manager
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"log/slog"
+	"net"
+	"os"
+	"path/filepath"
+
+	"github.com/containerd/ttrpc"
+	v1 "github.com/ctrox/zeropod/api/shim/v1"
+	"github.com/ctrox/zeropod/runc/task"
+	"github.com/fsnotify/fsnotify"
+	"google.golang.org/protobuf/types/known/emptypb"
+)
+
+// StartSubscribers subscribes to the status stream of every shim socket found
+// in task.ShimSocketPath and starts a watcher that subscribes to sockets
+// created afterwards. Subscriptions run in their own goroutines and their
+// failures are only logged; the returned error covers listing the socket
+// directory.
+func StartSubscribers(ctx context.Context) error {
+	socks, err := os.ReadDir(task.ShimSocketPath)
+	if err != nil {
+		return fmt.Errorf("error listing file in shim socket path: %w", err)
+	}
+
+	for _, sock := range socks {
+		sock := sock
+		go func() {
+			if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name())); err != nil {
+				slog.Error("error subscribing", "sock", sock.Name(), "err", err)
+			}
+		}()
+	}
+
+	go func() {
+		// watchForShims reports setup failures, log them instead of dropping them
+		if err := watchForShims(ctx); err != nil {
+			slog.Error("error watching for shims", "err", err)
+		}
+	}()
+
+	return nil
+}
+
+// subscribe dials the shim socket at sock, opens a SubscribeStatus stream and
+// logs every status it receives until the stream ends. Dial and stream setup
+// errors are returned; the end of an established stream is logged and nil is
+// returned.
+func subscribe(ctx context.Context, sock string) error {
+	log := slog.With("sock", sock)
+	log.Info("subscribing to status events")
+
+	conn, err := net.Dial("unix", sock)
+	if err != nil {
+		return err
+	}
+	// release the socket once the subscription is over
+	defer conn.Close()
+
+	shimClient := v1.NewShimClient(ttrpc.NewClient(conn))
+	// not sure why but the emptypb needs to be set in order for the subscribe
+	// to be received
+	client, err := shimClient.SubscribeStatus(ctx, &v1.SubscribeStatusRequest{Empty: &emptypb.Empty{}})
+	if err != nil {
+		return err
+	}
+
+	for {
+		status, err := client.Recv()
+		if err != nil {
+			if errors.Is(err, io.EOF) || errors.Is(err, ttrpc.ErrClosed) {
+				log.Info("subscribe closed")
+			} else {
+				log.Error("subscribe closed", "err", err)
+			}
+			return nil
+		}
+		slog.Info("received status",
+			"container", status.Name, "pod", status.PodName,
+			"namespace", status.PodNamespace, "phase", status.Phase)
+	}
+}
+
+// watchForShims watches task.ShimSocketPath and subscribes to every socket
+// created in it. It returns an error if the watcher cannot be set up and nil
+// once ctx is done or the watcher channels are closed.
+func watchForShims(ctx context.Context) error {
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		return err
+	}
+	defer watcher.Close()
+
+	if err := watcher.Add(task.ShimSocketPath); err != nil {
+		return err
+	}
+
+	for {
+		select {
+		case event, ok := <-watcher.Events:
+			if !ok {
+				return nil
+			}
+			// Op is a bit mask, test the Create bit instead of comparing
+			// for equality
+			if event.Op&fsnotify.Create == 0 {
+				continue
+			}
+			// subscribe blocks until its stream ends, so run it in a
+			// goroutine to keep handling further events
+			go func(sock string) {
+				if err := subscribe(ctx, sock); err != nil {
+					slog.Error("error subscribing", "sock", sock, "err", err)
+				}
+			}(event.Name)
+		case err, ok := <-watcher.Errors:
+			if !ok {
+				return nil
+			}
+			slog.Error("watch error", "err", err)
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
diff --git a/runc/task/service_zeropod.go b/runc/task/service_zeropod.go
index fadd71b..9af5be1 100644
--- a/runc/task/service_zeropod.go
+++ b/runc/task/service_zeropod.go
@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"
 
+	v1 "github.com/ctrox/zeropod/api/shim/v1"
 	"github.com/ctrox/zeropod/zeropod"
 	"google.golang.org/protobuf/types/known/emptypb"
 
@@ -58,8 +59,9 @@ func NewZeropodService(ctx context.Context, publisher
shim.Publisher, sd shutdow } w := &wrapper{ service: s, - zeropodContainers: make(map[string]*zeropod.Container), checkpointRestore: sync.Mutex{}, + zeropodContainers: make(map[string]*zeropod.Container), + zeropodEvents: make(chan *v1.ContainerStatus, 128), } go w.processExits() runcC.Monitor = reaper.Default @@ -80,7 +82,7 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow return shim.RemoveSocket(address) }) - go startShimServer(ctx, filepath.Base(address)) + go startShimServer(ctx, filepath.Base(address), w.zeropodEvents) return w, nil } @@ -91,6 +93,7 @@ type wrapper struct { mut sync.Mutex checkpointRestore sync.Mutex zeropodContainers map[string]*zeropod.Container + zeropodEvents chan *v1.ContainerStatus } func (w *wrapper) RegisterTTRPC(server *ttrpc.Server) error { @@ -136,7 +139,7 @@ func (w *wrapper) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. log.G(ctx).Infof("creating zeropod container: %s", cfg.ContainerName) - zeropodContainer, err := zeropod.New(w.context, cfg, &w.checkpointRestore, container, w.platform) + zeropodContainer, err := zeropod.New(w.context, cfg, &w.checkpointRestore, container, w.platform, w.zeropodEvents) if err != nil { return nil, fmt.Errorf("error creating scaled container: %w", err) } diff --git a/runc/task/shim.go b/runc/task/shim.go index 4391ebd..efc4651 100644 --- a/runc/task/shim.go +++ b/runc/task/shim.go @@ -20,12 +20,12 @@ func shimSocketAddress(id string) string { return fmt.Sprintf("unix://%s.sock", filepath.Join(ShimSocketPath, id)) } -func startShimServer(ctx context.Context, id string) { +func startShimServer(ctx context.Context, id string, events chan *v1.ContainerStatus) { socket := shimSocketAddress(id) listener, err := shim.NewSocket(socket) if err != nil { if !shim.SocketEaddrinuse(err) { - log.G(ctx).WithError(err) + log.G(ctx).WithError(err).Error("listening to socket") return } @@ -35,7 +35,7 @@ func startShimServer(ctx context.Context, id string) { } if err 
:= shim.RemoveSocket(socket); err != nil {
-			log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err))
+			log.G(ctx).WithError(err).Error("remove pre-existing socket")
 		}
 
 		listener, err = shim.NewSocket(socket)
@@ -57,7 +57,8 @@ func startShimServer(ctx context.Context, id string) {
 		return
 	}
 	defer s.Close()
-	v1.RegisterShimService(s, &shimService{metrics: zeropod.NewRegistry()})
+
+	v1.RegisterShimService(s, &shimService{metrics: zeropod.NewRegistry(), events: events})
 
 	defer func() {
 		listener.Close()
@@ -67,7 +68,7 @@ func startShimServer(ctx context.Context, id string) {
 
 	<-ctx.Done()
 
-	log.G(ctx).Info("stopping metrics server")
+	log.G(ctx).Info("stopping shim server")
 	listener.Close()
 	s.Close()
 	_ = os.RemoveAll(socket)
@@ -77,9 +78,46 @@ func startShimServer(ctx context.Context, id string) {
 // zeropod-specific functions like metrics.
 type shimService struct {
 	metrics *prometheus.Registry
+	task    wrapper
+	events  chan *v1.ContainerStatus
+}
+
+// SubscribeStatus forwards every status received on the events channel to
+// srv. It returns nil once ctx is done and the send error if srv can no
+// longer be written to.
+func (s *shimService) SubscribeStatus(ctx context.Context, _ *v1.SubscribeStatusRequest, srv v1.Shim_SubscribeStatusServer) error {
+	for {
+		select {
+		case msg := <-s.events:
+			if err := srv.Send(msg); err != nil {
+				log.G(ctx).Errorf("unable to send event message: %s", err)
+				// the stream is broken: stop instead of consuming and
+				// dropping all following events
+				return err
+			}
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
+
+// GetStatus returns the status of the zeropod container with the requested
+// id, or an error if no such container is known.
+//
+// NOTE(review): the only construction of shimService visible in this change
+// leaves task at its zero value, in which case the map is nil and every
+// lookup misses. wrapper also holds sync.Mutex fields, so populating task
+// by value would copy them - confirm how task is meant to be set.
+func (s *shimService) GetStatus(ctx context.Context, req *v1.ContainerRequest) (*v1.ContainerStatus, error) {
+	container, ok := s.task.zeropodContainers[req.Id]
+	if !ok {
+		return nil, fmt.Errorf("could not find zeropod container with id: %s", req.Id)
+	}
+
+	return container.Status(), nil
 }
 
-// Metrics implements v1.ShimService.
+// Metrics returns metrics of the zeropod shim instance.
func (s *shimService) Metrics(context.Context, *v1.MetricsRequest) (*v1.MetricsResponse, error) { mfs, err := s.metrics.Gather() return &v1.MetricsResponse{Metrics: mfs}, err diff --git a/zeropod/container.go b/zeropod/container.go index e5c3d1b..4581de5 100644 --- a/zeropod/container.go +++ b/zeropod/container.go @@ -16,6 +16,7 @@ import ( "github.com/containerd/log" "github.com/containernetworking/plugins/pkg/ns" "github.com/ctrox/zeropod/activator" + v1 "github.com/ctrox/zeropod/api/shim/v1" "github.com/ctrox/zeropod/socket" ) @@ -38,6 +39,7 @@ type Container struct { tracker socket.Tracker preRestore func() HandleStartedFunc postRestore func(*runc.Container, HandleStartedFunc) + events chan *v1.ContainerStatus // mutex to lock during checkpoint/restore operations since concurrent // restores can cause cgroup confusion. This mutex is shared between all @@ -45,7 +47,7 @@ type Container struct { checkpointRestore *sync.Mutex } -func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Container, pt stdio.Platform) (*Container, error) { +func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Container, pt stdio.Platform, events chan *v1.ContainerStatus) (*Container, error) { p, err := container.Process("") if err != nil { return nil, errdefs.ToGRPC(err) @@ -89,9 +91,11 @@ func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Conta netNS: targetNS, tracker: tracker, checkpointRestore: cr, + events: events, } running.With(c.labels()).Set(1) + c.sendEvent(c.Status()) return c, c.initActivator(ctx) } @@ -159,6 +163,29 @@ func (c *Container) SetScaledDown(scaledDown bool) { running.With(c.labels()).Set(1) lastRestoreTime.With(c.labels()).Set(float64(time.Now().UnixNano())) } + c.sendEvent(c.Status()) +} + +func (c *Container) Status() *v1.ContainerStatus { + phase := v1.ContainerPhase_RUNNING + if c.ScaledDown() { + phase = v1.ContainerPhase_SCALED_DOWN + } + return &v1.ContainerStatus{ + Id: c.ID(), + Name: 
c.cfg.ContainerName, + PodName: c.cfg.PodName, + PodNamespace: c.cfg.PodNamespace, + Phase: phase, + } +} + +func (c *Container) sendEvent(event *v1.ContainerStatus) { + select { + case c.events <- event: + default: + log.G(c.context).Infof("channel full, discarding event: %v", event) + } } func (c *Container) ScaledDown() bool {