From 5a18258fb73b623778290527c597186dc018e01b Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sun, 12 May 2024 14:32:13 +0200 Subject: [PATCH] feat: use ttrpc for shim metrics Instead of serving the metrics over HTTP, we now use ttrpc and implement a more generic "shim service" that can be used for different purposes than just metrics. --- Makefile | 6 + api/shim/v1/shim.pb.go | 228 +++++++++++++++++++++++++++++++++++ api/shim/v1/shim.proto | 19 +++ api/shim/v1/shim_ttrpc.pb.go | 44 +++++++ go.mod | 6 +- go.sum | 8 +- manager/metrics_collector.go | 62 ++++------ runc/task/service_zeropod.go | 2 +- runc/task/shim.go | 86 +++++++++++++ zeropod/metrics.go | 70 +---------- 10 files changed, 416 insertions(+), 115 deletions(-) create mode 100644 api/shim/v1/shim.pb.go create mode 100644 api/shim/v1/shim.proto create mode 100644 api/shim/v1/shim_ttrpc.pb.go create mode 100644 runc/task/shim.go diff --git a/Makefile b/Makefile index 06a1981..4d88d3f 100644 --- a/Makefile +++ b/Makefile @@ -83,6 +83,12 @@ generate: export BPF_CFLAGS := $(CFLAGS) generate: docker run --rm -v $(PWD):/app:Z --user $(shell id -u):$(shell id -g) --env=BPF_CLANG="$(CLANG)" --env=BPF_CFLAGS="$(CFLAGS)" $(EBPF_IMAGE) +ttrpc: + cd api/shim/v1; protoc --go_out=. --go_opt=paths=source_relative \ + --ttrpc_out=. --plugin=protoc-gen-ttrpc=`which protoc-gen-go-ttrpc` \ + --ttrpc_opt=paths=source_relative *.proto -I. \ + -I ${GOPATH}/pkg/mod/github.com/prometheus/client_model@v0.5.0 + # to improve reproducibility of the bpf builds, we dump the vmlinux.h and # store it compressed in git instead of dumping it during the build. update-vmlinux: diff --git a/api/shim/v1/shim.pb.go b/api/shim/v1/shim.pb.go new file mode 100644 index 0000000..c6ffd17 --- /dev/null +++ b/api/shim/v1/shim.pb.go @@ -0,0 +1,228 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v3.19.6 +// source: shim.proto + +package v1 + +import ( + _go "github.com/prometheus/client_model/go" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type MetricsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metrics []*_go.MetricFamily `protobuf:"bytes,1,rep,name=metrics,proto3" json:"metrics,omitempty"` +} + +func (x *MetricsResponse) Reset() { + *x = MetricsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsResponse) ProtoMessage() {} + +func (x *MetricsResponse) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsResponse.ProtoReflect.Descriptor instead. +func (*MetricsResponse) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{0} +} + +func (x *MetricsResponse) GetMetrics() []*_go.MetricFamily { + if x != nil { + return x.Metrics + } + return nil +} + +type MetricsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` +} + +func (x *MetricsRequest) Reset() { + *x = MetricsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsRequest) ProtoMessage() {} + +func (x *MetricsRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. +func (*MetricsRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{1} +} + +func (x *MetricsRequest) GetEmpty() *emptypb.Empty { + if x != nil { + return x.Empty + } + return nil +} + +var File_shim_proto protoreflect.FileDescriptor + +var file_shim_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x7a, 0x65, + 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, + 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x22, 0x69, 0x6f, 0x2f, 0x70, + 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4f, + 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x3c, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x69, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, + 0x75, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, + 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, + 0x3e, 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x32, + 0x54, 0x0a, 0x04, 0x53, 0x68, 0x69, 0x6d, 0x12, 0x4c, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, + 0x63, 0x73, 0x12, 0x1f, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, + 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, + 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x74, 0x72, 0x6f, 0x78, 0x2f, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, + 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x69, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x3b, 0x76, + 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_shim_proto_rawDescOnce sync.Once + file_shim_proto_rawDescData = file_shim_proto_rawDesc +) + +func file_shim_proto_rawDescGZIP() []byte { + file_shim_proto_rawDescOnce.Do(func() { + file_shim_proto_rawDescData = protoimpl.X.CompressGZIP(file_shim_proto_rawDescData) + }) + return file_shim_proto_rawDescData +} + +var file_shim_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_shim_proto_goTypes = []interface{}{ + (*MetricsResponse)(nil), // 0: zeropod.shim.v1.MetricsResponse + (*MetricsRequest)(nil), // 1: zeropod.shim.v1.MetricsRequest + (*_go.MetricFamily)(nil), // 2: io.prometheus.client.MetricFamily + (*emptypb.Empty)(nil), // 3: google.protobuf.Empty +} +var file_shim_proto_depIdxs = []int32{ + 2, // 0: zeropod.shim.v1.MetricsResponse.metrics:type_name -> io.prometheus.client.MetricFamily + 3, // 1: zeropod.shim.v1.MetricsRequest.empty:type_name -> google.protobuf.Empty + 1, // 2: zeropod.shim.v1.Shim.Metrics:input_type -> zeropod.shim.v1.MetricsRequest + 0, // 3: zeropod.shim.v1.Shim.Metrics:output_type -> zeropod.shim.v1.MetricsResponse + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_shim_proto_init() } +func file_shim_proto_init() { + if File_shim_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_shim_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_shim_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_shim_proto_goTypes, + DependencyIndexes: file_shim_proto_depIdxs, + MessageInfos: file_shim_proto_msgTypes, + }.Build() + File_shim_proto = out.File + file_shim_proto_rawDesc = nil + file_shim_proto_goTypes = nil + file_shim_proto_depIdxs = nil +} diff --git a/api/shim/v1/shim.proto b/api/shim/v1/shim.proto new file mode 100644 index 0000000..f46989f --- /dev/null +++ b/api/shim/v1/shim.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package zeropod.shim.v1; +option go_package = "github.com/ctrox/zeropod/api/shim/v1/;v1"; + +import "google/protobuf/empty.proto"; +import "io/prometheus/client/metrics.proto"; + +service Shim { + rpc Metrics(MetricsRequest) returns (MetricsResponse); +} + +message MetricsResponse { + repeated io.prometheus.client.MetricFamily metrics = 1; +} + +message MetricsRequest { + google.protobuf.Empty empty = 1; +} diff --git a/api/shim/v1/shim_ttrpc.pb.go b/api/shim/v1/shim_ttrpc.pb.go new file mode 100644 index 0000000..76a8f7e --- /dev/null +++ b/api/shim/v1/shim_ttrpc.pb.go @@ -0,0 +1,44 @@ +// Code generated by protoc-gen-go-ttrpc. DO NOT EDIT. +// source: shim.proto +package v1 + +import ( + context "context" + ttrpc "github.com/containerd/ttrpc" +) + +type ShimService interface { + Metrics(context.Context, *MetricsRequest) (*MetricsResponse, error) +} + +func RegisterShimService(srv *ttrpc.Server, svc ShimService) { + srv.RegisterService("zeropod.shim.v1.Shim", &ttrpc.ServiceDesc{ + Methods: map[string]ttrpc.Method{ + "Metrics": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req MetricsRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Metrics(ctx, &req) + }, + }, + }) +} + +type shimClient struct { + client *ttrpc.Client +} + +func NewShimClient(client *ttrpc.Client) ShimService { + return &shimClient{ + client: client, + } +} + +func (c *shimClient) Metrics(ctx context.Context, req *MetricsRequest) (*MetricsResponse, error) { + var resp MetricsResponse + if err := c.client.Call(ctx, "zeropod.shim.v1.Shim", "Metrics", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} diff --git a/go.mod b/go.mod index a51322e..8cc6cf4 100644 --- a/go.mod +++ b/go.mod @@ -27,8 +27,9 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 github.com/vishvananda/netlink v1.2.1-beta.2 + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f golang.org/x/sys v0.19.0 - google.golang.org/protobuf v1.33.0 + google.golang.org/protobuf v1.34.1 k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 k8s.io/client-go v0.29.0 @@ -69,7 +70,7 @@ require ( github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -116,7 +117,6 @@ require ( go.opentelemetry.io/otel/metric v1.19.0 // indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect - golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect diff --git a/go.sum b/go.sum index 5611955..f678aad 100644 --- a/go.sum +++ b/go.sum @@ -135,8 +135,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -485,8 +485,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/manager/metrics_collector.go b/manager/metrics_collector.go index f56bd1f..65988e7 100644 --- a/manager/metrics_collector.go +++ b/manager/metrics_collector.go @@ -1,19 +1,21 @@ package manager import ( - "fmt" + "context" "io" "log/slog" "net" "net/http" "os" "path/filepath" - "sort" + "slices" - "github.com/ctrox/zeropod/zeropod" + "github.com/containerd/ttrpc" + v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/ctrox/zeropod/runc/task" dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" + "golang.org/x/exp/maps" ) func Handler(w http.ResponseWriter, req *http.Request) { @@ -23,42 +25,40 @@ func Handler(w http.ResponseWriter, req *http.Request) { // fetchMetricsAndMerge gets metrics from each socket, merges them together // and writes them to w. func fetchMetricsAndMerge(w io.Writer) { - socks, err := os.ReadDir(zeropod.MetricsSocketPath) + socks, err := os.ReadDir(task.ShimSocketPath) if err != nil { - slog.Error("error listing file in metrics socket path", "path", zeropod.MetricsSocketPath, "err", err) + slog.Error("error listing file in shim socket path", "path", task.ShimSocketPath, "err", err) return } mfs := map[string]*dto.MetricFamily{} for _, sock := range socks { - sockName := filepath.Join(zeropod.MetricsSocketPath, sock.Name()) - slog.Info("reading sock", "name", sockName) + sockName := filepath.Join(task.ShimSocketPath, sock.Name()) + slog.Debug("getting metrics", "name", sockName) - res, err := getMetrics(sockName) + shimMetrics, err := getMetricsOverTTRPC(context.Background(), sockName) if err != nil { slog.Error("getting metrics", "err", err) // we still want to read the rest of the sockets continue } + for _, mf := range shimMetrics { + if mf.Name == nil { + continue + } - for n, mf := range res { - mfo, ok := mfs[n] + mfo, ok := mfs[*mf.Name] if ok { mfo.Metric = append(mfo.Metric, mf.Metric...) } else { - mfs[n] = mf + mfs[*mf.Name] = mf } } } - - names := []string{} - for n := range mfs { - names = append(names, n) - } - sort.Strings(names) - + keys := maps.Keys(mfs) + slices.Sort(keys) enc := expfmt.NewEncoder(w, expfmt.FmtText) - for _, n := range names { + for _, n := range keys { err := enc.Encode(mfs[n]) if err != nil { slog.Error("encoding metrics", "err", err) @@ -67,30 +67,16 @@ func fetchMetricsAndMerge(w io.Writer) { } } -func getMetrics(sock string) (map[string]*dto.MetricFamily, error) { - tr := &http.Transport{ - Dial: func(proto, addr string) (conn net.Conn, err error) { - return net.Dial("unix", sock) - }, - } - - client := &http.Client{Transport: tr} - - // the host does not seem to matter when using unix sockets - resp, err := client.Get("http://localhost/metrics") +func getMetricsOverTTRPC(ctx context.Context, sock string) ([]*dto.MetricFamily, error) { + conn, err := net.Dial("unix", sock) if err != nil { return nil, err } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("expected status 200, got %v", resp.StatusCode) - } - - var parser expfmt.TextParser - mfs, err := parser.TextToMetricFamilies(resp.Body) + resp, err := v1.NewShimClient(ttrpc.NewClient(conn)).Metrics(ctx, &v1.MetricsRequest{}) if err != nil { return nil, err } - return mfs, nil + return resp.Metrics, nil } diff --git a/runc/task/service_zeropod.go b/runc/task/service_zeropod.go index 1f022e1..fadd71b 100644 --- a/runc/task/service_zeropod.go +++ b/runc/task/service_zeropod.go @@ -80,7 +80,7 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow return shim.RemoveSocket(address) }) - go zeropod.StartMetricsServer(ctx, filepath.Base(address)) + go startShimServer(ctx, filepath.Base(address)) return w, nil } diff --git a/runc/task/shim.go b/runc/task/shim.go new file mode 100644 index 0000000..4391ebd --- /dev/null +++ b/runc/task/shim.go @@ -0,0 +1,86 @@ +package task + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/containerd/containerd/runtime/v2/shim" + "github.com/containerd/log" + "github.com/containerd/ttrpc" + v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/ctrox/zeropod/zeropod" + "github.com/prometheus/client_golang/prometheus" +) + +const ShimSocketPath = "/run/zeropod/s/" + +func shimSocketAddress(id string) string { + return fmt.Sprintf("unix://%s.sock", filepath.Join(ShimSocketPath, id)) +} + +func startShimServer(ctx context.Context, id string) { + socket := shimSocketAddress(id) + listener, err := shim.NewSocket(socket) + if err != nil { + if !shim.SocketEaddrinuse(err) { + log.G(ctx).WithError(err) + return + } + + if shim.CanConnect(socket) { + log.G(ctx).Debug("shim socket already exists, skipping server start") + return + } + + if err := shim.RemoveSocket(socket); err != nil { + log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err)) + } + + listener, err = shim.NewSocket(socket) + if err != nil { + log.G(ctx).WithError(err).Error("failed to create shim listener") + } + } + + log.G(ctx).Infof("starting shim server at %s", socket) + // write shim address to filesystem + if err := shim.WriteAddress("shim_address", socket); err != nil { + log.G(ctx).WithError(err).Errorf("failed to write shim address") + return + } + + s, err := ttrpc.NewServer() + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to create ttrpc server") + return + } + defer s.Close() + v1.RegisterShimService(s, &shimService{metrics: zeropod.NewRegistry()}) + + defer func() { + listener.Close() + os.Remove(socket) + }() + go s.Serve(ctx, listener) + + <-ctx.Done() + + log.G(ctx).Info("stopping metrics server") + listener.Close() + s.Close() + _ = os.RemoveAll(socket) +} + +// shimService is an extension to the shim task service to provide +// zeropod-specific functions like metrics. +type shimService struct { + metrics *prometheus.Registry +} + +// Metrics implements v1.ShimService. +func (s *shimService) Metrics(context.Context, *v1.MetricsRequest) (*v1.MetricsResponse, error) { + mfs, err := s.metrics.Gather() + return &v1.MetricsResponse{Metrics: mfs}, err +} diff --git a/zeropod/metrics.go b/zeropod/metrics.go index 3c27341..6a708d4 100644 --- a/zeropod/metrics.go +++ b/zeropod/metrics.go @@ -1,16 +1,7 @@ package zeropod import ( - "context" - "fmt" - "net/http" - "os" - "path/filepath" - - "github.com/containerd/containerd/runtime/v2/shim" - "github.com/containerd/log" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" ) const ( @@ -69,66 +60,7 @@ var ( }, commonLabels) ) -const MetricsSocketPath = "/run/zeropod/s/" - -func metricsSocketAddress(containerID string) string { - return fmt.Sprintf("unix://%s.sock", filepath.Join(MetricsSocketPath, containerID)) -} - -func StartMetricsServer(ctx context.Context, containerID string) { - metricsAddress := metricsSocketAddress(containerID) - listener, err := shim.NewSocket(metricsAddress) - if err != nil { - if !shim.SocketEaddrinuse(err) { - log.G(ctx).WithError(err) - return - } - - if shim.CanConnect(metricsAddress) { - log.G(ctx).Debug("metrics socket already exists, skipping server start") - return - } - - if err := shim.RemoveSocket(metricsAddress); err != nil { - log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err)) - } - - listener, err = shim.NewSocket(metricsAddress) - if err != nil { - log.G(ctx).WithError(err).Error("failed to create metrics listener") - } - } - - log.G(ctx).Infof("starting metrics server at %s", metricsAddress) - // write metrics address to filesystem - if err := shim.WriteAddress("metrics_address", metricsAddress); err != nil { - log.G(ctx).WithError(err).Errorf("failed to write metrics address") - return - } - - mux := http.NewServeMux() - handler := promhttp.HandlerFor( - newRegistry(), - promhttp.HandlerOpts{ - EnableOpenMetrics: false, - }, - ) - - mux.Handle("/metrics", handler) - - server := http.Server{Handler: mux} - - go server.Serve(listener) - - <-ctx.Done() - - log.G(ctx).Info("stopping metrics server") - listener.Close() - server.Close() - _ = os.RemoveAll(metricsAddress) -} - -func newRegistry() *prometheus.Registry { +func NewRegistry() *prometheus.Registry { reg := prometheus.NewRegistry() reg.MustRegister(