Skip to content

Commit

Permalink
[YUNIKORN-2504] Support canonical labels for queue/applicationId in s…
Browse files Browse the repository at this point in the history
…cheduler
  • Loading branch information
chenyulin0719 committed Jun 18, 2024
1 parent 233d8b5 commit 6057c13
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 46 deletions.
44 changes: 20 additions & 24 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool)
for _, task := range app.GetNewTasks() {
if taskScheduleCondition(task) {
// for each new task, we do a sanity check before moving the state to Pending_Schedule
if err := task.sanityCheckBeforeScheduling(); err == nil {
// if the task is not ready for scheduling, we keep it in New state
// if the task is illegal, we move it to Rejected state
err, rejectTask := task.sanityCheckBeforeScheduling()

if err == nil {
// note, if we directly trigger submit task event, it may spawn too many duplicate
// events, because a task might be submitted multiple times before its state transits to PENDING.
if handleErr := task.handle(
Expand All @@ -406,11 +410,19 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool)
log.Log(log.ShimCacheApplication).Warn("init task failed", zap.Error(err))
}
} else {
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
zap.Error(err))
if !rejectTask {
// no state transition needed, just log the error
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
zap.Error(err))
} else {
if handleErr := task.handle(
NewRejectTaskEvent(task.applicationID, task.taskID, err.Error())); handleErr != nil {
log.Log(log.ShimCacheApplication).Warn("reject task failed", zap.Error(err))
}
}
}
}
}
Expand Down Expand Up @@ -568,22 +580,6 @@ func (app *Application) handleCompleteApplicationEvent() {
}()
}

// failTaskPodWithReasonAndMsg sets the pod of the given task to the Failed
// phase, recording reason and msg in the pod status, and submits the new
// status through task.UpdateTaskPodStatus. The result of the update is only
// logged; nothing is reported back to the caller.
func failTaskPodWithReasonAndMsg(task *Task, reason string, msg string) {
	// operate on a deep copy; the whole status is replaced, not merged
	failedPod := task.GetTaskPod().DeepCopy()
	failedPod.Status = v1.PodStatus{
		Phase:   v1.PodFailed,
		Reason:  reason,
		Message: msg,
	}

	log.Log(log.ShimCacheApplication).Info("setting pod to failed", zap.String("podName", task.GetTaskPod().Name))

	updatedPod, updateErr := task.UpdateTaskPodStatus(failedPod)
	if updateErr != nil {
		log.Log(log.ShimCacheApplication).Error("failed to update task pod status", zap.Error(updateErr))
		return
	}
	log.Log(log.ShimCacheApplication).Info("new pod status", zap.String("status", string(updatedPod.Status.Phase)))
}

