From 197e0cab543230049e31a469e04328d7c457ba69 Mon Sep 17 00:00:00 2001 From: Yu-Lin Chen Date: Fri, 21 Jun 2024 11:41:51 +0000 Subject: [PATCH] [YUNIKORN-2504] Support canonical labels for queue/applicationId in scheduler --- pkg/cache/application.go | 45 +++-- pkg/cache/placeholder.go | 4 +- pkg/cache/placeholder_test.go | 8 +- pkg/cache/task.go | 83 +++++++++- pkg/cache/task_state.go | 10 +- pkg/cache/task_test.go | 130 +++++++++++++++ pkg/common/constants/constants.go | 2 + pkg/common/utils/utils.go | 73 +++++++-- pkg/common/utils/utils_test.go | 154 +++++++++++++++++- .../basic_scheduling/basic_scheduling_test.go | 68 ++++++++ test/e2e/framework/helpers/k8s/k8s_utils.go | 12 ++ .../recovery_and_restart_test.go | 55 +++++++ 12 files changed, 587 insertions(+), 57 deletions(-) diff --git a/pkg/cache/application.go b/pkg/cache/application.go index ff4db058c..2d698ff40 100644 --- a/pkg/cache/application.go +++ b/pkg/cache/application.go @@ -395,7 +395,11 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) for _, task := range app.GetNewTasks() { if taskScheduleCondition(task) { // for each new task, we do a sanity check before moving the state to Pending_Schedule - if err := task.sanityCheckBeforeScheduling(); err == nil { + // if the task is not ready for scheduling, we keep it in New state + // if the task pod is bound and has conflicting metadata, we move the task to Rejected state + err, rejectTask := task.sanityCheckBeforeScheduling() + + if err == nil { // note, if we directly trigger submit task event, it may spawn too many duplicate // events, because a task might be submitted multiple times before its state transits to PENDING. 
if handleErr := task.handle( @@ -406,11 +410,20 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) log.Log(log.ShimCacheApplication).Warn("init task failed", zap.Error(err)) } } else { - events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error()) - log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling", - zap.String("appID", task.applicationID), - zap.String("taskID", task.taskID), - zap.Error(err)) + if !rejectTask { + // no state transition + events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error()) + log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling", + zap.String("appID", task.applicationID), + zap.String("taskID", task.taskID), + zap.Error(err)) + } else { + // task transits to Rejected state + if handleErr := task.handle( + NewRejectTaskEvent(task.applicationID, task.taskID, err.Error())); handleErr != nil { + log.Log(log.ShimCacheApplication).Warn("reject task failed", zap.Error(err)) + } + } } } } @@ -568,22 +581,6 @@ func (app *Application) handleCompleteApplicationEvent() { }() } -func failTaskPodWithReasonAndMsg(task *Task, reason string, msg string) { - podCopy := task.GetTaskPod().DeepCopy() - podCopy.Status = v1.PodStatus{ - Phase: v1.PodFailed, - Reason: reason, - Message: msg, - } - log.Log(log.ShimCacheApplication).Info("setting pod to failed", zap.String("podName", task.GetTaskPod().Name)) - pod, err := task.UpdateTaskPodStatus(podCopy) - if err != nil { - log.Log(log.ShimCacheApplication).Error("failed to update task pod status", zap.Error(err)) - } else { - log.Log(log.ShimCacheApplication).Info("new pod status", zap.String("status", string(pod.Status.Phase))) - } -} - func (app *Application) handleFailApplicationEvent(errMsg string) { go func() { getPlaceholderManager().cleanUp(app) @@ -598,10 +595,10 @@ func (app 
*Application) handleFailApplicationEvent(errMsg string) { for _, task := range unalloc { // Only need to fail the non-placeholder pod(s) if strings.Contains(errMsg, constants.ApplicationInsufficientResourcesFailure) { - failTaskPodWithReasonAndMsg(task, constants.ApplicationInsufficientResourcesFailure, "Scheduling has timed out due to insufficient resources") + task.failTaskPodWithReasonAndMsg(constants.ApplicationInsufficientResourcesFailure, "Scheduling has timed out due to insufficient resources") } else if strings.Contains(errMsg, constants.ApplicationRejectedFailure) { errMsgArr := strings.Split(errMsg, ":") - failTaskPodWithReasonAndMsg(task, constants.ApplicationRejectedFailure, errMsgArr[1]) + task.failTaskPodWithReasonAndMsg(constants.ApplicationRejectedFailure, errMsgArr[1]) } events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "ApplicationFailed", "ApplicationFailed", "Application %s scheduling failed, reason: %s", app.applicationID, errMsg) diff --git a/pkg/cache/placeholder.go b/pkg/cache/placeholder.go index 8235bf11c..748ba17e3 100644 --- a/pkg/cache/placeholder.go +++ b/pkg/cache/placeholder.go @@ -90,8 +90,8 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup TaskGrou Name: placeholderName, Namespace: app.tags[constants.AppTagNamespace], Labels: utils.MergeMaps(taskGroup.Labels, map[string]string{ - constants.LabelApplicationID: app.GetApplicationID(), - constants.LabelQueueName: app.GetQueue(), + constants.CanonicalLabelApplicationID: app.GetApplicationID(), + constants.CanonicalLabelQueueName: app.GetQueue(), }), Annotations: annotations, OwnerReferences: ownerRefs, diff --git a/pkg/cache/placeholder_test.go b/pkg/cache/placeholder_test.go index 35914af38..553ade1aa 100644 --- a/pkg/cache/placeholder_test.go +++ b/pkg/cache/placeholder_test.go @@ -125,10 +125,10 @@ func TestNewPlaceholder(t *testing.T) { assert.Equal(t, holder.pod.Name, "ph-name") assert.Equal(t, holder.pod.Namespace, 
namespace) assert.DeepEqual(t, holder.pod.Labels, map[string]string{ - constants.LabelApplicationID: appID, - constants.LabelQueueName: queue, - "labelKey0": "labelKeyValue0", - "labelKey1": "labelKeyValue1", + constants.CanonicalLabelApplicationID: appID, + constants.CanonicalLabelQueueName: queue, + "labelKey0": "labelKeyValue0", + "labelKey1": "labelKeyValue1", }) assert.Equal(t, len(holder.pod.Annotations), 6, "unexpected number of annotations") assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], app.taskGroups[0].Name) diff --git a/pkg/cache/task.go b/pkg/cache/task.go index 97f041ea2..8196ee24b 100644 --- a/pkg/cache/task.go +++ b/pkg/cache/task.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "strconv" + "strings" "time" "github.com/looplab/fsm" @@ -187,6 +188,22 @@ func (task *Task) UpdateTaskPod(pod *v1.Pod, podMutator func(pod *v1.Pod)) (*v1. return task.context.apiProvider.GetAPIs().KubeClient.UpdatePod(pod, podMutator) } +func (task *Task) failTaskPodWithReasonAndMsg(reason string, msg string) { + podCopy := task.pod.DeepCopy() + podCopy.Status = v1.PodStatus{ + Phase: v1.PodFailed, + Reason: reason, + Message: msg, + } + log.Log(log.ShimCacheTask).Info("setting pod to failed", zap.String("podName", podCopy.Name)) + pod, err := task.UpdateTaskPodStatus(podCopy) + if err != nil { + log.Log(log.ShimCacheTask).Error("failed to update task pod status", zap.Error(err)) + } else { + log.Log(log.ShimCacheTask).Info("new pod status", zap.String("status", string(pod.Status.Phase))) + } +} + func (task *Task) isTerminated() bool { for _, states := range TaskStates().Terminated { if task.GetTaskState() == states { @@ -457,16 +474,19 @@ func (task *Task) postTaskBound() { } } -func (task *Task) postTaskRejected() { - // currently, once task is rejected by scheduler, we directly move task to failed state. - // so this function simply triggers the state transition when it is rejected. 
- // but further, we can introduce retry mechanism if necessary. +func (task *Task) postTaskRejected(reason string) { + // if task is rejected because of conflicting metadata, we should fail the pod with reason + if strings.Contains(reason, constants.TaskPodInconsistMetadataFailure) { + task.failTaskPodWithReasonAndMsg(constants.TaskRejectedFailure, reason) + } + + // move task to failed state. dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, - fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias))) + fmt.Sprintf("task %s failed because it is rejected", task.alias))) events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeWarning, "TaskRejected", "TaskRejected", - "Task %s is rejected by the scheduler", task.alias) + "Task %s is rejected", task.alias) } // beforeTaskFail releases the allocation or ask from scheduler core @@ -543,7 +563,56 @@ func (task *Task) releaseAllocation() { // some sanity checks before sending task for scheduling, // this reduces the scheduling overhead by blocking such // request away from the core scheduler. 
-func (task *Task) sanityCheckBeforeScheduling() error { +func (task *Task) sanityCheckBeforeScheduling() (error, bool) { + rejectTask := false + + if err := task.checkPodPVCs(); err != nil { + return err, rejectTask + } + + // only check pod labels and annotations consistency if pod is not already bound + // reject the task if pod metadata is conflicting + if !utils.PodAlreadyBound(task.pod) { + if err := task.checkPodMetadata(); err != nil { + rejectTask = true + return err, rejectTask + } + } + + return nil, rejectTask +} + +func (task *Task) checkPodMetadata() error { + // check application ID + appIdLabelKeys := []string{ + constants.CanonicalLabelApplicationID, + constants.SparkLabelAppID, + constants.LabelApplicationID, + } + appIdAnnotationKeys := []string{ + constants.AnnotationApplicationID, + } + if !utils.ValidatePodLabelAnnotationConsistency(task.pod, appIdLabelKeys, appIdAnnotationKeys) { + return fmt.Errorf("application ID is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure) + } + + // check queue name + queueLabelKeys := []string{ + constants.CanonicalLabelQueueName, + constants.LabelQueueName, + } + + queueAnnotationKeys := []string{ + constants.AnnotationQueueName, + } + + if !utils.ValidatePodLabelAnnotationConsistency(task.pod, queueLabelKeys, queueAnnotationKeys) { + return fmt.Errorf("queue is not consistently set in pod's labels and annotations. 
[%s]", constants.TaskPodInconsistMetadataFailure) + } + return nil +} + +func (task *Task) checkPodPVCs() error { // Check PVCs used by the pod namespace := task.pod.Namespace manifest := &(task.pod.Spec) diff --git a/pkg/cache/task_state.go b/pkg/cache/task_state.go index d244fa8d1..616f5858d 100644 --- a/pkg/cache/task_state.go +++ b/pkg/cache/task_state.go @@ -396,7 +396,15 @@ func callbacks(states *TStates) fsm.Callbacks { }, states.Rejected: func(_ context.Context, event *fsm.Event) { task := event.Args[0].(*Task) //nolint:errcheck - task.postTaskRejected() + eventArgs := make([]string, 1) + reason := "" + if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil { + log.Log(log.ShimFSM).Error("failed to parse event arg", zap.Error(err)) + reason = err.Error() + } else { + reason = eventArgs[0] + } + task.postTaskRejected(reason) }, states.Failed: func(_ context.Context, event *fsm.Event) { task := event.Args[0].(*Task) //nolint:errcheck diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go index a74983b35..66ea2383d 100644 --- a/pkg/cache/task_test.go +++ b/pkg/cache/task_test.go @@ -19,6 +19,7 @@ package cache import ( + "fmt" "testing" "time" @@ -699,6 +700,135 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) { assert.Equal(t, task1.GetTaskState(), TaskStates().Completed) } +// nolint: funlen +func TestCheckPodMetadata(t *testing.T) { + const ( + appID = "app01" + app2ID = "app02" + queueName = "root.sandbox1" + queue2Name = "root.sandbox2" + ) + var appIdInconsitentErr = fmt.Errorf("application ID is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure) + var queueInconsitentErr = fmt.Errorf("queue is not consistently set in pod's labels and annotations. 
[%s]", constants.TaskPodInconsistMetadataFailure) + + testCases := []struct { + name string + podLabels map[string]string + podAnnotations map[string]string + expected error + }{ + { + "empty label and empty annotation in pod", nil, nil, nil, + }, + { + "appId and queueName have no conflict", + map[string]string{ + constants.CanonicalLabelApplicationID: appID, + constants.SparkLabelAppID: appID, + constants.LabelApp: appID, + constants.CanonicalLabelQueueName: queueName, + constants.LabelQueueName: queueName, + }, map[string]string{ + constants.AnnotationApplicationID: appID, + constants.AnnotationQueueName: queueName, + }, + nil, + }, + { + "have conflict appId in canonical label", + map[string]string{ + constants.CanonicalLabelApplicationID: app2ID, + constants.SparkLabelAppID: appID, + constants.LabelApplicationID: appID, + }, map[string]string{ + constants.AnnotationApplicationID: appID, + }, + appIdInconsitentErr, + }, + { + "have conflict appId in spark label", + map[string]string{ + constants.CanonicalLabelApplicationID: appID, + constants.SparkLabelAppID: app2ID, + constants.LabelApplicationID: appID, + }, map[string]string{ + constants.AnnotationApplicationID: appID, + }, + appIdInconsitentErr, + }, + { + "have conflict appId in legacy label", + map[string]string{ + constants.CanonicalLabelApplicationID: appID, + constants.SparkLabelAppID: appID, + constants.LabelApplicationID: app2ID, + }, map[string]string{ + constants.AnnotationApplicationID: appID, + }, + appIdInconsitentErr, + }, + { + "have conflict appId in annotation", + map[string]string{ + constants.CanonicalLabelApplicationID: appID, + constants.SparkLabelAppID: appID, + constants.LabelApplicationID: appID, + }, map[string]string{ + constants.AnnotationApplicationID: app2ID, + }, + appIdInconsitentErr, + }, + { + "have conflict queueNmae in canonical label", + map[string]string{ + constants.CanonicalLabelQueueName: queue2Name, + constants.LabelQueueName: queueName, + }, map[string]string{ + 
constants.AnnotationQueueName: queueName, + }, + queueInconsitentErr, + }, + { + "have conflict queueNmae in legacy label", + map[string]string{ + constants.CanonicalLabelQueueName: queueName, + constants.LabelQueueName: queue2Name, + }, map[string]string{ + constants.AnnotationQueueName: queueName, + }, + queueInconsitentErr, + }, + { + "have conflict queueNmae in annotation", + map[string]string{ + constants.CanonicalLabelQueueName: queueName, + constants.LabelQueueName: queueName, + }, map[string]string{ + constants.AnnotationQueueName: queue2Name, + }, + queueInconsitentErr, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + app := NewApplication(appID, "root.default", "user", testGroups, map[string]string{}, nil) + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: tc.podLabels, + Annotations: tc.podAnnotations, + }, + } + task := NewTask("task01", app, nil, pod) + err := task.checkPodMetadata() + if err != nil { + assert.Equal(t, tc.expected.Error(), err.Error()) + } else { + assert.NilError(t, err) + } + }) + } +} + func TestUpdatePodCondition(t *testing.T) { condition := v1.PodCondition{ Type: v1.ContainersReady, diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go index bdee70a8e..3e6e4480b 100644 --- a/pkg/common/constants/constants.go +++ b/pkg/common/constants/constants.go @@ -87,6 +87,8 @@ var SchedulingPolicyStyleParamValues = map[string]string{"Hard": "Hard", "Soft": const ApplicationInsufficientResourcesFailure = "ResourceReservationTimeout" const ApplicationRejectedFailure = "ApplicationRejected" +const TaskRejectedFailure = "TaskRejected" +const TaskPodInconsistMetadataFailure = "PodInconsistentMetadata" // namespace.max.* (Retaining for backwards compatibility. 
Need to be removed in next major release) const CPUQuota = DomainYuniKorn + "namespace.max.cpu" diff --git a/pkg/common/utils/utils.go b/pkg/common/utils/utils.go index 141b0cafd..8d3f1f22b 100644 --- a/pkg/common/utils/utils.go +++ b/pkg/common/utils/utils.go @@ -104,12 +104,21 @@ func IsAssignedPod(pod *v1.Pod) bool { } func GetQueueNameFromPod(pod *v1.Pod) string { + // Queue name can be defined in multiple places + // The queue name is determined by the following order + // 1. Label: constants.CanonicalLabelQueueName + // 2. Label: constants.LabelQueueName + // 3. Annotation: constants.AnnotationQueueName + // 4. Default: constants.ApplicationDefaultQueue queueName := constants.ApplicationDefaultQueue - if an := GetPodLabelValue(pod, constants.LabelQueueName); an != "" { - queueName = an - } else if qu := GetPodAnnotationValue(pod, constants.AnnotationQueueName); qu != "" { - queueName = qu + if canonicalLabelQueueName := GetPodLabelValue(pod, constants.CanonicalLabelQueueName); canonicalLabelQueueName != "" { + queueName = canonicalLabelQueueName + } else if labelQueueName := GetPodLabelValue(pod, constants.LabelQueueName); labelQueueName != "" { + queueName = labelQueueName + } else if annotationQueueName := GetPodAnnotationValue(pod, constants.AnnotationQueueName); annotationQueueName != "" { + queueName = annotationQueueName } + return queueName } @@ -154,15 +163,26 @@ func GetApplicationIDFromPod(pod *v1.Pod) string { } } - // Application ID can be defined in annotation - appID := GetPodAnnotationValue(pod, constants.AnnotationApplicationID) - if appID == "" { - // Application ID can be defined in label - appID = GetPodLabelValue(pod, constants.LabelApplicationID) + // Application ID can be defined in multiple places + // The application ID is determined by the following order. + // 1. Label: constants.CanonicalLabelApplicationID + // 2. Label: constants.LabelApplicationID + // 3. Label: constants.SparkLabelAppID + // 4. 
Annotation: constants.AnnotationApplicationID + labelKeys := []string{ + constants.CanonicalLabelApplicationID, + constants.LabelApplicationID, + constants.SparkLabelAppID, + } + appID := "" + for _, label := range labelKeys { + appID = GetPodLabelValue(pod, label) + if appID != "" { + break + } } if appID == "" { - // Spark can also define application ID - appID = GetPodLabelValue(pod, constants.SparkLabelAppID) + appID = GetPodAnnotationValue(pod, constants.AnnotationApplicationID) } // If plugin mode, interpret missing Application ID as a non-YuniKorn pod @@ -185,6 +205,37 @@ func GetApplicationIDFromPod(pod *v1.Pod) string { return GenerateApplicationID(pod.Namespace, conf.GetSchedulerConf().GenerateUniqueAppIds, string(pod.UID)) } +// ValidatePodLabelAnnotationConsistency return true if all non-empty values are consistent across provided label/annotation +func ValidatePodLabelAnnotationConsistency(pod *v1.Pod, labelKeys []string, annotationKeys []string) bool { + var firstValue string + + for _, key := range labelKeys { + value := GetPodLabelValue(pod, key) + if value == "" { + continue + } + if firstValue == "" { + firstValue = value + } else if firstValue != value { + return false + } + } + + for _, key := range annotationKeys { + value := GetPodAnnotationValue(pod, key) + if value == "" { + continue + } + if firstValue == "" { + firstValue = value + } else if firstValue != value { + return false + } + } + + return true +} + // compare the existing pod condition with the given one, return true if the pod condition remains not changed. // return false if pod has no condition set yet, or condition has changed. 
func PodUnderCondition(pod *v1.Pod, condition *v1.PodCondition) bool { diff --git a/pkg/common/utils/utils_test.go b/pkg/common/utils/utils_test.go index ac689473a..94f4cd0ef 100644 --- a/pkg/common/utils/utils_test.go +++ b/pkg/common/utils/utils_test.go @@ -514,6 +514,7 @@ func TestGetApplicationIDFromPod(t *testing.T) { defer SetPluginMode(false) defer func() { conf.GetSchedulerConf().GenerateUniqueAppIds = false }() + appIDInCanonicalLabel := "CanonicalLabelAppID" appIDInLabel := "labelAppID" appIDInAnnotation := "annotationAppID" appIDInSelector := "selectorAppID" @@ -525,6 +526,12 @@ func TestGetApplicationIDFromPod(t *testing.T) { expectedAppIDPluginMode string generateUniqueAppIds bool }{ + {"AppID defined in canonical label", &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{constants.CanonicalLabelApplicationID: appIDInCanonicalLabel}, + }, + Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, + }, appIDInCanonicalLabel, appIDInCanonicalLabel, false}, {"AppID defined in label", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.LabelApplicationID: appIDInLabel}, @@ -545,7 +552,15 @@ func TestGetApplicationIDFromPod(t *testing.T) { }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, }, "testns-podUid", "", true}, - {"Unique autogen token found with generateUnique", &v1.Pod{ + {"Unique autogen token found with generateUnique in canonical AppId label", &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "testns", + UID: "podUid", + Labels: map[string]string{constants.CanonicalLabelApplicationID: "testns-uniqueautogen"}, + }, + Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, + }, "testns-podUid", "testns-podUid", true}, + {"Unique autogen token found with generateUnique in legacy AppId labels", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: "testns", UID: "podUid", @@ -553,7 +568,13 @@ func TestGetApplicationIDFromPod(t *testing.T) { }, Spec: v1.PodSpec{SchedulerName: 
constants.SchedulerName}, }, "testns-podUid", "testns-podUid", true}, - {"Non-yunikorn schedulerName", &v1.Pod{ + {"Non-yunikorn schedulerName with canonical AppId label", &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{constants.CanonicalLabelApplicationID: appIDInCanonicalLabel}, + }, + Spec: v1.PodSpec{SchedulerName: "default"}, + }, "", "", false}, + {"Non-yunikorn schedulerName with legacy AppId label", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.LabelApplicationID: appIDInLabel}, }, @@ -583,13 +604,20 @@ func TestGetApplicationIDFromPod(t *testing.T) { }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, }, appIDInAnnotation, appIDInAnnotation, false}, - {"AppID defined in label and annotation", &v1.Pod{ + {"AppID defined in canonical label and annotation", &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{constants.AnnotationApplicationID: appIDInAnnotation}, + Labels: map[string]string{constants.CanonicalLabelApplicationID: appIDInCanonicalLabel}, + }, + Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, + }, appIDInCanonicalLabel, appIDInCanonicalLabel, false}, + {"AppID defined in legacy label and annotation", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{constants.AnnotationApplicationID: appIDInAnnotation}, Labels: map[string]string{constants.LabelApplicationID: appIDInLabel}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, appIDInAnnotation, appIDInAnnotation, false}, + }, appIDInLabel, appIDInLabel, false}, {"Spark AppID defined in spark app selector", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -603,14 +631,14 @@ func TestGetApplicationIDFromPod(t *testing.T) { Annotations: map[string]string{constants.AnnotationApplicationID: sparkIDInAnnotation}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, sparkIDInAnnotation, sparkIDInAnnotation, false}, + }, appIDInSelector, appIDInSelector, false}, {"Spark 
AppID defined in spark app selector and annotation", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.SparkLabelAppID: appIDInSelector, constants.LabelApplicationID: appIDInLabel}, Annotations: map[string]string{constants.AnnotationApplicationID: sparkIDInAnnotation}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, sparkIDInAnnotation, sparkIDInAnnotation, false}, + }, appIDInLabel, appIDInLabel, false}, {"No AppID defined", &v1.Pod{}, "", "", false}, {"Spark AppID defined in spark app selector and label", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -633,6 +661,96 @@ func TestGetApplicationIDFromPod(t *testing.T) { } } +func TestValidatePodLabelAnnotationConsistency(t *testing.T) { + labelKeys := []string{"labelKey1", "labelKey2"} + annotationKeys := []string{"annotationKey1", "annotationKey2"} + + testCases := []struct { + name string + pod *v1.Pod + lablabelKeys []string + annotationKeys []string + expected bool + }{ + { + "empty pod indicates no inconsistency between labels and annotations", + &v1.Pod{}, + labelKeys, + annotationKeys, + true, + }, + { + "pod with values that are consistent across all labels and annotations", + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "labelKey1": "value1", + "labelKey2": "value1", + }, + Annotations: map[string]string{ + "annotationKey1": "value1", + "annotationKey2": "value1", + }, + }, + }, + labelKeys, + annotationKeys, + true, + }, + { + "pod with inconsistent value in labels", + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "labelKey1": "value1", + "labelKey2": "value2", + }, + }, + }, + labelKeys, + annotationKeys, + false, + }, + { + "pod with inconsistent value between label and annotation", + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "labelKey1": "value1", + }, + Annotations: map[string]string{ + "annotationKey1": "value2", + }, + }, + }, + labelKeys, + annotationKeys, + false, + }, + { + 
"pod with inconsistent value in annotations", + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "annotationKey1": "value1", + "annotationKey2": "value2", + }, + }, + }, + labelKeys, + annotationKeys, + false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + isConsistet := ValidatePodLabelAnnotationConsistency(tc.pod, tc.lablabelKeys, tc.annotationKeys) + assert.Equal(t, isConsistet, tc.expected) + }) + } +} + func TestGenerateApplicationID(t *testing.T) { assert.Equal(t, "yunikorn-this-is-a-namespace-autogen", GenerateApplicationID("this-is-a-namespace", false, "pod-uid")) @@ -817,6 +935,7 @@ func TestGetUserFromPodAnnotation(t *testing.T) { } func TestGetQueueNameFromPod(t *testing.T) { + queueInCanonicalLabel := "sandboxCanonicalLabel" queueInLabel := "sandboxLabel" queueInAnnotation := "sandboxAnnotation" testCases := []struct { @@ -825,7 +944,16 @@ func TestGetQueueNameFromPod(t *testing.T) { expectedQueue string }{ { - name: "With queue label", + name: "With canonical queue label", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{constants.CanonicalLabelQueueName: queueInCanonicalLabel}, + }, + }, + expectedQueue: queueInCanonicalLabel, + }, + { + name: "With legacy queue label", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.LabelQueueName: queueInLabel}, @@ -843,7 +971,17 @@ func TestGetQueueNameFromPod(t *testing.T) { expectedQueue: queueInAnnotation, }, { - name: "With queue label and annotation", + name: "With canonical queue label and annotation", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{constants.CanonicalLabelQueueName: queueInCanonicalLabel}, + Annotations: map[string]string{constants.AnnotationQueueName: queueInAnnotation}, + }, + }, + expectedQueue: queueInCanonicalLabel, + }, + { + name: "With legacy queue label and annotation", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: 
map[string]string{constants.LabelQueueName: queueInLabel}, diff --git a/test/e2e/basic_scheduling/basic_scheduling_test.go b/test/e2e/basic_scheduling/basic_scheduling_test.go index a95e091ab..0ab358143 100644 --- a/test/e2e/basic_scheduling/basic_scheduling_test.go +++ b/test/e2e/basic_scheduling/basic_scheduling_test.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" "github.com/apache/yunikorn-core/pkg/webservice/dao" + "github.com/apache/yunikorn-k8shim/pkg/common/constants" tests "github.com/apache/yunikorn-k8shim/test/e2e" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s" @@ -119,6 +120,73 @@ var _ = ginkgo.Describe("", func() { Ω(resMap["vcore"]).To(gomega.Equal(core)) }) + ginkgo.It("Verify_Pod_With_Conflicting_AppId", func() { + ginkgo.By("Validate task pod with conflicting appId will be rejected.") + PodName := "pod-with-conflicting-app-id" + AppIdA := "appId-A-" + common.RandSeq(10) + AppIdB := "appId-B-" + common.RandSeq(10) + + var testPodConfigs = k8s.TestPodConfig{ + Name: PodName, + Labels: map[string]string{ + constants.CanonicalLabelApplicationID: AppIdA, + constants.LabelApplicationID: AppIdA, + }, + Annotations: &k8s.PodAnnotation{ + Other: map[string]string{ + constants.AnnotationApplicationID: AppIdB, + }, + }, + Namespace: dev, + } + pod, err := k8s.InitTestPod(testPodConfigs) + Ω(err).NotTo(HaveOccurred()) + _, err = kClient.CreatePod(pod, dev) + Ω(err).NotTo(HaveOccurred()) + err = kClient.WaitForPodFailed(dev, PodName, 30*time.Second) + Ω(err).NotTo(HaveOccurred()) + reason, message, getReasonErr := kClient.GetPodFailureReasonAndMessage(PodName, dev) + Ω(getReasonErr).NotTo(HaveOccurred()) + + Ω(reason).To(gomega.Equal("TaskRejected")) + Ω(message).To(gomega.ContainSubstring("PodInconsistentMetadata")) + }) + + ginkgo.It("Verify_Pod_With_Conflicting_QueueName", func() { + ginkgo.By("Validate task pod with conflicting queue name will be rejected.") + 
PodName := "pod-with-conflicting-queue" + AppId := "appId-" + common.RandSeq(10) + queueNameA := "root.aaa" + queueNameB := "root.bbb" + + var testPodConfigs = k8s.TestPodConfig{ + Name: PodName, + Labels: map[string]string{ + constants.CanonicalLabelApplicationID: AppId, + constants.LabelApplicationID: AppId, + constants.CanonicalLabelQueueName: queueNameA, + constants.LabelQueueName: queueNameA, + }, + Annotations: &k8s.PodAnnotation{ + Other: map[string]string{ + constants.AnnotationQueueName: queueNameB, + }, + }, + Namespace: dev, + } + pod, err := k8s.InitTestPod(testPodConfigs) + Ω(err).NotTo(HaveOccurred()) + _, err = kClient.CreatePod(pod, dev) + Ω(err).NotTo(HaveOccurred()) + err = kClient.WaitForPodFailed(dev, PodName, 30*time.Second) + Ω(err).NotTo(HaveOccurred()) + reason, message, getReasonErr := kClient.GetPodFailureReasonAndMessage(PodName, dev) + Ω(getReasonErr).NotTo(HaveOccurred()) + + Ω(reason).To(gomega.Equal("TaskRejected")) + Ω(message).To(gomega.ContainSubstring("PodInconsistentMetadata")) + }) + ginkgo.AfterEach(func() { tests.DumpClusterInfoIfSpecFailed(suiteName, []string{dev}) // call the healthCheck api to check scheduler health diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go index 77a472f57..a3adade33 100644 --- a/test/e2e/framework/helpers/k8s/k8s_utils.go +++ b/test/e2e/framework/helpers/k8s/k8s_utils.go @@ -188,6 +188,18 @@ func (k *KubeCtl) GetPod(name, namespace string) (*v1.Pod, error) { return k.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } +func (k *KubeCtl) GetPodFailureReasonAndMessage(name, namespace string) (string, string, error) { + pod, err := k.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return "", "", err + } + if pod.Status.Phase == v1.PodFailed { + return pod.Status.Reason, pod.Status.Message, nil + } else { + return "", "", fmt.Errorf("pod %s is not in failed state", 
name) + } +} + func (k *KubeCtl) GetSchedulerPod() (string, error) { podNameList, err := k.GetPodNamesFromNS(configmanager.YuniKornTestConfig.YkNamespace) if err != nil { diff --git a/test/e2e/recovery_and_restart/recovery_and_restart_test.go b/test/e2e/recovery_and_restart/recovery_and_restart_test.go index d78326f99..3bd769829 100644 --- a/test/e2e/recovery_and_restart/recovery_and_restart_test.go +++ b/test/e2e/recovery_and_restart/recovery_and_restart_test.go @@ -29,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "github.com/apache/yunikorn-k8shim/pkg/common/constants" tests "github.com/apache/yunikorn-k8shim/test/e2e" "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" @@ -362,6 +363,60 @@ var _ = ginkgo.Describe("", func() { Ω(err).NotTo(gomega.HaveOccurred()) }) + ginkgo.It("Verify_Pod_Restart_After_Add_Conflict_Metadata", func() { + // A task with conflicting metadata in pod will be rejected. + // However, if the pod is already bound, the task can still be registered to YK. 
+ kClient = k8s.KubeCtl{} + Ω(kClient.SetClient()).To(gomega.BeNil()) + defer yunikorn.RestorePortForwarding(&kClient) + + ginkgo.By("Submitting a normal sleep pod with consistent metadata") + appId := "appId-" + common.RandSeq(10) + PodName := "normal-sleep-pod" + queueName := "root.abc" + var testPodConfigs = k8s.TestPodConfig{ + Name: PodName, + Labels: map[string]string{ + constants.CanonicalLabelApplicationID: appId, + constants.LabelApplicationID: appId, + constants.CanonicalLabelQueueName: queueName, + constants.LabelQueueName: queueName, + }, + Namespace: dev, + } + pod, err := k8s.InitTestPod(testPodConfigs) + Ω(err).NotTo(gomega.HaveOccurred()) + _, err = kClient.CreatePod(pod, dev) + Ω(err).NotTo(gomega.HaveOccurred()) + err = kClient.WaitForPodRunning(dev, PodName, 30*time.Second) + Ω(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Add conflict queue name to the pod annotation") + pod, err = kClient.GetPod(PodName, dev) + Ω(err).NotTo(gomega.HaveOccurred()) + + _, err = kClient.UpdatePodWithAnnotation(pod, dev, constants.AnnotationQueueName, "other-queue") + Ω(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Restart the scheduler pod") + yunikorn.RestartYunikorn(&kClient) + + ginkgo.By("Port-forward scheduler pod after restart") + yunikorn.RestorePortForwarding(&kClient) + + ginkgo.By("Check the bounded pod is still in running state") + err = kClient.WaitForPodRunning(dev, PodName, 30*time.Second) + gomega.Ω(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Check the task pod is still registered to YK") + restClient = yunikorn.RClient{} + err = restClient.WaitForAppStateTransition("default", "root."+dev, appId, "Running", 30) + gomega.Ω(err).NotTo(gomega.HaveOccurred()) + appsInfo, err := restClient.GetAppInfo("default", "root."+dev, appId) + gomega.Ω(err).NotTo(gomega.HaveOccurred()) + gomega.Ω(len(appsInfo.Allocations)).To(gomega.Equal(1)) + }) + ginkgo.AfterEach(func() { tests.DumpClusterInfoIfSpecFailed(suiteName, []string{dev}) })