From 3ce745ef9c0af75751f402a939e138846844c844 Mon Sep 17 00:00:00 2001 From: YUN SUN Date: Tue, 4 Jun 2024 16:38:15 +0800 Subject: [PATCH 1/5] [YUNIKORN-2651] Update the unchecked error for make lint warnings (#850) Closes: #850 Signed-off-by: Chia-Ping Tsai --- .golangci.yml | 2 ++ pkg/admission/conf/am_conf.go | 9 ++++-- pkg/admission/namespace_cache.go | 11 +++++-- pkg/admission/namespace_cache_test.go | 6 ++-- pkg/admission/priority_class_cache.go | 11 +++++-- pkg/admission/priority_class_cache_test.go | 6 ++-- pkg/cache/context.go | 33 +++++++++++++++++---- pkg/client/apifactory.go | 32 +++++++++++++------- pkg/client/apifactory_mock.go | 7 +++-- pkg/cmd/admissioncontroller/main.go | 18 +++++++++-- pkg/shim/scheduler_mock_test.go | 4 --- test/e2e/framework/helpers/k8s/k8s_utils.go | 8 ++++- 12 files changed, 108 insertions(+), 39 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 2d9bbc302..179df3820 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -35,6 +35,8 @@ linters-settings: local-prefixes: github.com/apache/yunikorn govet: check-shadowing: true + goconst: + min-occurrences: 5 funlen: lines: 120 statements: 80 diff --git a/pkg/admission/conf/am_conf.go b/pkg/admission/conf/am_conf.go index 154df9ced..f375af5e3 100644 --- a/pkg/admission/conf/am_conf.go +++ b/pkg/admission/conf/am_conf.go @@ -117,8 +117,13 @@ func NewAdmissionControllerConf(configMaps []*v1.ConfigMap) *AdmissionController return acc } -func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) { - configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc}) +func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) error { + _, err := configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc}) + if err != nil { + return fmt.Errorf("failed to create register handlers: %w", err) + } + + return nil } func (acc *AdmissionControllerConf) GetNamespace() string { diff --git a/pkg/admission/namespace_cache.go b/pkg/admission/namespace_cache.go index c5ff80654..963ffc27b 100644 --- a/pkg/admission/namespace_cache.go +++ b/pkg/admission/namespace_cache.go @@ -19,6 +19,8 @@ package admission import ( + "fmt" + v1 "k8s.io/api/core/v1" informersv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" @@ -53,14 +55,17 @@ type nsFlags struct { } // NewNamespaceCache creates a new cache and registers the handler for the cache with the Informer. -func NewNamespaceCache(namespaces informersv1.NamespaceInformer) *NamespaceCache { +func NewNamespaceCache(namespaces informersv1.NamespaceInformer) (*NamespaceCache, error) { nsc := &NamespaceCache{ nameSpaces: make(map[string]nsFlags), } if namespaces != nil { - namespaces.Informer().AddEventHandler(&namespaceUpdateHandler{cache: nsc}) + _, err := namespaces.Informer().AddEventHandler(&namespaceUpdateHandler{cache: nsc}) + if err != nil { + return nil, fmt.Errorf("failed to create namespace cache: %w", err) + } } - return nsc + return nsc, nil } // enableYuniKorn returns the value for the enableYuniKorn flag (tri-state UNSET, TRUE or FALSE) for the namespace. diff --git a/pkg/admission/namespace_cache_test.go b/pkg/admission/namespace_cache_test.go index 792a2fcdc..5d9e2bcd1 100644 --- a/pkg/admission/namespace_cache_test.go +++ b/pkg/admission/namespace_cache_test.go @@ -35,7 +35,8 @@ import ( const testNS = "test-ns" func TestFlags(t *testing.T) { - cache := NewNamespaceCache(nil) + cache, nsErr := NewNamespaceCache(nil) + assert.NilError(t, nsErr) cache.nameSpaces["notset"] = nsFlags{ enableYuniKorn: UNSET, generateAppID: UNSET, @@ -69,7 +70,8 @@ func TestNamespaceHandlers(t *testing.T) { kubeClient := client.NewKubeClientMock(false) informers := NewInformers(kubeClient, "default") - cache := NewNamespaceCache(informers.Namespace) + cache, nsErr := NewNamespaceCache(informers.Namespace) + assert.NilError(t, nsErr) informers.Start() defer informers.Stop() diff --git a/pkg/admission/priority_class_cache.go b/pkg/admission/priority_class_cache.go index b640ce834..87a4dd873 100644 --- a/pkg/admission/priority_class_cache.go +++ b/pkg/admission/priority_class_cache.go @@ -19,6 +19,8 @@ package admission import ( + "fmt" + schedulingv1 "k8s.io/api/scheduling/v1" informersv1 "k8s.io/client-go/informers/scheduling/v1" "k8s.io/client-go/tools/cache" @@ -36,14 +38,17 @@ type PriorityClassCache struct { } // NewPriorityClassCache creates a new cache and registers the handler for the cache with the Informer. -func NewPriorityClassCache(priorityClasses informersv1.PriorityClassInformer) *PriorityClassCache { +func NewPriorityClassCache(priorityClasses informersv1.PriorityClassInformer) (*PriorityClassCache, error) { pcc := &PriorityClassCache{ priorityClasses: make(map[string]bool), } if priorityClasses != nil { - priorityClasses.Informer().AddEventHandler(&priorityClassUpdateHandler{cache: pcc}) + _, err := priorityClasses.Informer().AddEventHandler(&priorityClassUpdateHandler{cache: pcc}) + if err != nil { + return nil, fmt.Errorf("failed to create a new cache and register the handler: %w", err) + } } - return pcc + return pcc, nil } // isPreemptSelfAllowed returns the preemption value. Only returns false if configured. diff --git a/pkg/admission/priority_class_cache_test.go b/pkg/admission/priority_class_cache_test.go index 0fe287afd..de755eb5c 100644 --- a/pkg/admission/priority_class_cache_test.go +++ b/pkg/admission/priority_class_cache_test.go @@ -35,7 +35,8 @@ import ( const testPC = "test-pc" func TestIsPreemptSelfAllowed(t *testing.T) { - cache := NewPriorityClassCache(nil) + cache, pcErr := NewPriorityClassCache(nil) + assert.NilError(t, pcErr) cache.priorityClasses["yes"] = true cache.priorityClasses["no"] = false @@ -49,7 +50,8 @@ func TestPriorityClassHandlers(t *testing.T) { kubeClient := client.NewKubeClientMock(false) informers := NewInformers(kubeClient, "default") - cache := NewPriorityClassCache(informers.PriorityClass) + cache, pcErr := NewPriorityClassCache(informers.PriorityClass) + assert.NilError(t, pcErr) informers.Start() defer informers.Stop() diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 20036eec2..a59b224eb 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -111,33 +111,50 @@ func NewContextWithBootstrapConfigMaps(apis client.APIProvider, bootstrapConfigM return ctx } -func (ctx *Context) AddSchedulingEventHandlers() { - ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ +func (ctx *Context) AddSchedulingEventHandlers() error { + err := ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ Type: client.ConfigMapInformerHandlers, FilterFn: ctx.filterConfigMaps, AddFn: ctx.addConfigMaps, UpdateFn: ctx.updateConfigMaps, DeleteFn: ctx.deleteConfigMaps, }) - ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ + if err != nil { + return err + } + + err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ Type: client.PriorityClassInformerHandlers, FilterFn: ctx.filterPriorityClasses, AddFn: ctx.addPriorityClass, UpdateFn: ctx.updatePriorityClass, DeleteFn: ctx.deletePriorityClass, }) - ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ + if err != nil { + return err + } + + err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ Type: client.NodeInformerHandlers, AddFn: ctx.addNode, UpdateFn: ctx.updateNode, DeleteFn: ctx.deleteNode, }) - ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ + if err != nil { + return err + } + + err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ Type: client.PodInformerHandlers, AddFn: ctx.AddPod, UpdateFn: ctx.UpdatePod, DeleteFn: ctx.DeletePod, }) + if err != nil { + return err + } + + return nil } func (ctx *Context) IsPluginMode() bool { @@ -1449,7 +1466,11 @@ func (ctx *Context) InitializeState() error { // Step 5: Start scheduling event handlers. At this point, initialization is mostly complete, and any existing // objects will show up as newly added objects. Since the add/update event handlers are idempotent, this is fine. - ctx.AddSchedulingEventHandlers() + err = ctx.AddSchedulingEventHandlers() + if err != nil { + log.Log(log.Admission).Error("failed to add scheduling event handlers", zap.Error(err)) + return err + } // Step 6: Finalize priority classes. Between the start of initialization and when the informer event handlers are // registered, it is possible that a priority class object was deleted. Process them again and remove diff --git a/pkg/client/apifactory.go b/pkg/client/apifactory.go index 62bdcb203..beedfbd91 100644 --- a/pkg/client/apifactory.go +++ b/pkg/client/apifactory.go @@ -19,6 +19,7 @@ package client import ( + "fmt" "time" "go.uber.org/zap" @@ -53,7 +54,7 @@ func (t Type) String() string { type APIProvider interface { GetAPIs() *Clients - AddEventHandler(handlers *ResourceEventHandlers) + AddEventHandler(handlers *ResourceEventHandlers) error Start() Stop() WaitForSync() @@ -143,7 +144,7 @@ func (s *APIFactory) IsTestingMode() bool { return s.testMode } -func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) { +func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) error { s.lock.Lock() defer s.lock.Unlock() // register all handlers @@ -166,34 +167,43 @@ func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) { } log.Log(log.ShimClient).Info("registering event handler", zap.Stringer("type", handlers.Type)) - s.addEventHandlers(handlers.Type, h, 0) + if err := s.addEventHandlers(handlers.Type, h, 0); err != nil { + return fmt.Errorf("failed to initialize event handlers: %w", err) + } + return nil } func (s *APIFactory) addEventHandlers( - handlerType Type, handler cache.ResourceEventHandler, resyncPeriod time.Duration) { + handlerType Type, handler cache.ResourceEventHandler, resyncPeriod time.Duration) error { + var err error switch handlerType { case PodInformerHandlers: - s.GetAPIs().PodInformer.Informer(). + _, err = s.GetAPIs().PodInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case NodeInformerHandlers: - s.GetAPIs().NodeInformer.Informer(). + _, err = s.GetAPIs().NodeInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case ConfigMapInformerHandlers: - s.GetAPIs().ConfigMapInformer.Informer(). + _, err = s.GetAPIs().ConfigMapInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case StorageInformerHandlers: - s.GetAPIs().StorageInformer.Informer(). + _, err = s.GetAPIs().StorageInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case PVInformerHandlers: - s.GetAPIs().PVInformer.Informer(). + _, err = s.GetAPIs().PVInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case PVCInformerHandlers: - s.GetAPIs().PVCInformer.Informer(). + _, err = s.GetAPIs().PVCInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case PriorityClassInformerHandlers: - s.GetAPIs().PriorityClassInformer.Informer(). + _, err = s.GetAPIs().PriorityClassInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) } + + if err != nil { + return fmt.Errorf("failed to add event handlers: %w", err) + } + return nil } func (s *APIFactory) WaitForSync() { diff --git a/pkg/client/apifactory_mock.go b/pkg/client/apifactory_mock.go index 733923901..fee2d2579 100644 --- a/pkg/client/apifactory_mock.go +++ b/pkg/client/apifactory_mock.go @@ -19,6 +19,7 @@ package client import ( + "fmt" "time" "go.uber.org/zap" @@ -219,16 +220,18 @@ func (m *MockedAPIProvider) IsTestingMode() bool { return true } -func (m *MockedAPIProvider) AddEventHandler(handlers *ResourceEventHandlers) { +func (m *MockedAPIProvider) AddEventHandler(handlers *ResourceEventHandlers) error { m.Lock() defer m.Unlock() if !m.running { - return + return fmt.Errorf("mocked API provider is not running") } m.eventHandler <- handlers log.Log(log.Test).Info("registering event handler", zap.Stringer("type", handlers.Type)) + + return nil } func (m *MockedAPIProvider) RunEventHandler() { diff --git a/pkg/cmd/admissioncontroller/main.go b/pkg/cmd/admissioncontroller/main.go index 815f92947..08f2771e2 100644 --- a/pkg/cmd/admissioncontroller/main.go +++ b/pkg/cmd/admissioncontroller/main.go @@ -62,9 +62,21 @@ func main() { kubeClient := client.NewKubeClient(amConf.GetKubeConfig()) informers := admission.NewInformers(kubeClient, amConf.GetNamespace()) - amConf.RegisterHandlers(informers.ConfigMap) - pcCache := admission.NewPriorityClassCache(informers.PriorityClass) - nsCache := admission.NewNamespaceCache(informers.Namespace) + + if hadlerErr := amConf.RegisterHandlers(informers.ConfigMap); hadlerErr != nil { + log.Log(log.Admission).Fatal("Failed to register handlers", zap.Error(hadlerErr)) + return + } + pcCache, pcErr := admission.NewPriorityClassCache(informers.PriorityClass) + if pcErr != nil { + log.Log(log.Admission).Fatal("Failed to create new priority class cache", zap.Error(pcErr)) + return + } + nsCache, nsErr := admission.NewNamespaceCache(informers.Namespace) + if nsErr != nil { + log.Log(log.Admission).Fatal("Failed to create namespace cache", zap.Error(nsErr)) + return + } informers.Start() wm, err := admission.NewWebhookManager(amConf) diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go index 6b6b9af64..1bbe5f02c 100644 --- a/pkg/shim/scheduler_mock_test.go +++ b/pkg/shim/scheduler_mock_test.go @@ -162,10 +162,6 @@ func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, expe } } -func (fc *MockScheduler) removeApplication(appId string) error { - return fc.context.RemoveApplication(appId) -} - func (fc *MockScheduler) waitAndAssertTaskState(t *testing.T, appID, taskID, expectedState string) { app := fc.context.GetApplication(appID) assert.Equal(t, app != nil, true) diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go index 92fb07a83..77a472f57 100644 --- a/test/e2e/framework/helpers/k8s/k8s_utils.go +++ b/test/e2e/framework/helpers/k8s/k8s_utils.go @@ -32,6 +32,7 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" @@ -58,6 +59,7 @@ import ( "github.com/apache/yunikorn-k8shim/pkg/common/constants" "github.com/apache/yunikorn-k8shim/pkg/common/utils" "github.com/apache/yunikorn-k8shim/pkg/locking" + "github.com/apache/yunikorn-k8shim/pkg/log" "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" ) @@ -716,7 +718,11 @@ func (k *KubeCtl) StartConfigMapInformer(namespace string, stopChan <-chan struc informerFactory := informers.NewSharedInformerFactoryWithOptions(k.clientSet, 0, informers.WithNamespace(namespace)) informerFactory.Start(stopChan) configMapInformer := informerFactory.Core().V1().ConfigMaps() - configMapInformer.Informer().AddEventHandler(eventHandler) + _, err := configMapInformer.Informer().AddEventHandler(eventHandler) + if err != nil { + log.Log(log.AdmissionConf).Error("Error adding event handler", zap.Error(err)) + return err + } go configMapInformer.Informer().Run(stopChan) if err := utils.WaitForCondition(func() bool { return configMapInformer.Informer().HasSynced() From 36111c41d97658e168e640c284fe8d71921883b4 Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Tue, 4 Jun 2024 10:51:52 +0200 Subject: [PATCH 2/5] [YUNIKORN-2654] Remove unused code in k8shim context Closes: #854 Signed-off-by: Peter Bacsko --- pkg/cache/context.go | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index a59b224eb..550eaf620 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -904,29 +904,6 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool { return ctx.schedulerCache.StartPodAllocation(podKey, nodeID) } -// inform the scheduler that the application is completed, -// the complete state may further explained to completed_with_errors(failed) or successfully_completed, -// either way we need to release all allocations (if exists) for this application -func (ctx *Context) NotifyApplicationComplete(appID string) { - if app := ctx.GetApplication(appID); app != nil { - log.Log(log.ShimContext).Debug("NotifyApplicationComplete", - zap.String("appID", appID), - zap.String("currentAppState", app.GetApplicationState())) - ev := NewSimpleApplicationEvent(appID, CompleteApplication) - dispatcher.Dispatch(ev) - } -} - -func (ctx *Context) NotifyApplicationFail(appID string) { - if app := ctx.GetApplication(appID); app != nil { - log.Log(log.ShimContext).Debug("NotifyApplicationFail", - zap.String("appID", appID), - zap.String("currentAppState", app.GetApplicationState())) - ev := NewSimpleApplicationEvent(appID, FailApplication) - dispatcher.Dispatch(ev) - } -} - func (ctx *Context) NotifyTaskComplete(appID, taskID string) { ctx.lock.Lock() defer ctx.lock.Unlock() From 0bb441db5789c07cfff6eee8ab05a3faf03c8719 Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Thu, 6 Jun 2024 13:36:04 +0200 Subject: [PATCH 3/5] Complete web_server_test#TestProxy (#853) Closes: #853 Signed-off-by: Peter Bacsko --- pkg/webtest/web_server_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/webtest/web_server_test.go b/pkg/webtest/web_server_test.go index a4a2a2c73..1e8383e6a 100644 --- a/pkg/webtest/web_server_test.go +++ b/pkg/webtest/web_server_test.go @@ -78,7 +78,3 @@ func TestWebServer(t *testing.T) { str = string(body) assert.Check(t, strings.Contains(str, "OK"), "test string not found") } - -func TestProxy(t *testing.T) { - -} From ccfd01e5584f977be6f030d01dcf046af790381a Mon Sep 17 00:00:00 2001 From: Jacob Salway Date: Thu, 6 Jun 2024 18:13:12 +0200 Subject: [PATCH 4/5] [YUNIKORN-2561] Add TopologySpreadConstraints to placeholder pods (#845) Closes: #845 Signed-off-by: Peter Bacsko --- pkg/cache/amprotocol.go | 17 +++++++++-------- pkg/cache/placeholder.go | 13 +++++++------ pkg/cache/placeholder_test.go | 30 ++++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 14 deletions(-) diff --git a/pkg/cache/amprotocol.go b/pkg/cache/amprotocol.go index 5f2dd31c6..076ca7165 100644 --- a/pkg/cache/amprotocol.go +++ b/pkg/cache/amprotocol.go @@ -45,14 +45,15 @@ type ApplicationMetadata struct { } type TaskGroup struct { - Name string - MinMember int32 - Labels map[string]string - Annotations map[string]string - MinResource map[string]resource.Quantity - NodeSelector map[string]string - Tolerations []v1.Toleration - Affinity *v1.Affinity + Name string + MinMember int32 + Labels map[string]string + Annotations map[string]string + MinResource map[string]resource.Quantity + NodeSelector map[string]string + Tolerations []v1.Toleration + Affinity *v1.Affinity + TopologySpreadConstraints []v1.TopologySpreadConstraint } type TaskMetadata struct { diff --git a/pkg/cache/placeholder.go b/pkg/cache/placeholder.go index 80562c78a..8235bf11c 100644 --- a/pkg/cache/placeholder.go +++ b/pkg/cache/placeholder.go @@ -113,12 +113,13 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup TaskGrou }, }, }, - RestartPolicy: constants.PlaceholderPodRestartPolicy, - SchedulerName: constants.SchedulerName, - NodeSelector: taskGroup.NodeSelector, - Tolerations: taskGroup.Tolerations, - Affinity: taskGroup.Affinity, - PriorityClassName: priorityClassName, + RestartPolicy: constants.PlaceholderPodRestartPolicy, + SchedulerName: constants.SchedulerName, + NodeSelector: taskGroup.NodeSelector, + Tolerations: taskGroup.Tolerations, + Affinity: taskGroup.Affinity, + TopologySpreadConstraints: taskGroup.TopologySpreadConstraints, + PriorityClassName: priorityClassName, }, } diff --git a/pkg/cache/placeholder_test.go b/pkg/cache/placeholder_test.go index b6cfed150..35914af38 100644 --- a/pkg/cache/placeholder_test.go +++ b/pkg/cache/placeholder_test.go @@ -88,6 +88,19 @@ var taskGroups = []TaskGroup{ }, }, }, + TopologySpreadConstraints: []v1.TopologySpreadConstraint{ + { + MaxSkew: 1, + TopologyKey: v1.LabelTopologyZone, + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "labelKey0": "labelKeyValue0", + "labelKey1": "labelKeyValue1", + }, + }, + }, + }, }, } @@ -265,3 +278,20 @@ func TestNewPlaceholderWithPriorityClassName(t *testing.T) { assert.Equal(t, priority, holder.pod.Spec.Priority) assert.Equal(t, priorityClassName, holder.pod.Spec.PriorityClassName) } + +func TestNewPlaceholderWithTopologySpreadConstraints(t *testing.T) { + mockedSchedulerAPI := newMockSchedulerAPI() + app := NewApplication(appID, queue, + "bob", testGroups, map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI) + app.setTaskGroups(taskGroups) + + holder := newPlaceholder("ph-name", app, app.taskGroups[0]) + assert.Equal(t, len(holder.pod.Spec.TopologySpreadConstraints), 1) + assert.Equal(t, holder.pod.Spec.TopologySpreadConstraints[0].MaxSkew, int32(1)) + assert.Equal(t, holder.pod.Spec.TopologySpreadConstraints[0].TopologyKey, v1.LabelTopologyZone) + assert.Equal(t, holder.pod.Spec.TopologySpreadConstraints[0].WhenUnsatisfiable, v1.DoNotSchedule) + assert.DeepEqual(t, holder.pod.Spec.TopologySpreadConstraints[0].LabelSelector.MatchLabels, map[string]string{ + "labelKey0": "labelKeyValue0", + "labelKey1": "labelKeyValue1", + }) +} From d8b8a7adb15985144941f5caaf5a944ebe5aaad7 Mon Sep 17 00:00:00 2001 From: Yu-Lin Chen Date: Fri, 7 Jun 2024 08:27:07 +0000 Subject: [PATCH 5/5] [YUNIKORN-2631] Support canonical labels for queue/applicationId in Admission Controller --- pkg/admission/admission_controller.go | 2 +- pkg/admission/admission_controller_test.go | 32 +++++++++++----- pkg/admission/util.go | 17 ++++++++- pkg/admission/util_test.go | 44 +++++++++++++++------- pkg/common/constants/constants.go | 2 + 5 files changed, 71 insertions(+), 26 deletions(-) diff --git a/pkg/admission/admission_controller.go b/pkg/admission/admission_controller.go index 3b058da6b..35c3fecb0 100644 --- a/pkg/admission/admission_controller.go +++ b/pkg/admission/admission_controller.go @@ -322,7 +322,7 @@ func (c *AdmissionController) processPodUpdate(req *admissionv1.AdmissionRequest func (c *AdmissionController) shouldProcessAdmissionReview(namespace string, labels map[string]string) bool { if c.shouldProcessNamespace(namespace) && - (labels[constants.LabelApplicationID] != "" || labels[constants.SparkLabelAppID] != "" || c.shouldLabelNamespace(namespace)) { + (labels[constants.CanonicalLabelApplicationID] != "" || labels[constants.LabelApplicationID] != "" || labels[constants.SparkLabelAppID] != "" || c.shouldLabelNamespace(namespace)) { return true } diff --git a/pkg/admission/admission_controller_test.go b/pkg/admission/admission_controller_test.go index 88bf8d6a8..e5c66189e 100644 --- a/pkg/admission/admission_controller_test.go +++ b/pkg/admission/admission_controller_test.go @@ -81,9 +81,11 @@ func TestUpdateLabels(t *testing.T) { assert.Equal(t, patch[0].Op, "add") assert.Equal(t, patch[0].Path, "/metadata/labels") if updatedMap, ok := patch[0].Value.(map[string]string); ok { - assert.Equal(t, len(updatedMap), 3) + assert.Equal(t, len(updatedMap), 5) assert.Equal(t, updatedMap["random"], "random") + assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default") assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default") + assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true) assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true) } else { t.Fatal("patch info content is not as expected") @@ -104,8 +106,8 @@ func TestUpdateLabels(t *testing.T) { UID: "7f5fd6c5d5", ResourceVersion: "10654", Labels: map[string]string{ - "random": "random", - constants.LabelApplicationID: "app-0001", + "random": "random", + constants.CanonicalLabelApplicationID: "app-0001", }, }, Spec: v1.PodSpec{}, @@ -117,9 +119,11 @@ func TestUpdateLabels(t *testing.T) { assert.Equal(t, patch[0].Op, "add") assert.Equal(t, patch[0].Path, "/metadata/labels") if updatedMap, ok := patch[0].Value.(map[string]string); ok { - assert.Equal(t, len(updatedMap), 3) + assert.Equal(t, len(updatedMap), 5) assert.Equal(t, updatedMap["random"], "random") + assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default") assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default") + assert.Equal(t, updatedMap[constants.CanonicalLabelApplicationID], "app-0001") assert.Equal(t, updatedMap[constants.LabelApplicationID], "app-0001") } else { t.Fatal("patch info content is not as expected") @@ -140,8 +144,8 @@ func TestUpdateLabels(t *testing.T) { UID: "7f5fd6c5d5", ResourceVersion: "10654", Labels: map[string]string{ - "random": "random", - constants.LabelQueueName: "root.abc", + "random": "random", + constants.CanonicalLabelQueueName: "root.abc", }, }, Spec: v1.PodSpec{}, @@ -154,9 +158,11 @@ func TestUpdateLabels(t *testing.T) { assert.Equal(t, patch[0].Op, "add") assert.Equal(t, patch[0].Path, "/metadata/labels") if updatedMap, ok := patch[0].Value.(map[string]string); ok { - assert.Equal(t, len(updatedMap), 3) + assert.Equal(t, len(updatedMap), 5) assert.Equal(t, updatedMap["random"], "random") + assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.abc") assert.Equal(t, updatedMap[constants.LabelQueueName], "root.abc") + assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true) assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true) } else { t.Fatal("patch info content is not as expected") @@ -186,8 +192,10 @@ func TestUpdateLabels(t *testing.T) { assert.Equal(t, patch[0].Op, "add") assert.Equal(t, patch[0].Path, "/metadata/labels") if updatedMap, ok := patch[0].Value.(map[string]string); ok { - assert.Equal(t, len(updatedMap), 2) + assert.Equal(t, len(updatedMap), 4) + assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default") assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default") + assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true) assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true) } else { t.Fatal("patch info content is not as expected") @@ -214,8 +222,10 @@ func TestUpdateLabels(t *testing.T) { assert.Equal(t, patch[0].Op, "add") assert.Equal(t, patch[0].Path, "/metadata/labels") if updatedMap, ok := patch[0].Value.(map[string]string); ok { - assert.Equal(t, len(updatedMap), 2) + assert.Equal(t, len(updatedMap), 4) + assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default") assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default") + assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true) assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true) } else { t.Fatal("patch info content is not as expected") @@ -240,8 +250,10 @@ func TestUpdateLabels(t *testing.T) { assert.Equal(t, patch[0].Op, "add") assert.Equal(t, patch[0].Path, "/metadata/labels") if updatedMap, ok := patch[0].Value.(map[string]string); ok { - assert.Equal(t, len(updatedMap), 2) + assert.Equal(t, len(updatedMap), 4) + assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default") assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default") + assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true) assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true) } else { t.Fatal("patch info content is not as expected") diff --git a/pkg/admission/util.go b/pkg/admission/util.go index fe994da72..5276b5b61 100644 --- a/pkg/admission/util.go +++ b/pkg/admission/util.go @@ -35,10 +35,11 @@ func updatePodLabel(pod *v1.Pod, namespace string, generateUniqueAppIds bool, de result[k] = v } + canonicalAppID := utils.GetPodLabelValue(pod, constants.CanonicalLabelApplicationID) sparkAppID := utils.GetPodLabelValue(pod, constants.SparkLabelAppID) labelAppID := utils.GetPodLabelValue(pod, constants.LabelApplicationID) annotationAppID := utils.GetPodAnnotationValue(pod, constants.AnnotationApplicationID) - if sparkAppID == "" && labelAppID == "" && annotationAppID == "" { + if canonicalAppID == "" && sparkAppID == "" && labelAppID == "" && annotationAppID == "" { // if app id not exist, generate one // for each namespace, we group unnamed pods to one single app - if GenerateUniqueAppId is not set // if GenerateUniqueAppId: @@ -46,19 +47,31 @@ func updatePodLabel(pod *v1.Pod, namespace string, generateUniqueAppIds bool, de // else // application ID convention: ${AUTO_GEN_PREFIX}-${NAMESPACE}-${AUTO_GEN_SUFFIX} generatedID := utils.GenerateApplicationID(namespace, generateUniqueAppIds, string(pod.UID)) + + result[constants.CanonicalLabelApplicationID] = generatedID + // Deprecated: After 1.7.0, admission controller will only add canonical label if application ID was not set result[constants.LabelApplicationID] = generatedID + } else if canonicalAppID != "" { + // Deprecated: Added in 1.6.0 for backward compatibility, in case the prior shim version can't handle canonical label + result[constants.LabelApplicationID] = canonicalAppID } + canonicalQueueName := utils.GetPodLabelValue(pod, constants.CanonicalLabelQueueName) labelQueueName := utils.GetPodLabelValue(pod, constants.LabelQueueName) annotationQueueName := utils.GetPodAnnotationValue(pod, constants.AnnotationQueueName) - if labelQueueName == "" && annotationQueueName == "" { + if canonicalQueueName == "" && labelQueueName == "" && annotationQueueName == "" { // if queueName not exist, generate one // if defaultQueueName is "", skip adding default queue name to the pod labels if defaultQueueName != "" { // for undefined configuration, am_conf will add 'root.default' to retain existing behavior // if a custom name is configured for default queue, it will be used instead of root.default + result[constants.CanonicalLabelQueueName] = defaultQueueName + // Deprecated: After 1.7.0, admission controller will only add canonical label if queue was not set result[constants.LabelQueueName] = defaultQueueName } + } else if canonicalQueueName != "" { + // Deprecated: Added in 1.6.0 for backward compatibility, in case the prior shim version can't handle canonical label + result[constants.LabelQueueName] = canonicalQueueName } return result diff --git a/pkg/admission/util_test.go b/pkg/admission/util_test.go index fa28c824c..2181bd5a1 100644 --- a/pkg/admission/util_test.go +++ b/pkg/admission/util_test.go @@ -70,8 +70,8 @@ func createTestingPodWithMeta() *v1.Pod { func createTestingPodWithLabels(appId string, queue string) *v1.Pod { pod := createTestingPodWithMeta() - pod.ObjectMeta.Labels[constants.LabelApplicationID] = appId - pod.ObjectMeta.Labels[constants.LabelQueueName] = queue + pod.ObjectMeta.Labels[constants.CanonicalLabelApplicationID] = appId + pod.ObjectMeta.Labels[constants.CanonicalLabelQueueName] = queue return pod } @@ -111,21 +111,25 @@ func TestUpdatePodLabelForAdmissionController(t *testing.T) { // we generate new appId/queue labels pod := createTestingPodWithMeta() if result := updatePodLabel(pod, "default", false, defaultQueueName); result != nil { - assert.Equal(t, len(result), 3) + assert.Equal(t, len(result), 5) assert.Equal(t, result["random"], "random") + assert.Equal(t, strings.HasPrefix(result[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true) assert.Equal(t, strings.HasPrefix(result[constants.LabelApplicationID], constants.AutoGenAppPrefix), true) + assert.Equal(t, result[constants.CanonicalLabelQueueName], defaultQueueName) assert.Equal(t, result[constants.LabelQueueName], defaultQueueName) } else { t.Fatal("UpdatePodLabelForAdmissionController is not as expected") } - // verify if applicationId and queue is given in the labels, - // we won't modify it + // verify if appId/queue is given in the canonical labels + // we won't modify the value and will add it to non-canonical label for backward compatibility pod = createTestingPodWithLabels(dummyAppId, dummyQueueName) if result := updatePodLabel(pod, "default", false, defaultQueueName); result != nil { - assert.Equal(t, len(result), 3) + assert.Equal(t, len(result), 5) assert.Equal(t, result["random"], "random") + assert.Equal(t, result[constants.CanonicalLabelApplicationID], dummyAppId) assert.Equal(t, result[constants.LabelApplicationID], dummyAppId) + assert.Equal(t, result[constants.CanonicalLabelQueueName], dummyQueueName) assert.Equal(t, result[constants.LabelQueueName], dummyQueueName) } else { t.Fatal("UpdatePodLabelForAdmissionController is not as expected") @@ -147,8 +151,10 @@ func TestUpdatePodLabelForAdmissionController(t *testing.T) { pod = createTestingPodNoNamespaceAndLabels() if result := updatePodLabel(pod, "default", false, defaultQueueName); result != nil { - assert.Equal(t, len(result), 2) + assert.Equal(t, len(result), 4) + assert.Equal(t, result[constants.CanonicalLabelQueueName], defaultQueueName) assert.Equal(t, result[constants.LabelQueueName], defaultQueueName) + assert.Equal(t, strings.HasPrefix(result[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true) assert.Equal(t, strings.HasPrefix(result[constants.LabelApplicationID], constants.AutoGenAppPrefix), true) } else { t.Fatal("UpdatePodLabelForAdmissionController is not as expected") @@ -157,8 +163,10 @@ func TestUpdatePodLabelForAdmissionController(t *testing.T) { // pod name might be empty, it can comes from generatedName pod = createTestingPodWithGenerateName() if result := updatePodLabel(pod, "default", false, defaultQueueName); result != nil { - assert.Equal(t, len(result), 2) + assert.Equal(t, len(result), 4) + assert.Equal(t, result[constants.CanonicalLabelQueueName], defaultQueueName) assert.Equal(t, result[constants.LabelQueueName], defaultQueueName) + assert.Equal(t, strings.HasPrefix(result[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true) assert.Equal(t, strings.HasPrefix(result[constants.LabelApplicationID], constants.AutoGenAppPrefix), true) } else { t.Fatal("UpdatePodLabelForAdmissionController is not as expected") @@ -166,8 +174,10 @@ func TestUpdatePodLabelForAdmissionController(t *testing.T) { pod = createMinimalTestingPod() if result := updatePodLabel(pod, "default", false, defaultQueueName); result != nil { - assert.Equal(t, len(result), 2) + assert.Equal(t, len(result), 4) + assert.Equal(t, result[constants.CanonicalLabelQueueName], defaultQueueName) assert.Equal(t, result[constants.LabelQueueName], defaultQueueName) + assert.Equal(t, strings.HasPrefix(result[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true) assert.Equal(t, strings.HasPrefix(result[constants.LabelApplicationID], constants.AutoGenAppPrefix), true) } else { t.Fatal("UpdatePodLabelForAdmissionController is not as expected") @@ -178,9 +188,11 @@ func TestDefaultQueueName(t *testing.T) { defaultConf := createConfig() pod := createTestingPodWithMeta() if result := updatePodLabel(pod, defaultConf.GetNamespace(), defaultConf.GetGenerateUniqueAppIds(), defaultConf.GetDefaultQueueName()); result != nil { - assert.Equal(t, len(result), 3) + assert.Equal(t, len(result), 5) assert.Equal(t, result["random"], "random") + assert.Equal(t, result[constants.CanonicalLabelApplicationID], "yunikorn-default-autogen") assert.Equal(t, result[constants.LabelApplicationID], "yunikorn-default-autogen") + assert.Equal(t, result[constants.CanonicalLabelQueueName], "root.default") assert.Equal(t, result[constants.LabelQueueName], "root.default") } else { t.Fatal("UpdatePodLabelForAdmissionController is not as expected") @@ -190,9 +202,11 @@ func TestDefaultQueueName(t *testing.T) { conf.AMFilteringDefaultQueueName: "", }) if result := updatePodLabel(pod, queueNameEmptyConf.GetNamespace(), queueNameEmptyConf.GetGenerateUniqueAppIds(), queueNameEmptyConf.GetDefaultQueueName()); result != nil { - assert.Equal(t, len(result), 2) + assert.Equal(t, len(result), 3) assert.Equal(t, result["random"], "random") + assert.Equal(t, result[constants.CanonicalLabelApplicationID], "yunikorn-default-autogen") assert.Equal(t, result[constants.LabelApplicationID], "yunikorn-default-autogen") + assert.Equal(t, result[constants.CanonicalLabelQueueName], "") assert.Equal(t, result[constants.LabelQueueName], "") } else { t.Fatal("UpdatePodLabelForAdmissionController is not as expected") @@ -202,9 +216,11 @@ func TestDefaultQueueName(t *testing.T) { conf.AMFilteringDefaultQueueName: "yunikorn", }) if result := updatePodLabel(pod, customQueueNameConf.GetNamespace(), customQueueNameConf.GetGenerateUniqueAppIds(), customQueueNameConf.GetDefaultQueueName()); result != nil { - assert.Equal(t, len(result), 3) + assert.Equal(t, len(result), 5) assert.Equal(t, result["random"], "random") + assert.Equal(t, result[constants.CanonicalLabelApplicationID], "yunikorn-default-autogen") assert.Equal(t, result[constants.LabelApplicationID], "yunikorn-default-autogen") + assert.Assert(t, result[constants.CanonicalLabelQueueName] != "yunikorn") assert.Assert(t, result[constants.LabelQueueName] != "yunikorn") } else { t.Fatal("UpdatePodLabelForAdmissionController is not as expected") @@ -215,9 +231,11 @@ func TestDefaultQueueName(t *testing.T) { }) if result := updatePodLabel(pod, customValidQueueNameConf.GetNamespace(), customValidQueueNameConf.GetGenerateUniqueAppIds(), customValidQueueNameConf.GetDefaultQueueName()); result != nil { - assert.Equal(t, len(result), 3) + assert.Equal(t, len(result), 5) assert.Equal(t, result["random"], "random") + assert.Equal(t, result[constants.CanonicalLabelApplicationID], "yunikorn-default-autogen") assert.Equal(t, result[constants.LabelApplicationID], "yunikorn-default-autogen") + assert.Equal(t, result[constants.CanonicalLabelQueueName], "root.yunikorn") assert.Equal(t, result[constants.LabelQueueName], "root.yunikorn") } else { t.Fatal("UpdatePodLabelForAdmissionController is not as expected") diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go index 7a4c415d4..bdee70a8e 100644 --- a/pkg/common/constants/constants.go +++ b/pkg/common/constants/constants.go @@ -38,9 +38,11 @@ const DomainYuniKornInternal = siCommon.DomainYuniKornInternal const LabelApp = "app" const LabelApplicationID = "applicationId" const AnnotationApplicationID = DomainYuniKorn + "app-id" +const CanonicalLabelApplicationID = DomainYuniKorn + "app-id" const LabelQueueName = "queue" const RootQueue = "root" const AnnotationQueueName = DomainYuniKorn + "queue" +const CanonicalLabelQueueName = DomainYuniKorn + "queue" const AnnotationParentQueue = DomainYuniKorn + "parentqueue" const ApplicationDefaultQueue = "root.default" const DefaultPartition = "default"