func (app *Application) handleFailApplicationEvent(errMsg string) {
go func() {
getPlaceholderManager().cleanUp(app)
Expand All @@ -598,10 +594,10 @@ func (app *Application) handleFailApplicationEvent(errMsg string) {
for _, task := range unalloc {
// Only need to fail the non-placeholder pod(s)
if strings.Contains(errMsg, constants.ApplicationInsufficientResourcesFailure) {
failTaskPodWithReasonAndMsg(task, constants.ApplicationInsufficientResourcesFailure, "Scheduling has timed out due to insufficient resources")
task.failTaskPodWithReasonAndMsg(constants.ApplicationInsufficientResourcesFailure, "Scheduling has timed out due to insufficient resources")
} else if strings.Contains(errMsg, constants.ApplicationRejectedFailure) {
errMsgArr := strings.Split(errMsg, ":")
failTaskPodWithReasonAndMsg(task, constants.ApplicationRejectedFailure, errMsgArr[1])
task.failTaskPodWithReasonAndMsg(constants.ApplicationRejectedFailure, errMsgArr[1])
}
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "ApplicationFailed", "ApplicationFailed",
"Application %s scheduling failed, reason: %s", app.applicationID, errMsg)
Expand Down
82 changes: 75 additions & 7 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/looplab/fsm"
Expand Down Expand Up @@ -187,6 +188,22 @@ func (task *Task) UpdateTaskPod(pod *v1.Pod, podMutator func(pod *v1.Pod)) (*v1.
return task.context.apiProvider.GetAPIs().KubeClient.UpdatePod(pod, podMutator)
}

// failTaskPodWithReasonAndMsg moves a copy of the task's pod to the Failed
// phase with the supplied reason and message, then sends that status via
// UpdateTaskPodStatus. Success and failure of the update are logged only.
func (task *Task) failTaskPodWithReasonAndMsg(reason string, msg string) {
	// the status of the copy is overwritten entirely with the failure details
	failedPod := task.pod.DeepCopy()
	failedPod.Status = v1.PodStatus{
		Phase:   v1.PodFailed,
		Reason:  reason,
		Message: msg,
	}
	log.Log(log.ShimCacheTask).Info("setting pod to failed", zap.String("podName", failedPod.Name))

	updatedPod, updateErr := task.UpdateTaskPodStatus(failedPod)
	switch {
	case updateErr != nil:
		log.Log(log.ShimCacheTask).Error("failed to update task pod status", zap.Error(updateErr))
	default:
		log.Log(log.ShimCacheTask).Info("new pod status", zap.String("status", string(updatedPod.Status.Phase)))
	}
}

func (task *Task) isTerminated() bool {
for _, states := range TaskStates().Terminated {
if task.GetTaskState() == states {
Expand Down Expand Up @@ -457,16 +474,19 @@ func (task *Task) postTaskBound() {
}
}

func (task *Task) postTaskRejected() {
// currently, once task is rejected by scheduler, we directly move task to failed state.
// so this function simply triggers the state transition when it is rejected.
// but further, we can introduce retry mechanism if necessary.
func (task *Task) postTaskRejected(reason string) {
// if task is rejected because of inconsistent metadata, we should fail the pod with reason
if strings.Contains(reason, constants.TaskPodInconsistMetadataFailure) {
task.failTaskPodWithReasonAndMsg(constants.TaskRejectedFailure, reason)
}

// move task to failed state.
dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias)))
fmt.Sprintf("task %s failed because it is rejected", task.alias)))

events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeWarning, "TaskRejected", "TaskRejected",
"Task %s is rejected by the scheduler", task.alias)
"Task %s is rejected", task.alias)
}

