diff --git a/README.md b/README.md index c46c452..2d2cc3a 100644 --- a/README.md +++ b/README.md @@ -285,13 +285,34 @@ the shim otherwise. For example, loading eBPF programs can be quite memory intensive so they have been moved from the shim to the manager to keep the shim memory usage as minimal as possible. -In addition to that it collects metrics from all the shim processes and -exposes those metrics on an HTTP endpoint. +These are the responsibilities of the manager: + +- Loading eBPF programs that the shim(s) rely on. +- Collecting metrics from all shim processes and exposing them over HTTP for scraping. +- Subscribing to shim scaling events and adjusting pod resource requests. + +#### In-place Resource Scaling (Experimental) + +This makes use of the feature flag +[InPlacePodVerticalScaling](https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/1287-in-place-update-pod-resources) +to automatically reduce the pod resource requests to a minimum on scale-down +events and restore them again on scale-up. Once the Kubernetes feature flag is +enabled, in-place scaling also needs to be enabled using the manager flag +`-in-place-scaling=true`. + +#### Flags + +``` +-metrics-addr=":8080" sets the address of the metrics server +-debug enables debug logging +-in-place-scaling=false enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag +``` ## Metrics The zeropod-node pod exposes metrics on `0.0.0.0:8080/metrics` in Prometheus -format on each installed node. The following metrics are currently available: +format on each installed node. The metrics address can be configured with the +`-metrics-addr` flag. The following metrics are currently available: ```bash # HELP zeropod_checkpoint_duration_seconds The duration of the last checkpoint in seconds.
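To illustrate the README section above: with the InPlacePodVerticalScaling feature gate enabled, scaling down amounts to rewriting the container requests of a running pod. The sketch below shows only that idea; `scaleDown`, `scaledDownCPU`, `scaledDownMemory` and the `nginx` pod name are illustrative, while the actual logic (including restoring the original requests from pod annotations on scale-up) lives in `manager/pod_scaler.go` further down in this diff.

```go
package main

import (
	"context"
	"log/slog"
	"os"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

var (
	scaledDownCPU    = resource.MustParse("1m")
	scaledDownMemory = resource.MustParse("1Ki")
)

// scaleDown reduces the requests of all containers in the given pod to the
// scaled-down minimum and writes the pod back to the API server. With the
// InPlacePodVerticalScaling feature gate enabled, the update is applied to
// the running pod instead of being rejected as an immutable field change.
func scaleDown(ctx context.Context, c client.Client, name types.NamespacedName) error {
	pod := &corev1.Pod{}
	if err := c.Get(ctx, name, pod); err != nil {
		return err
	}
	for i := range pod.Spec.Containers {
		pod.Spec.Containers[i].Resources.Requests = corev1.ResourceList{
			corev1.ResourceCPU:    scaledDownCPU,
			corev1.ResourceMemory: scaledDownMemory,
		}
	}
	return c.Update(ctx, pod)
}

func main() {
	cfg, err := config.GetConfig()
	if err != nil {
		slog.Error("getting kubeconfig", "err", err)
		os.Exit(1)
	}
	c, err := client.New(cfg, client.Options{})
	if err != nil {
		slog.Error("creating client", "err", err)
		os.Exit(1)
	}
	// Illustrative target pod; the manager derives this from shim status events.
	name := types.NamespacedName{Name: "nginx", Namespace: "default"}
	if err := scaleDown(context.Background(), c, name); err != nil {
		slog.Error("scaling down", "err", err)
		os.Exit(1)
	}
}
```

Recording the original requests in pod annotations, as `pod_scaler.go` does, is what allows the manager to revert to the pre-scale-down values once the container is running again.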
diff --git a/cmd/manager/main.go b/cmd/manager/main.go index ae36861..3ef59f4 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -15,11 +15,18 @@ import ( ) var ( - metricsAddr = flag.String("metrics-addr", ":8080", "address of the metrics server") + metricsAddr = flag.String("metrics-addr", ":8080", "address of the metrics server") + debug = flag.Bool("debug", true, "enable debug logs") + inPlaceScaling = flag.Bool("in-place-scaling", false, + "enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag") ) func main() { flag.Parse() + + if *debug { + slog.SetLogLoggerLevel(slog.LevelDebug) + } slog.Info("starting manager", "metrics-addr", *metricsAddr) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) @@ -36,7 +43,17 @@ func main() { os.Exit(1) } - if err := manager.StartSubscribers(ctx); err != nil { + subscribers := []manager.StatusHandler{} + if *inPlaceScaling { + podScaler, err := manager.NewPodScaler() + if err != nil { + slog.Error("podScaler init", "err", err) + os.Exit(1) + } + subscribers = append(subscribers, podScaler) + } + + if err := manager.StartSubscribers(ctx, subscribers...); err != nil { slog.Error("starting subscribers", "err", err) os.Exit(1) } diff --git a/config/base/node-daemonset.yaml b/config/base/node-daemonset.yaml index e4a2452..87fad0b 100644 --- a/config/base/node-daemonset.yaml +++ b/config/base/node-daemonset.yaml @@ -52,6 +52,9 @@ spec: - name: manager image: manager imagePullPolicy: IfNotPresent + command: ["/zeropod-manager"] + args: + - -metrics-addr=:8080 ports: - name: metrics containerPort: 8080 diff --git a/config/base/rbac.yaml b/config/base/rbac.yaml index 8ebdd8e..59a3958 100644 --- a/config/base/rbac.yaml +++ b/config/base/rbac.yaml @@ -8,7 +8,7 @@ metadata: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: runtimeclass-installer + name: zeropod:runtimeclass-installer rules: - apiGroups: - node.k8s.io @@ -22,11 +22,38 @@ rules: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: - name: runtimeclass-installer + name: zeropod:runtimeclass-installer roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: runtimeclass-installer + name: zeropod:runtimeclass-installer +subjects: + - kind: ServiceAccount + name: zeropod-node + namespace: zeropod-system +--- +# the manager needs to get/update pods for dynamic resource requests +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: zeropod:pod-updater +rules: + - apiGroups: + - "" + resources: + - pods + verbs: + - get + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: zeropod:pod-updater +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: zeropod:pod-updater subjects: - kind: ServiceAccount name: zeropod-node diff --git a/config/examples/nginx.yaml b/config/examples/nginx.yaml index 25bd61f..62e4804 100644 --- a/config/examples/nginx.yaml +++ b/config/examples/nginx.yaml @@ -3,7 +3,7 @@ kind: Deployment metadata: name: nginx spec: - replicas: 3 + replicas: 1 selector: matchLabels: app: nginx @@ -21,3 +21,7 @@ spec: name: nginx ports: - containerPort: 80 + resources: + requests: + cpu: 100m + memory: 128Mi diff --git a/config/kind/kustomization.yaml b/config/kind/kustomization.yaml index 559dc9b..ff89457 100644 --- a/config/kind/kustomization.yaml +++ b/config/kind/kustomization.yaml @@ -7,3 +7,10 @@ images: - name: installer newName: ghcr.io/ctrox/zeropod-installer 
newTag: dev +patches: + - patch: |- + - op: add + path: /spec/template/spec/containers/0/args/- + value: -in-place-scaling=true + target: + kind: DaemonSet diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index f4f36df..8e18176 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -9,11 +9,13 @@ import ( "testing" "time" + "github.com/ctrox/zeropod/manager" "github.com/ctrox/zeropod/zeropod" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/utils/ptr" ) @@ -193,6 +195,33 @@ func TestE2E(t *testing.T) { // exec and should test the deletion in the restored state. }) + t.Run("resources scaling", func(t *testing.T) { + pod := testPod(scaleDownAfter(0), agnContainer("agn", 8080), resources(corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("100Mi"), + }, + })) + + cleanupPod := createPodAndWait(t, ctx, client, pod) + defer cleanupPod() + require.Eventually(t, func() bool { + if err := client.Get(ctx, objectName(pod), pod); err != nil { + return false + } + + resourcesScaledDown := false + for _, container := range pod.Status.ContainerStatuses { + t.Logf("allocated resources: %v", container.AllocatedResources) + resourcesScaledDown = container.AllocatedResources != nil && + container.AllocatedResources[corev1.ResourceCPU] == manager.ScaledDownCPU && + container.AllocatedResources[corev1.ResourceMemory] == manager.ScaledDownMemory + } + + return resourcesScaledDown + }, time.Second*10, time.Second) + }) + t.Run("metrics", func(t *testing.T) { // create two pods to test metric merging runningPod := testPod(scaleDownAfter(time.Hour)) diff --git a/e2e/kind.yaml b/e2e/kind.yaml index 911779e..8209778 100644 --- a/e2e/kind.yaml +++ b/e2e/kind.yaml @@ -1,5 +1,7 @@ kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 +featureGates: + InPlacePodVerticalScaling: true nodes: - role: control-plane extraMounts: diff --git a/e2e/setup_test.go b/e2e/setup_test.go index 0373aa4..7fbf493 100644 --- a/e2e/setup_test.go +++ b/e2e/setup_test.go @@ -121,6 +121,9 @@ func startKind(t testing.TB, name string, port int) (c *rest.Config, err error) if err := provider.Create(name, cluster.CreateWithV1Alpha4Config(&v1alpha4.Cluster{ Name: name, + FeatureGates: map[string]bool{ + "InPlacePodVerticalScaling": true, + }, Nodes: []v1alpha4.Node{{ Labels: map[string]string{zeropod.NodeLabel: "true"}, // setup port map for our node port @@ -349,6 +352,14 @@ func portsAnnotation(portsMap string) podOption { }) } +func resources(res corev1.ResourceRequirements) podOption { + return func(p *pod) { + for i := range p.Spec.Containers { + p.Spec.Containers[i].Resources = res + } + } +} + const agnHostImage = "registry.k8s.io/e2e-test-images/agnhost:2.39" func agnContainer(name string, port int) podOption { diff --git a/go.mod b/go.mod index 8cc6cf4..c939e4e 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( github.com/docker/go-metrics v0.0.1 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.11.2 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.7.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-errors/errors v1.4.2 // indirect diff --git a/go.sum b/go.sum index f678aad..348374a 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,8 @@ 
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= +github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/evanphx/json-patch/v5 v5.7.0 h1:nJqP7uwL84RJInrohHfW0Fx3awjbm8qZeFv0nW9SYGc= github.com/evanphx/json-patch/v5 v5.7.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= diff --git a/manager/pod_scaler.go b/manager/pod_scaler.go new file mode 100644 index 0000000..ae4bf3e --- /dev/null +++ b/manager/pod_scaler.go @@ -0,0 +1,196 @@ +package manager + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + + v1 "github.com/ctrox/zeropod/api/shim/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" +) + +const ( + CPUAnnotationKey = "zeropod.ctrox.dev/cpu-requests" + MemoryAnnotationKey = "zeropod.ctrox.dev/memory-requests" +) + +var ( + ScaledDownCPU = resource.MustParse("1m") + ScaledDownMemory = resource.MustParse("1Ki") +) + +type containerResource map[string]resource.Quantity + +type PodScaler struct { + client client.Client +} + +func NewPodScaler() (*PodScaler, error) { + slog.Info("pod scaler init") + cfg, err := config.GetConfig() + if err != nil { + return nil, err + } + c, err := client.New(cfg, client.Options{}) + return &PodScaler{client: c}, err +} + +func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus) error { + clog := slog.With("container", status.Name, "pod", status.PodName, + "namespace", status.PodNamespace, "phase", status.Phase) + clog.Info("handling pod") + + pod := &corev1.Pod{} + podName := types.NamespacedName{Name: status.PodName, Namespace: status.PodNamespace} + if err := ps.client.Get(ctx, podName, pod); err != nil { + return err + } + + updatePod := false + for i, container := range pod.Spec.Containers { + if container.Name != status.Name { + continue + } + + _, hasCPU := container.Resources.Requests[corev1.ResourceCPU] + _, hasMemory := container.Resources.Requests[corev1.ResourceMemory] + if !hasCPU || !hasMemory { + clog.Debug("ignoring container without resources") + continue + } + + initial, err := ps.initialRequests(container, pod.Annotations) + if err != nil { + return fmt.Errorf("getting initial requests from pod failed: %w", err) + } + + current := container.Resources.Requests + if ps.isUpToDate(initial, current, status) { + clog.Debug("container is up to date", "initial", printResources(initial)) + continue + } + + if err := ps.setAnnotations(pod); err != nil { + return err + } + + new := ps.newRequests(initial, current, status) + pod.Spec.Containers[i].Resources.Requests = new + clog.Debug("container needs to be updated", "current", printResources(current), "new", printResources(new)) + updatePod = true + } + + if !updatePod { + return nil + } + + if err := ps.updateRequests(ctx, pod); err != nil { + if 
errors.IsInvalid(err) { + clog.Error("in-place scaling failed, ensure InPlacePodVerticalScaling feature flag is enabled") + return nil + } + return err + } + + return nil +} + +func (ps *PodScaler) isUpToDate(initial, current corev1.ResourceList, status *v1.ContainerStatus) bool { + switch status.Phase { + case v1.ContainerPhase_SCALED_DOWN: + return current[corev1.ResourceCPU] == ScaledDownCPU && + current[corev1.ResourceMemory] == ScaledDownMemory + case v1.ContainerPhase_RUNNING: + return current[corev1.ResourceCPU] == initial[corev1.ResourceCPU] && + current[corev1.ResourceMemory] == initial[corev1.ResourceMemory] + default: + return true + } +} + +func (ps *PodScaler) newRequests(initial, current corev1.ResourceList, status *v1.ContainerStatus) corev1.ResourceList { + switch status.Phase { + case v1.ContainerPhase_SCALED_DOWN: + current[corev1.ResourceCPU] = ScaledDownCPU + current[corev1.ResourceMemory] = ScaledDownMemory + return current + case v1.ContainerPhase_RUNNING: + return initial + default: + return current + } +} + +func (ps *PodScaler) initialRequests(container corev1.Container, podAnnotations map[string]string) (corev1.ResourceList, error) { + initial := container.DeepCopy().Resources.Requests + containerCPUs := containerResource{} + if cpuReq, ok := podAnnotations[CPUAnnotationKey]; ok { + if err := json.Unmarshal([]byte(cpuReq), &containerCPUs); err != nil { + return nil, err + } + } + + containerMemory := containerResource{} + if memortReq, ok := podAnnotations[MemoryAnnotationKey]; ok { + if err := json.Unmarshal([]byte(memortReq), &containerMemory); err != nil { + return nil, err + } + } + + if cpu, ok := containerCPUs[container.Name]; ok { + initial[corev1.ResourceCPU] = cpu + } + + if memory, ok := containerMemory[container.Name]; ok { + initial[corev1.ResourceMemory] = memory + } + + return initial, nil +} + +func (ps *PodScaler) setAnnotations(pod *corev1.Pod) error { + containerCPUs := containerResource{} + containerMemory := containerResource{} + for _, container := range pod.Spec.Containers { + containerCPUs[container.Name] = container.Resources.Requests[corev1.ResourceCPU] + containerMemory[container.Name] = container.Resources.Requests[corev1.ResourceMemory] + } + + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + + if _, ok := pod.Annotations[CPUAnnotationKey]; !ok { + val, err := json.Marshal(containerCPUs) + if err != nil { + return err + } + pod.Annotations[CPUAnnotationKey] = string(val) + } + + if _, ok := pod.Annotations[MemoryAnnotationKey]; !ok { + val, err := json.Marshal(containerMemory) + if err != nil { + return err + } + pod.Annotations[MemoryAnnotationKey] = string(val) + } + + return nil +} + +func (ps *PodScaler) updateRequests(ctx context.Context, pod *corev1.Pod) error { + return ps.client.Update(ctx, pod) +} + +func printResources(res corev1.ResourceList) string { + cpu := res[corev1.ResourceCPU] + memory := res[corev1.ResourceMemory] + return fmt.Sprintf("cpu: %s, memory: %s", cpu.String(), memory.String()) +} diff --git a/manager/pod_scaler_test.go b/manager/pod_scaler_test.go new file mode 100644 index 0000000..8b1f978 --- /dev/null +++ b/manager/pod_scaler_test.go @@ -0,0 +1,132 @@ +package manager + +import ( + "context" + "log/slog" + "testing" + + v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + 
"k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestHandlePod(t *testing.T) { + slog.SetLogLoggerLevel(slog.LevelDebug) + + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + t.Fatal(err) + } + runningCPU, runningMemory := resource.MustParse("100m"), resource.MustParse("100Mi") + + cases := map[string]struct { + statusEventPhase v1.ContainerPhase + beforeEvent corev1.ResourceList + expected corev1.ResourceList + }{ + "running pod is not updated": { + statusEventPhase: v1.ContainerPhase_RUNNING, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + }, + "running is updated when scaling down": { + statusEventPhase: v1.ContainerPhase_SCALED_DOWN, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + }, + "scaled down pod is not updated": { + statusEventPhase: v1.ContainerPhase_SCALED_DOWN, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + }, + "scaled down pod requests are restored when starting": { + statusEventPhase: v1.ContainerPhase_RUNNING, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + client := fake.NewClientBuilder().WithScheme(scheme).Build() + ps := &PodScaler{ + client: client, + } + + initialPod := newPod(corev1.ResourceList{corev1.ResourceCPU: runningCPU, corev1.ResourceMemory: runningMemory}) + ps.setAnnotations(initialPod) + pod := newPod(tc.beforeEvent) + pod.SetAnnotations(initialPod.GetAnnotations()) + + ctx := context.Background() + if err := client.Create(ctx, pod); err != nil { + t.Fatal(err) + } + + if err := ps.Handle( + context.Background(), + &v1.ContainerStatus{ + Name: pod.Spec.Containers[0].Name, + PodName: pod.Name, + PodNamespace: pod.Namespace, + Phase: tc.statusEventPhase, + }, + ); err != nil { + t.Fatal(err) + } + + if err := client.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil { + t.Fatal(err) + } + + assert.Equal(t, pod.Spec.Containers[0].Resources.Requests, tc.expected) + }) + } +} + +func newPod(req corev1.ResourceList) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "scaled-pod", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "first-container", + Resources: corev1.ResourceRequirements{ + Requests: req, + }, + }}, + }, + } +} diff --git a/manager/status.go b/manager/status.go index 88248af..0279b8c 100644 --- a/manager/status.go +++ b/manager/status.go @@ -17,7 +17,17 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -func StartSubscribers(ctx context.Context) error { +type StatusHandler interface { + Handle(context.Context, *v1.ContainerStatus) error +} + +func StartSubscribers(ctx context.Context, handlers ...StatusHandler) error { + 
if _, err := os.Stat(task.ShimSocketPath); errors.Is(err, os.ErrNotExist) { + if err := os.Mkdir(task.ShimSocketPath, os.ModePerm); err != nil { + return err + } + } + socks, err := os.ReadDir(task.ShimSocketPath) if err != nil { return fmt.Errorf("error listing file in shim socket path: %s", err) @@ -26,18 +36,18 @@ func StartSubscribers(ctx context.Context) error { for _, sock := range socks { sock := sock go func() { - if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name())); err != nil { + if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name()), handlers); err != nil { slog.Error("error subscribing", "sock", sock.Name(), "err", err) } }() } - go watchForShims(ctx) + go watchForShims(ctx, handlers) return nil } -func subscribe(ctx context.Context, sock string) error { +func subscribe(ctx context.Context, sock string, handlers []StatusHandler) error { log := slog.With("sock", sock) log.Info("subscribing to status events") @@ -64,15 +74,19 @@ func subscribe(ctx context.Context, sock string) error { } break } - slog.Info("received status", - "container", status.Name, "pod", status.PodName, + clog := slog.With("container", status.Name, "pod", status.PodName, "namespace", status.PodNamespace, "phase", status.Phase) + for _, h := range handlers { + if err := h.Handle(ctx, status); err != nil { + clog.Error("handling status update", "err", err) + } + } } return nil } -func watchForShims(ctx context.Context) error { +func watchForShims(ctx context.Context, handlers []StatusHandler) error { watcher, err := fsnotify.NewWatcher() if err != nil { return err @@ -88,7 +102,7 @@ func watchForShims(ctx context.Context) error { case event := <-watcher.Events: switch event.Op { case fsnotify.Create: - if err := subscribe(ctx, event.Name); err != nil { + if err := subscribe(ctx, event.Name, handlers); err != nil { slog.Error("error subscribing", "sock", event.Name, "err", err) } }
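With this change, any component that wants to react to shim status events can implement the new `StatusHandler` interface and pass it to `StartSubscribers`; the manager registers the `PodScaler` this way when `-in-place-scaling=true` is set. The sketch below is a minimal, illustrative consumer (not part of this change) that only logs events, using the import paths from this diff; `logHandler` is a hypothetical name.

```go
package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	v1 "github.com/ctrox/zeropod/api/shim/v1"
	"github.com/ctrox/zeropod/manager"
)

// logHandler implements manager.StatusHandler and simply logs every
// container status event received from the shims.
type logHandler struct{}

func (logHandler) Handle(ctx context.Context, status *v1.ContainerStatus) error {
	slog.Info("shim status event", "container", status.Name, "pod", status.PodName,
		"namespace", status.PodNamespace, "phase", status.Phase)
	return nil
}

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Additional handlers are passed variadically, mirroring cmd/manager/main.go.
	if err := manager.StartSubscribers(ctx, logHandler{}); err != nil {
		slog.Error("starting subscribers", "err", err)
		os.Exit(1)
	}
	<-ctx.Done()
}
```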