Commit 3bf0269

[YUNIKORN-2504] Support canonical labels for queue/applicationId in scheduler

chenyulin0719 committed Jun 21, 2024
1 parent 24efbed commit 3bf0269
Showing 12 changed files with 583 additions and 57 deletions.
45 changes: 21 additions & 24 deletions pkg/cache/application.go
@@ -395,7 +395,11 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool)
for _, task := range app.GetNewTasks() {
if taskScheduleCondition(task) {
// for each new task, we do a sanity check before moving the state to Pending_Schedule
if err := task.sanityCheckBeforeScheduling(); err == nil {
// if the task is not ready for scheduling, we keep it in New state
// if the task pod is not yet bound and has conflicting metadata, we move the task to Rejected state
err, rejectTask := task.sanityCheckBeforeScheduling()

if err == nil {
// note, if we directly trigger submit task event, it may spawn too many duplicate
// events, because a task might be submitted multiple times before its state transits to PENDING.
if handleErr := task.handle(
@@ -406,11 +410,20 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool)
log.Log(log.ShimCacheApplication).Warn("init task failed", zap.Error(err))
}
} else {
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
zap.Error(err))
if !rejectTask {

// no state transition
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
zap.Error(err))
} else {

// task transits to Rejected state
if handleErr := task.handle(
NewRejectTaskEvent(task.applicationID, task.taskID, err.Error())); handleErr != nil {
log.Log(log.ShimCacheApplication).Warn("reject task failed", zap.Error(err))

}
}
}
}
}
@@ -568,22 +581,6 @@ func (app *Application) handleCompleteApplicationEvent() {
}()
}

func failTaskPodWithReasonAndMsg(task *Task, reason string, msg string) {
podCopy := task.GetTaskPod().DeepCopy()
podCopy.Status = v1.PodStatus{
Phase: v1.PodFailed,
Reason: reason,
Message: msg,
}
log.Log(log.ShimCacheApplication).Info("setting pod to failed", zap.String("podName", task.GetTaskPod().Name))
pod, err := task.UpdateTaskPodStatus(podCopy)
if err != nil {
log.Log(log.ShimCacheApplication).Error("failed to update task pod status", zap.Error(err))
} else {
log.Log(log.ShimCacheApplication).Info("new pod status", zap.String("status", string(pod.Status.Phase)))
}
}

func (app *Application) handleFailApplicationEvent(errMsg string) {
go func() {
getPlaceholderManager().cleanUp(app)
@@ -598,10 +595,10 @@ func (app *Application) handleFailApplicationEvent(errMsg string) {
for _, task := range unalloc {
// Only need to fail the non-placeholder pod(s)
if strings.Contains(errMsg, constants.ApplicationInsufficientResourcesFailure) {
failTaskPodWithReasonAndMsg(task, constants.ApplicationInsufficientResourcesFailure, "Scheduling has timed out due to insufficient resources")
task.failTaskPodWithReasonAndMsg(constants.ApplicationInsufficientResourcesFailure, "Scheduling has timed out due to insufficient resources")
} else if strings.Contains(errMsg, constants.ApplicationRejectedFailure) {
errMsgArr := strings.Split(errMsg, ":")
failTaskPodWithReasonAndMsg(task, constants.ApplicationRejectedFailure, errMsgArr[1])
task.failTaskPodWithReasonAndMsg(constants.ApplicationRejectedFailure, errMsgArr[1])
}
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "ApplicationFailed", "ApplicationFailed",
"Application %s scheduling failed, reason: %s", app.applicationID, errMsg)
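For orientation, here is a minimal standalone sketch of the decision flow introduced above: sanityCheckBeforeScheduling now returns both an error and a reject flag, and scheduleTasks either submits the task, leaves it in New, or moves it to Rejected. The helper names (sanityCheck, submit, reject, keepNew) are illustrative only, not the shim's API.

package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the shim's task actions; not the real API.
func submit(id string)             { fmt.Println("submit:", id) }
func reject(id string, err error)  { fmt.Println("reject:", id, "-", err) }
func keepNew(id string, err error) { fmt.Println("keep in New:", id, "-", err) }

// sanityCheck mirrors the (error, rejectTask) contract added in this commit:
// a nil error means the task can be submitted; a non-nil error with
// rejectTask=true means conflicting pod metadata; otherwise the task just waits.
func sanityCheck(conflicting, ready bool) (error, bool) {
	if conflicting {
		return errors.New("pod metadata is inconsistent"), true
	}
	if !ready {
		return errors.New("pod is not ready for scheduling"), false
	}
	return nil, false
}

func schedule(id string, conflicting, ready bool) {
	err, rejectTask := sanityCheck(conflicting, ready)
	switch {
	case err == nil:
		submit(id)
	case rejectTask:
		reject(id, err)
	default:
		keepNew(id, err)
	}
}

func main() {
	schedule("task-ok", false, true)
	schedule("task-waiting", false, false)
	schedule("task-conflicting", true, true)
}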
4 changes: 2 additions & 2 deletions pkg/cache/placeholder.go
@@ -90,8 +90,8 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup TaskGrou
Name: placeholderName,
Namespace: app.tags[constants.AppTagNamespace],
Labels: utils.MergeMaps(taskGroup.Labels, map[string]string{
constants.LabelApplicationID: app.GetApplicationID(),
constants.LabelQueueName: app.GetQueue(),
constants.CanonicalLabelApplicationID: app.GetApplicationID(),
constants.CanonicalLabelQueueName: app.GetQueue(),
}),
Annotations: annotations,
OwnerReferences: ownerRefs,
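A small sketch of the effect on placeholder pod labels, assuming the canonical keys resolve to yunikorn.apache.org/app-id and yunikorn.apache.org/queue (the constant values are not shown in this diff) and a merge where the right-hand map wins on conflicts; mergeMaps below is a stand-in for the shim's merge helper, not the real utils implementation.

package main

import "fmt"

// mergeMaps is a stand-in for the shim's merge helper: later maps override earlier ones.
func mergeMaps(maps ...map[string]string) map[string]string {
	out := map[string]string{}
	for _, m := range maps {
		for k, v := range m {
			out[k] = v
		}
	}
	return out
}

func main() {
	taskGroupLabels := map[string]string{"labelKey0": "labelKeyValue0"}
	// Assumed canonical key values for illustration; see constants.CanonicalLabel* above.
	labels := mergeMaps(taskGroupLabels, map[string]string{
		"yunikorn.apache.org/app-id": "app01",
		"yunikorn.apache.org/queue":  "root.sandbox1",
	})
	fmt.Println(labels)
}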
8 changes: 4 additions & 4 deletions pkg/cache/placeholder_test.go
@@ -125,10 +125,10 @@ func TestNewPlaceholder(t *testing.T) {
assert.Equal(t, holder.pod.Name, "ph-name")
assert.Equal(t, holder.pod.Namespace, namespace)
assert.DeepEqual(t, holder.pod.Labels, map[string]string{
constants.LabelApplicationID: appID,
constants.LabelQueueName: queue,
"labelKey0": "labelKeyValue0",
"labelKey1": "labelKeyValue1",
constants.CanonicalLabelApplicationID: appID,
constants.CanonicalLabelQueueName: queue,
"labelKey0": "labelKeyValue0",
"labelKey1": "labelKeyValue1",
})
assert.Equal(t, len(holder.pod.Annotations), 6, "unexpected number of annotations")
assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], app.taskGroups[0].Name)
83 changes: 76 additions & 7 deletions pkg/cache/task.go
@@ -22,6 +22,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/looplab/fsm"
@@ -187,6 +188,22 @@ func (task *Task) UpdateTaskPod(pod *v1.Pod, podMutator func(pod *v1.Pod)) (*v1.
return task.context.apiProvider.GetAPIs().KubeClient.UpdatePod(pod, podMutator)
}

func (task *Task) failTaskPodWithReasonAndMsg(reason string, msg string) {
podCopy := task.pod.DeepCopy()
podCopy.Status = v1.PodStatus{
Phase: v1.PodFailed,
Reason: reason,
Message: msg,
}
log.Log(log.ShimCacheTask).Info("setting pod to failed", zap.String("podName", podCopy.Name))
pod, err := task.UpdateTaskPodStatus(podCopy)
if err != nil {
log.Log(log.ShimCacheTask).Error("failed to update task pod status", zap.Error(err))

} else {
log.Log(log.ShimCacheTask).Info("new pod status", zap.String("status", string(pod.Status.Phase)))
}
}

func (task *Task) isTerminated() bool {
for _, states := range TaskStates().Terminated {
if task.GetTaskState() == states {
@@ -457,16 +474,19 @@ func (task *Task) postTaskBound() {
}
}

func (task *Task) postTaskRejected() {
// currently, once task is rejected by scheduler, we directly move task to failed state.
// so this function simply triggers the state transition when it is rejected.
// but further, we can introduce retry mechanism if necessary.
func (task *Task) postTaskRejected(reason string) {
// if task is rejected because of conflicting metadata, we should fail the pod with reason
if strings.Contains(reason, constants.TaskPodInconsistMetadataFailure) {
task.failTaskPodWithReasonAndMsg(constants.TaskRejectedFailure, reason)

}

// move task to failed state.
dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias)))
fmt.Sprintf("task %s failed because it is rejected", task.alias)))

events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeWarning, "TaskRejected", "TaskRejected",
"Task %s is rejected by the scheduler", task.alias)
"Task %s is rejected", task.alias)
}

// beforeTaskFail releases the allocation or ask from scheduler core
@@ -543,7 +563,56 @@ func (task *Task) releaseAllocation() {
// some sanity checks before sending task for scheduling,
// this reduces the scheduling overhead by blocking such
// request away from the core scheduler.
func (task *Task) sanityCheckBeforeScheduling() error {
func (task *Task) sanityCheckBeforeScheduling() (error, bool) {
rejectTask := false

if err := task.checkPodPVCs(); err != nil {
return err, rejectTask

}

// only check pod labels and annotations consistency if pod is not already bound
// reject the task if pod metadata is conflicting
if !utils.PodAlreadyBound(task.pod) {
if err := task.checkPodMetadata(); err != nil {
rejectTask = true
return err, rejectTask

}
}

return nil, rejectTask
}

func (task *Task) checkPodMetadata() error {
// check application ID
appIdLabelKeys := []string{
constants.CanonicalLabelApplicationID,
constants.SparkLabelAppID,
constants.LabelApplicationID,
}
appIdAnnotationKeys := []string{
constants.AnnotationApplicationID,
}
if !utils.ValidatePodLabelAnnotationConsistency(task.pod, appIdLabelKeys, appIdAnnotationKeys) {
return fmt.Errorf("application ID is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure)
}

// check queue name
queueLabelKeys := []string{
constants.CanonicalLabelQueueName,
constants.LabelQueueName,
}

queueAnnotationKeys := []string{
constants.AnnotationQueueName,
}

if !utils.ValidatePodLabelAnnotationConsistency(task.pod, queueLabelKeys, queueAnnotationKeys) {
return fmt.Errorf("queue is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure)
}
return nil
}

func (task *Task) checkPodPVCs() error {
// Check PVCs used by the pod
namespace := task.pod.Namespace
manifest := &(task.pod.Spec)
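utils.ValidatePodLabelAnnotationConsistency itself is not part of this diff; as a rough self-contained sketch of the rule the new checkPodMetadata relies on (and that the test cases in task_test.go below exercise), every non-empty value found under the listed label keys and annotation keys must agree. The key strings used here are assumptions for illustration only.

package main

import "fmt"

// consistent reports whether all non-empty values found under the given
// label and annotation keys are identical. Sketch only; the real check
// lives in the shim's utils package.
func consistent(labels, annotations map[string]string, labelKeys, annotationKeys []string) bool {
	seen := ""
	check := func(m map[string]string, keys []string) bool {
		for _, k := range keys {
			v, ok := m[k]
			if !ok || v == "" {
				continue
			}
			if seen == "" {
				seen = v
			} else if seen != v {
				return false
			}
		}
		return true
	}
	return check(labels, labelKeys) && check(annotations, annotationKeys)
}

func main() {
	labels := map[string]string{
		"yunikorn.apache.org/app-id": "app01", // assumed canonical key
		"applicationId":              "app02", // assumed legacy key with a conflicting value
	}
	annotations := map[string]string{}
	fmt.Println(consistent(labels, annotations,
		[]string{"yunikorn.apache.org/app-id", "spark-app-selector", "applicationId"},
		[]string{"yunikorn.apache.org/app-id"}))
	// prints false: such a task would be rejected with PodInconsistentMetadata
}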
11 changes: 10 additions & 1 deletion pkg/cache/task_state.go
@@ -395,7 +395,16 @@ func newTaskState() *fsm.FSM {
},
states.Rejected: func(_ context.Context, event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
task.postTaskRejected()
eventArgs := make([]string, 1)
reason := ""
if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil {
log.Log(log.ShimFSM).Error("failed to parse event arg", zap.Error(err))
reason = err.Error()

} else {
reason = eventArgs[0]
}

task.postTaskRejected(reason)
},
states.Failed: func(_ context.Context, event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
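For reference, a standalone sketch of how the Rejected callback above recovers the reason string from the FSM event arguments; argsAsStrings is a hypothetical stand-in for events.GetEventArgsAsStrings, which this commit uses but does not modify.

package main

import "fmt"

// argsAsStrings mimics the arg extraction done in the Rejected callback:
// the event carries []interface{} and the first element is the reason.
func argsAsStrings(out []string, args []interface{}) error {
	if len(args) < len(out) {
		return fmt.Errorf("expected at least %d event args, got %d", len(out), len(args))
	}
	for i := range out {
		s, ok := args[i].(string)
		if !ok {
			return fmt.Errorf("event arg %d is not a string", i)
		}
		out[i] = s
	}
	return nil
}

func main() {
	eventArgs := make([]string, 1)
	reason := ""
	if err := argsAsStrings(eventArgs, []interface{}{"pod metadata is inconsistent"}); err != nil {
		reason = err.Error()
	} else {
		reason = eventArgs[0]
	}
	fmt.Println("postTaskRejected reason:", reason)
}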
130 changes: 130 additions & 0 deletions pkg/cache/task_test.go
@@ -19,6 +19,7 @@
package cache

import (
"fmt"
"testing"
"time"

@@ -698,6 +699,135 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) {
assert.Equal(t, task1.GetTaskState(), TaskStates().Completed)
}

// nolint: funlen
func TestCheckPodMetadata(t *testing.T) {
const (
appID = "app01"
app2ID = "app02"
queueName = "root.sandbox1"
queue2Name = "root.sandbox2"
)
var appIdInconsitentErr = fmt.Errorf("application ID is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure)
var queueInconsitentErr = fmt.Errorf("queue is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure)

testCases := []struct {
name string
podLabels map[string]string
podAnnotations map[string]string
expected error
}{
{
"empty label and empty annotation in pod", nil, nil, nil,
},
{
"appId and queueName have no conflict",
map[string]string{
constants.CanonicalLabelApplicationID: appID,
constants.SparkLabelAppID: appID,
constants.LabelApp: appID,
constants.CanonicalLabelQueueName: queueName,
constants.LabelQueueName: queueName,
}, map[string]string{
constants.AnnotationApplicationID: appID,
constants.AnnotationQueueName: queueName,
},
nil,
},
{
"have conflict appId in canonical label",
map[string]string{
constants.CanonicalLabelApplicationID: app2ID,
constants.SparkLabelAppID: appID,
constants.LabelApplicationID: appID,
}, map[string]string{
constants.AnnotationApplicationID: appID,
},
appIdInconsitentErr,
},
{
"have conflict appId in spark label",
map[string]string{
constants.CanonicalLabelApplicationID: appID,
constants.SparkLabelAppID: app2ID,
constants.LabelApplicationID: appID,
}, map[string]string{
constants.AnnotationApplicationID: appID,
},
appIdInconsitentErr,
},
{
"have conflict appId in legacy label",
map[string]string{
constants.CanonicalLabelApplicationID: appID,
constants.SparkLabelAppID: appID,
constants.LabelApplicationID: app2ID,
}, map[string]string{
constants.AnnotationApplicationID: appID,
},
appIdInconsitentErr,
},
{
"have conflict appId in annotation",
map[string]string{
constants.CanonicalLabelApplicationID: appID,
constants.SparkLabelAppID: appID,
constants.LabelApplicationID: appID,
}, map[string]string{
constants.AnnotationApplicationID: app2ID,
},
appIdInconsitentErr,
},
{
"have conflict queueNmae in canonical label",
map[string]string{
constants.CanonicalLabelQueueName: queue2Name,
constants.LabelQueueName: queueName,
}, map[string]string{
constants.AnnotationQueueName: queueName,
},
queueInconsitentErr,
},
{
"have conflict queueNmae in legacy label",
map[string]string{
constants.CanonicalLabelQueueName: queueName,
constants.LabelQueueName: queue2Name,
}, map[string]string{
constants.AnnotationQueueName: queueName,
},
queueInconsitentErr,
},
{
"have conflict queueNmae in annotation",
map[string]string{
constants.CanonicalLabelQueueName: queueName,
constants.LabelQueueName: queueName,
}, map[string]string{
constants.AnnotationQueueName: queue2Name,
},
queueInconsitentErr,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
app := NewApplication(appID, "root.default", "user", testGroups, map[string]string{}, nil)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: tc.podLabels,
Annotations: tc.podAnnotations,
},
}
task := NewTask("task01", app, nil, pod)
err := task.checkPodMetadata()
if err != nil {
assert.Equal(t, tc.expected.Error(), err.Error())
} else {
assert.NilError(t, err)
}
})
}
}

func TestUpdatePodCondition(t *testing.T) {
condition := v1.PodCondition{
Type: v1.ContainersReady,
2 changes: 2 additions & 0 deletions pkg/common/constants/constants.go
@@ -87,6 +87,8 @@ var SchedulingPolicyStyleParamValues = map[string]string{"Hard": "Hard", "Soft":

const ApplicationInsufficientResourcesFailure = "ResourceReservationTimeout"
const ApplicationRejectedFailure = "ApplicationRejected"
const TaskRejectedFailure = "TaskRejected"
const TaskPodInconsistMetadataFailure = "PodInconsistentMetadata"

// namespace.max.* (Retaining for backwards compatibility. Need to be removed in next major release)
const CPUQuota = DomainYuniKorn + "namespace.max.cpu"
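The two new constants tie the earlier changes together: checkPodMetadata embeds PodInconsistentMetadata in its error text, postTaskRejected detects that marker with strings.Contains, and the pod is then failed with reason TaskRejected. A tiny sketch of that hand-off, using literal values that mirror the constants above:

package main

import (
	"fmt"
	"strings"
)

const (
	taskRejectedFailure             = "TaskRejected"
	taskPodInconsistMetadataFailure = "PodInconsistentMetadata"
)

func main() {
	// Error text produced by checkPodMetadata carries the failure marker.
	reason := fmt.Sprintf("application ID is not consistently set in pod's labels and annotations. [%s]",
		taskPodInconsistMetadataFailure)

	// postTaskRejected only fails the pod when the marker is present.
	if strings.Contains(reason, taskPodInconsistMetadataFailure) {
		fmt.Printf("fail pod with reason=%s msg=%q\n", taskRejectedFailure, reason)
	}
}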