// beforeTaskFail releases the allocation or ask from scheduler core
Expand Down Expand Up @@ -543,7 +563,55 @@ func (task *Task) releaseAllocation() {
// some sanity checks before sending task for scheduling,
// this reduces the scheduling overhead by blocking such
// request away from the core scheduler.
func (task *Task) sanityCheckBeforeScheduling() error {
// sanityCheckBeforeScheduling validates the task's pod before it is handed to
// scheduling. It returns the first error found together with a flag telling
// whether the task should be rejected:
//   - a PVC check failure returns (err, false)
//   - a label/annotation metadata failure returns (err, true); this check is
//     skipped when the pod is already bound
//   - when every check passes it returns (nil, false)
func (task *Task) sanityCheckBeforeScheduling() (error, bool) {
	if pvcErr := task.checkPodPVCs(); pvcErr != nil {
		return pvcErr, false
	}

	// metadata consistency is not verified for pods that are already bound
	if utils.PodAlreadyBound(task.pod) {
		return nil, false
	}

	if metadataErr := task.checkPodMetadata(); metadataErr != nil {
		return metadataErr, true
	}

	return nil, false
}

// checkPodMetadata verifies that the application ID and the queue name are
// set to the same value in every pod label and annotation that carries them.
// The application ID is checked first; the first inconsistency found is
// returned as an error tagged with constants.TaskPodInconsistMetadataFailure.
// It returns nil when both are consistent.
func (task *Task) checkPodMetadata() error {
	// application ID: canonical, Spark and legacy labels plus the annotation
	appIDConsistent := utils.ValidatePodLabelAnnotationConsistency(
		task.pod,
		[]string{
			constants.CanonicalLabelApplicationID,
			constants.SparkLabelAppID,
			constants.LabelApplicationID,
		},
		[]string{
			constants.AnnotationApplicationID,
		},
	)
	if !appIDConsistent {
		return fmt.Errorf("application ID is not set consistently in pod labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure)
	}

	// queue name: canonical and legacy labels plus the annotation
	queueConsistent := utils.ValidatePodLabelAnnotationConsistency(
		task.pod,
		[]string{
			constants.CanonicalLabelQueueName,
			constants.LabelQueueName,
		},
		[]string{
			constants.AnnotationQueueName,
		},
	)
	if !queueConsistent {
		return fmt.Errorf("queue is not set consistently in pod labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure)
	}

	return nil
}

func (task *Task) checkPodPVCs() error {
// Check PVCs used by the pod
namespace := task.pod.Namespace
manifest := &(task.pod.Spec)
Expand Down
11 changes: 10 additions & 1 deletion pkg/cache/task_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,16 @@ func newTaskState() *fsm.FSM {
},
states.Rejected: func(_ context.Context, event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
task.postTaskRejected()
eventArgs := make([]string, 1)
reason := ""
if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil {
log.Log(log.ShimFSM).Error("failed to parse event arg", zap.Error(err))
reason = err.Error()
} else {
reason = eventArgs[0]
}

task.postTaskRejected(reason)
},
states.Failed: func(_ context.Context, event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ var SchedulingPolicyStyleParamValues = map[string]string{"Hard": "Hard", "Soft":

const ApplicationInsufficientResourcesFailure = "ResourceReservationTimeout"
const ApplicationRejectedFailure = "ApplicationRejected"
const TaskRejectedFailure = "TaskRejected"
const TaskPodInconsistMetadataFailure = "PodInconsistentMetadata"

// namespace.max.* (Retaining for backwards compatibility. Need to be removed in next major release)
const CPUQuota = DomainYuniKorn + "namespace.max.cpu"
Expand Down
60 changes: 52 additions & 8 deletions pkg/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,21 @@ func IsAssignedPod(pod *v1.Pod) bool {
}

func GetQueueNameFromPod(pod *v1.Pod) string {
// Queue name can be defined in multiple places
// The queue name is determined by the following order
// 1. Label: constants.CanonicalLabelQueueName
// 2. Label: constants.LabelQueueName
// 3. Annotation: constants.AnnotationQueueName
// 4. Default: constants.ApplicationDefaultQueue
queueName := constants.ApplicationDefaultQueue
if an := GetPodLabelValue(pod, constants.LabelQueueName); an != "" {
queueName = an
} else if qu := GetPodAnnotationValue(pod, constants.AnnotationQueueName); qu != "" {
queueName = qu
if canonicalLabelQueueName := GetPodLabelValue(pod, constants.CanonicalLabelQueueName); canonicalLabelQueueName != "" {
queueName = canonicalLabelQueueName
} else if labelQueueName := GetPodLabelValue(pod, constants.LabelQueueName); labelQueueName != "" {
queueName = labelQueueName
} else if annotationQueueName := GetPodAnnotationValue(pod, constants.AnnotationQueueName); annotationQueueName != "" {
queueName = annotationQueueName
}

return queueName
}

Expand Down Expand Up @@ -154,16 +163,22 @@ func GetApplicationIDFromPod(pod *v1.Pod) string {
}
}

// Application ID can be defined in annotation
appID := GetPodAnnotationValue(pod, constants.AnnotationApplicationID)
// Application ID can be defined in multiple places
// The application ID is determined by the following order.
// 1. Label: constants.CanonicalLabelApplicationID
// 2. Label: constants.LabelApplicationID
// 3. Label: constants.SparkLabelAppID
// 4. Annotation: constants.AnnotationApplicationID
appID := GetPodLabelValue(pod, constants.CanonicalLabelApplicationID)
if appID == "" {
// Application ID can be defined in label
appID = GetPodLabelValue(pod, constants.LabelApplicationID)
}
if appID == "" {
// Spark can also define application ID
appID = GetPodLabelValue(pod, constants.SparkLabelAppID)
}
if appID == "" {
appID = GetPodAnnotationValue(pod, constants.AnnotationApplicationID)
}

// If plugin mode, interpret missing Application ID as a non-YuniKorn pod
if pluginMode && appID == "" {
Expand All @@ -185,6 +200,35 @@ func GetApplicationIDFromPod(pod *v1.Pod) string {
return GenerateApplicationID(pod.Namespace, conf.GetSchedulerConf().GenerateUniqueAppIds, string(pod.UID))
}

// check conflict information in pod, return true if all values are consistent in provided label/annotation keys if set
func ValidatePodLabelAnnotationConsistency(pod *v1.Pod, labelKeys []string, annotationKeys []string) bool {
var firstValue string

for _, key := range labelKeys {
value := GetPodLabelValue(pod, key)
if value == "" {

Check failure on line 209 in pkg/common/utils/utils.go

View workflow job for this annotation

GitHub Actions / build

ifElseChain: rewrite if-else to switch statement (gocritic)
continue
} else if firstValue == "" {
firstValue = value
} else if firstValue != value {
return false
}
}

for _, key := range annotationKeys {
value := GetPodAnnotationValue(pod, key)
if value == "" {

Check failure on line 220 in pkg/common/utils/utils.go

View workflow job for this annotation

GitHub Actions / build

ifElseChain: rewrite if-else to switch statement (gocritic)
continue
} else if firstValue == "" {
firstValue = value
} else if firstValue != value {
return false
}
}

return true
}

// compare the existing pod condition with the given one, return true if the pod condition remains not changed.
// return false if pod has no condition set yet, or condition has changed.
func PodUnderCondition(pod *v1.Pod, condition *v1.PodCondition) bool {
Expand Down
40 changes: 34 additions & 6 deletions pkg/common/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func TestGetApplicationIDFromPod(t *testing.T) {
defer SetPluginMode(false)
defer func() { conf.GetSchedulerConf().GenerateUniqueAppIds = false }()

appIDInCanonicalLabel := "CanonicalLabelAppID"
appIDInLabel := "labelAppID"
appIDInAnnotation := "annotationAppID"
appIDInSelector := "selectorAppID"
Expand All @@ -525,6 +526,12 @@ func TestGetApplicationIDFromPod(t *testing.T) {
expectedAppIDPluginMode string
generateUniqueAppIds bool
}{
{"AppID defined in canonical label", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{constants.CanonicalLabelApplicationID: appIDInCanonicalLabel},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, appIDInCanonicalLabel, appIDInCanonicalLabel, false},
{"AppID defined in label", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{constants.LabelApplicationID: appIDInLabel},
Expand All @@ -545,15 +552,29 @@ func TestGetApplicationIDFromPod(t *testing.T) {
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, "testns-podUid", "", true},
{"Unique autogen token found with generateUnique", &v1.Pod{
{"Unique autogen token found with generateUnique in canonical AppId label", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "testns",
UID: "podUid",
Labels: map[string]string{constants.CanonicalLabelApplicationID: "testns-uniqueautogen"},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, "testns-podUid", "testns-podUid", true},
{"Unique autogen token found with generateUnique in legacy AppId labels", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "testns",
UID: "podUid",
Labels: map[string]string{constants.LabelApplicationID: "testns-uniqueautogen"},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, "testns-podUid", "testns-podUid", true},
{"Non-yunikorn schedulerName", &v1.Pod{
{"Non-yunikorn schedulerName with canonical AppId label", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{constants.CanonicalLabelApplicationID: appIDInCanonicalLabel},
},
Spec: v1.PodSpec{SchedulerName: "default"},
}, "", "", false},
{"Non-yunikorn schedulerName with legacy AppId label", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{constants.LabelApplicationID: appIDInLabel},
},
Expand Down Expand Up @@ -583,13 +604,20 @@ func TestGetApplicationIDFromPod(t *testing.T) {
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, appIDInAnnotation, appIDInAnnotation, false},
{"AppID defined in label and annotation", &v1.Pod{
{"AppID defined in canonical label and annotation", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{constants.AnnotationApplicationID: appIDInAnnotation},
Labels: map[string]string{constants.CanonicalLabelApplicationID: appIDInCanonicalLabel},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, appIDInCanonicalLabel, appIDInCanonicalLabel, false},
{"AppID defined in legacy label and annotation", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{constants.AnnotationApplicationID: appIDInAnnotation},
Labels: map[string]string{constants.LabelApplicationID: appIDInLabel},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, appIDInAnnotation, appIDInAnnotation, false},
}, appIDInLabel, appIDInLabel, false},

{"Spark AppID defined in spark app selector", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -603,14 +631,14 @@ func TestGetApplicationIDFromPod(t *testing.T) {
Annotations: map[string]string{constants.AnnotationApplicationID: sparkIDInAnnotation},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, sparkIDInAnnotation, sparkIDInAnnotation, false},
}, appIDInSelector, appIDInSelector, false},
{"Spark AppID defined in spark app selector and annotation", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{constants.SparkLabelAppID: appIDInSelector, constants.LabelApplicationID: appIDInLabel},
Annotations: map[string]string{constants.AnnotationApplicationID: sparkIDInAnnotation},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, sparkIDInAnnotation, sparkIDInAnnotation, false},
}, appIDInLabel, appIDInLabel, false},
{"No AppID defined", &v1.Pod{}, "", "", false},
{"Spark AppID defined in spark app selector and label", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit 6057c13

Please sign in to comment.