Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2631] Support canonical labels for queue/applicationId in Admission Controller #46

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ linters-settings:
local-prefixes: github.com/apache/yunikorn
govet:
check-shadowing: true
goconst:
min-occurrences: 5
funlen:
lines: 120
statements: 80
Expand Down
2 changes: 1 addition & 1 deletion pkg/admission/admission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (c *AdmissionController) processPodUpdate(req *admissionv1.AdmissionRequest

func (c *AdmissionController) shouldProcessAdmissionReview(namespace string, labels map[string]string) bool {
if c.shouldProcessNamespace(namespace) &&
(labels[constants.LabelApplicationID] != "" || labels[constants.SparkLabelAppID] != "" || c.shouldLabelNamespace(namespace)) {
(labels[constants.CanonicalLabelApplicationID] != "" || labels[constants.LabelApplicationID] != "" || labels[constants.SparkLabelAppID] != "" || c.shouldLabelNamespace(namespace)) {
return true
}

Expand Down
32 changes: 22 additions & 10 deletions pkg/admission/admission_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ func TestUpdateLabels(t *testing.T) {
assert.Equal(t, patch[0].Op, "add")
assert.Equal(t, patch[0].Path, "/metadata/labels")
if updatedMap, ok := patch[0].Value.(map[string]string); ok {
assert.Equal(t, len(updatedMap), 3)
assert.Equal(t, len(updatedMap), 5)
assert.Equal(t, updatedMap["random"], "random")
assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default")
assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default")
assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true)
assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true)
} else {
t.Fatal("patch info content is not as expected")
Expand All @@ -104,8 +106,8 @@ func TestUpdateLabels(t *testing.T) {
UID: "7f5fd6c5d5",
ResourceVersion: "10654",
Labels: map[string]string{
"random": "random",
constants.LabelApplicationID: "app-0001",
"random": "random",
constants.CanonicalLabelApplicationID: "app-0001",
},
},
Spec: v1.PodSpec{},
Expand All @@ -117,9 +119,11 @@ func TestUpdateLabels(t *testing.T) {
assert.Equal(t, patch[0].Op, "add")
assert.Equal(t, patch[0].Path, "/metadata/labels")
if updatedMap, ok := patch[0].Value.(map[string]string); ok {
assert.Equal(t, len(updatedMap), 3)
assert.Equal(t, len(updatedMap), 5)
assert.Equal(t, updatedMap["random"], "random")
assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default")
assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default")
assert.Equal(t, updatedMap[constants.CanonicalLabelApplicationID], "app-0001")
assert.Equal(t, updatedMap[constants.LabelApplicationID], "app-0001")
} else {
t.Fatal("patch info content is not as expected")
Expand All @@ -140,8 +144,8 @@ func TestUpdateLabels(t *testing.T) {
UID: "7f5fd6c5d5",
ResourceVersion: "10654",
Labels: map[string]string{
"random": "random",
constants.LabelQueueName: "root.abc",
"random": "random",
constants.CanonicalLabelQueueName: "root.abc",
},
},
Spec: v1.PodSpec{},
Expand All @@ -154,9 +158,11 @@ func TestUpdateLabels(t *testing.T) {
assert.Equal(t, patch[0].Op, "add")
assert.Equal(t, patch[0].Path, "/metadata/labels")
if updatedMap, ok := patch[0].Value.(map[string]string); ok {
assert.Equal(t, len(updatedMap), 3)
assert.Equal(t, len(updatedMap), 5)
assert.Equal(t, updatedMap["random"], "random")
assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.abc")
assert.Equal(t, updatedMap[constants.LabelQueueName], "root.abc")
assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true)
assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true)
} else {
t.Fatal("patch info content is not as expected")
Expand Down Expand Up @@ -186,8 +192,10 @@ func TestUpdateLabels(t *testing.T) {
assert.Equal(t, patch[0].Op, "add")
assert.Equal(t, patch[0].Path, "/metadata/labels")
if updatedMap, ok := patch[0].Value.(map[string]string); ok {
assert.Equal(t, len(updatedMap), 2)
assert.Equal(t, len(updatedMap), 4)
assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default")
assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default")
assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true)
assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true)
} else {
t.Fatal("patch info content is not as expected")
Expand All @@ -214,8 +222,10 @@ func TestUpdateLabels(t *testing.T) {
assert.Equal(t, patch[0].Op, "add")
assert.Equal(t, patch[0].Path, "/metadata/labels")
if updatedMap, ok := patch[0].Value.(map[string]string); ok {
assert.Equal(t, len(updatedMap), 2)
assert.Equal(t, len(updatedMap), 4)
assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default")
assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default")
assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true)
assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true)
} else {
t.Fatal("patch info content is not as expected")
Expand All @@ -240,8 +250,10 @@ func TestUpdateLabels(t *testing.T) {
assert.Equal(t, patch[0].Op, "add")
assert.Equal(t, patch[0].Path, "/metadata/labels")
if updatedMap, ok := patch[0].Value.(map[string]string); ok {
assert.Equal(t, len(updatedMap), 2)
assert.Equal(t, len(updatedMap), 4)
assert.Equal(t, updatedMap[constants.CanonicalLabelQueueName], "root.default")
assert.Equal(t, updatedMap[constants.LabelQueueName], "root.default")
assert.Equal(t, strings.HasPrefix(updatedMap[constants.CanonicalLabelApplicationID], constants.AutoGenAppPrefix), true)
assert.Equal(t, strings.HasPrefix(updatedMap[constants.LabelApplicationID], constants.AutoGenAppPrefix), true)
} else {
t.Fatal("patch info content is not as expected")
Expand Down
9 changes: 7 additions & 2 deletions pkg/admission/conf/am_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ func NewAdmissionControllerConf(configMaps []*v1.ConfigMap) *AdmissionController
return acc
}

func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) {
configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc})
func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) error {
_, err := configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc})
if err != nil {
return fmt.Errorf("failed to create register handlers: %w", err)
}

return nil
}

func (acc *AdmissionControllerConf) GetNamespace() string {
Expand Down
11 changes: 8 additions & 3 deletions pkg/admission/namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package admission

import (
"fmt"

v1 "k8s.io/api/core/v1"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -53,14 +55,17 @@ type nsFlags struct {
}

// NewNamespaceCache creates a new cache and registers the handler for the cache with the Informer.
func NewNamespaceCache(namespaces informersv1.NamespaceInformer) *NamespaceCache {
func NewNamespaceCache(namespaces informersv1.NamespaceInformer) (*NamespaceCache, error) {
nsc := &NamespaceCache{
nameSpaces: make(map[string]nsFlags),
}
if namespaces != nil {
namespaces.Informer().AddEventHandler(&namespaceUpdateHandler{cache: nsc})
_, err := namespaces.Informer().AddEventHandler(&namespaceUpdateHandler{cache: nsc})
if err != nil {
return nil, fmt.Errorf("failed to create namespace cache: %w", err)
}
}
return nsc
return nsc, nil
}

// enableYuniKorn returns the value for the enableYuniKorn flag (tri-state UNSET, TRUE or FALSE) for the namespace.
Expand Down
6 changes: 4 additions & 2 deletions pkg/admission/namespace_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import (
const testNS = "test-ns"

func TestFlags(t *testing.T) {
cache := NewNamespaceCache(nil)
cache, nsErr := NewNamespaceCache(nil)
assert.NilError(t, nsErr)
cache.nameSpaces["notset"] = nsFlags{
enableYuniKorn: UNSET,
generateAppID: UNSET,
Expand Down Expand Up @@ -69,7 +70,8 @@ func TestNamespaceHandlers(t *testing.T) {
kubeClient := client.NewKubeClientMock(false)

informers := NewInformers(kubeClient, "default")
cache := NewNamespaceCache(informers.Namespace)
cache, nsErr := NewNamespaceCache(informers.Namespace)
assert.NilError(t, nsErr)
informers.Start()
defer informers.Stop()

Expand Down
11 changes: 8 additions & 3 deletions pkg/admission/priority_class_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package admission

import (
"fmt"

schedulingv1 "k8s.io/api/scheduling/v1"
informersv1 "k8s.io/client-go/informers/scheduling/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -36,14 +38,17 @@ type PriorityClassCache struct {
}

// NewPriorityClassCache creates a new cache and registers the handler for the cache with the Informer.
func NewPriorityClassCache(priorityClasses informersv1.PriorityClassInformer) *PriorityClassCache {
func NewPriorityClassCache(priorityClasses informersv1.PriorityClassInformer) (*PriorityClassCache, error) {
pcc := &PriorityClassCache{
priorityClasses: make(map[string]bool),
}
if priorityClasses != nil {
priorityClasses.Informer().AddEventHandler(&priorityClassUpdateHandler{cache: pcc})
_, err := priorityClasses.Informer().AddEventHandler(&priorityClassUpdateHandler{cache: pcc})
if err != nil {
return nil, fmt.Errorf("failed to create a new cache and register the handler: %w", err)
}
}
return pcc
return pcc, nil
}

// isPreemptSelfAllowed returns the preemption value. Only returns false if configured.
Expand Down
6 changes: 4 additions & 2 deletions pkg/admission/priority_class_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import (
const testPC = "test-pc"

func TestIsPreemptSelfAllowed(t *testing.T) {
cache := NewPriorityClassCache(nil)
cache, pcErr := NewPriorityClassCache(nil)
assert.NilError(t, pcErr)
cache.priorityClasses["yes"] = true
cache.priorityClasses["no"] = false

Expand All @@ -49,7 +50,8 @@ func TestPriorityClassHandlers(t *testing.T) {
kubeClient := client.NewKubeClientMock(false)

informers := NewInformers(kubeClient, "default")
cache := NewPriorityClassCache(informers.PriorityClass)
cache, pcErr := NewPriorityClassCache(informers.PriorityClass)
assert.NilError(t, pcErr)
informers.Start()
defer informers.Stop()

Expand Down
17 changes: 15 additions & 2 deletions pkg/admission/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,43 @@ func updatePodLabel(pod *v1.Pod, namespace string, generateUniqueAppIds bool, de
result[k] = v
}

canonicalAppID := utils.GetPodLabelValue(pod, constants.CanonicalLabelApplicationID)
sparkAppID := utils.GetPodLabelValue(pod, constants.SparkLabelAppID)
labelAppID := utils.GetPodLabelValue(pod, constants.LabelApplicationID)
annotationAppID := utils.GetPodAnnotationValue(pod, constants.AnnotationApplicationID)
if sparkAppID == "" && labelAppID == "" && annotationAppID == "" {
if canonicalAppID == "" && sparkAppID == "" && labelAppID == "" && annotationAppID == "" {
// if app id not exist, generate one
// for each namespace, we group unnamed pods to one single app - if GenerateUniqueAppId is not set
// if GenerateUniqueAppId:
// application ID convention: ${NAMESPACE}-${POD_UID}
// else
// application ID convention: ${AUTO_GEN_PREFIX}-${NAMESPACE}-${AUTO_GEN_SUFFIX}
generatedID := utils.GenerateApplicationID(namespace, generateUniqueAppIds, string(pod.UID))

result[constants.CanonicalLabelApplicationID] = generatedID
// Deprecated: After 1.7.0, admission controller will only add canonical label if application ID was not set
result[constants.LabelApplicationID] = generatedID
} else if canonicalAppID != "" {
// Deprecated: Added in 1.6.0 for backward compatibility, in case the prior shim version can't handle canonical label
result[constants.LabelApplicationID] = canonicalAppID
}

canonicalQueueName := utils.GetPodLabelValue(pod, constants.CanonicalLabelQueueName)
labelQueueName := utils.GetPodLabelValue(pod, constants.LabelQueueName)
annotationQueueName := utils.GetPodAnnotationValue(pod, constants.AnnotationQueueName)
if labelQueueName == "" && annotationQueueName == "" {
if canonicalQueueName == "" && labelQueueName == "" && annotationQueueName == "" {
// if queueName not exist, generate one
// if defaultQueueName is "", skip adding default queue name to the pod labels
if defaultQueueName != "" {
// for undefined configuration, am_conf will add 'root.default' to retain existing behavior
// if a custom name is configured for default queue, it will be used instead of root.default
result[constants.CanonicalLabelQueueName] = defaultQueueName
// Deprecated: After 1.7.0, admission controller will only add canonical label if queue was not set
result[constants.LabelQueueName] = defaultQueueName
}
} else if canonicalQueueName != "" {
// Deprecated: Added in 1.6.0 for backward compatibility, in case the prior shim version can't handle canonical label
result[constants.LabelQueueName] = canonicalQueueName
}

return result
Expand Down
Loading
Loading