[YUNIKORN-2504] Support canonical labels for queue/applicationId in scheduler #54

Closed
wants to merge 1 commit into from
45 changes: 21 additions & 24 deletions pkg/cache/application.go
@@ -395,7 +395,11 @@
for _, task := range app.GetNewTasks() {
if taskScheduleCondition(task) {
// for each new task, we do a sanity check before moving the state to Pending_Schedule
if err := task.sanityCheckBeforeScheduling(); err == nil {
// if the task is not ready for scheduling, we keep it in New state
// if the task pod is not yet bound and has conflicting metadata, we move the task to Rejected state
err, rejectTask := task.sanityCheckBeforeScheduling()

Perform a sanity check before moving this task to the Pending state.

Before this PR, the sanity check only verified PVC readiness:

  • If the sanity check passed, the task state moved from 'New' -> 'Pending'
  • If the sanity check failed, the task state remained 'New' (to be checked again in the next scheduling cycle)

After this PR, the sanity check covers both PVC readiness and pod metadata:

  • if the sanity check passes, 'New' -> 'Pending'
  • if the sanity check fails due to a PVC, the state stays 'New' (no change)
  • if the sanity check fails due to an unbound pod with inconsistent metadata (appID/queue label), the task state moves from 'New' to 'Rejected'

Design decision: only unbound pods are rejected, because we don't want to fail existing running pods after a YuniKorn restart.
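For quick reference, a minimal, self-contained Go sketch of the resulting decision (illustrative only; the real types and event dispatch are in the diff below):

package main

import "fmt"

type taskState string

const (
	stateNew      taskState = "New"
	statePending  taskState = "Pending"
	stateRejected taskState = "Rejected"
)

// nextState mirrors the branch added in this hunk: err reports why the sanity
// check failed; reject marks an unbound pod with conflicting metadata.
func nextState(err error, reject bool) taskState {
	switch {
	case err == nil:
		return statePending // sanity check passed
	case !reject:
		return stateNew // e.g. PVC not ready; retry next scheduling cycle
	default:
		return stateRejected // unbound pod with conflicting appID/queue metadata
	}
}

func main() {
	fmt.Println(nextState(nil, false))                            // Pending
	fmt.Println(nextState(fmt.Errorf("pvc not ready"), false))    // New
	fmt.Println(nextState(fmt.Errorf("metadata conflict"), true)) // Rejected
}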

if err == nil {
// note, if we directly trigger submit task event, it may spawn too many duplicate
// events, because a task might be submitted multiple times before its state transits to PENDING.
if handleErr := task.handle(
@@ -406,11 +410,20 @@
log.Log(log.ShimCacheApplication).Warn("init task failed", zap.Error(err))
}
} else {
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
zap.Error(err))
if !rejectTask {

// no state transition
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
zap.Error(err))
} else {

// task transits to Rejected state
if handleErr := task.handle(
NewRejectTaskEvent(task.applicationID, task.taskID, err.Error())); handleErr != nil {
log.Log(log.ShimCacheApplication).Warn("reject task failed", zap.Error(err))

}
}
}
}
}
@@ -568,22 +581,6 @@
}()
}

func failTaskPodWithReasonAndMsg(task *Task, reason string, msg string) {
podCopy := task.GetTaskPod().DeepCopy()
podCopy.Status = v1.PodStatus{
Phase: v1.PodFailed,
Reason: reason,
Message: msg,
}
log.Log(log.ShimCacheApplication).Info("setting pod to failed", zap.String("podName", task.GetTaskPod().Name))
pod, err := task.UpdateTaskPodStatus(podCopy)
if err != nil {
log.Log(log.ShimCacheApplication).Error("failed to update task pod status", zap.Error(err))
} else {
log.Log(log.ShimCacheApplication).Info("new pod status", zap.String("status", string(pod.Status.Phase)))
}
}

func (app *Application) handleFailApplicationEvent(errMsg string) {
Moved failTaskPodWithReasonAndMsg() to task.go.

Changed

  • podCopy := task.GetTaskPod().DeepCopy()
    to
  • podCopy := task.pod.DeepCopy()

to prevent a deadlock while the task state machine is handling the TaskRejected event.
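The task's locking internals are not shown in this diff, but the hazard is the usual non-reentrant mutex pattern. A minimal sketch, assuming GetTaskPod() acquires the same lock the FSM callback already holds:

package main

import (
	"fmt"
	"sync"
)

type task struct {
	lock sync.RWMutex
	pod  string
}

// GetTaskPod is a typical thread-safe getter: it takes a read lock.
func (t *task) GetTaskPod() string {
	t.lock.RLock()
	defer t.lock.RUnlock()
	return t.pod
}

// handleRejected models an FSM callback that already holds the write lock.
// Calling t.GetTaskPod() here would block forever, because sync.RWMutex is
// not reentrant: RLock waits while a writer holds the lock. Reading t.pod
// directly avoids the deadlock.
func (t *task) handleRejected() string {
	t.lock.Lock()
	defer t.lock.Unlock()
	return t.pod
}

func main() {
	t := &task{pod: "pod-1"}
	fmt.Println(t.handleRejected()) // pod-1
}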

go func() {
getPlaceholderManager().cleanUp(app)
@@ -598,10 +595,10 @@
for _, task := range unalloc {
// Only need to fail the non-placeholder pod(s)
if strings.Contains(errMsg, constants.ApplicationInsufficientResourcesFailure) {
failTaskPodWithReasonAndMsg(task, constants.ApplicationInsufficientResourcesFailure, "Scheduling has timed out due to insufficient resources")
task.failTaskPodWithReasonAndMsg(constants.ApplicationInsufficientResourcesFailure, "Scheduling has timed out due to insufficient resources")
} else if strings.Contains(errMsg, constants.ApplicationRejectedFailure) {
errMsgArr := strings.Split(errMsg, ":")
failTaskPodWithReasonAndMsg(task, constants.ApplicationRejectedFailure, errMsgArr[1])
task.failTaskPodWithReasonAndMsg(constants.ApplicationRejectedFailure, errMsgArr[1])
}
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "ApplicationFailed", "ApplicationFailed",
"Application %s scheduling failed, reason: %s", app.applicationID, errMsg)
4 changes: 2 additions & 2 deletions pkg/cache/placeholder.go
@@ -90,8 +90,8 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup TaskGrou
Name: placeholderName,
Namespace: app.tags[constants.AppTagNamespace],
Labels: utils.MergeMaps(taskGroup.Labels, map[string]string{
constants.LabelApplicationID: app.GetApplicationID(),
constants.LabelQueueName: app.GetQueue(),
constants.CanonicalLabelApplicationID: app.GetApplicationID(),
constants.CanonicalLabelQueueName: app.GetQueue(),
Comment on lines +93 to +94
Note:
We can use the canonical representation directly for the placeholder here.
The newer shim version allows legacy and canonical metadata to coexist.

}),
Annotations: annotations,
OwnerReferences: ownerRefs,
8 changes: 4 additions & 4 deletions pkg/cache/placeholder_test.go
@@ -125,10 +125,10 @@ func TestNewPlaceholder(t *testing.T) {
assert.Equal(t, holder.pod.Name, "ph-name")
assert.Equal(t, holder.pod.Namespace, namespace)
assert.DeepEqual(t, holder.pod.Labels, map[string]string{
constants.LabelApplicationID: appID,
constants.LabelQueueName: queue,
"labelKey0": "labelKeyValue0",
"labelKey1": "labelKeyValue1",
constants.CanonicalLabelApplicationID: appID,
constants.CanonicalLabelQueueName: queue,
"labelKey0": "labelKeyValue0",
"labelKey1": "labelKeyValue1",
})
assert.Equal(t, len(holder.pod.Annotations), 6, "unexpected number of annotations")
assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], app.taskGroups[0].Name)
83 changes: 76 additions & 7 deletions pkg/cache/task.go
@@ -22,6 +22,7 @@
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/looplab/fsm"
@@ -187,6 +188,22 @@
return task.context.apiProvider.GetAPIs().KubeClient.UpdatePod(pod, podMutator)
}

func (task *Task) failTaskPodWithReasonAndMsg(reason string, msg string) {
podCopy := task.pod.DeepCopy()
podCopy.Status = v1.PodStatus{
Phase: v1.PodFailed,
Reason: reason,
Message: msg,
}
log.Log(log.ShimCacheTask).Info("setting pod to failed", zap.String("podName", podCopy.Name))
pod, err := task.UpdateTaskPodStatus(podCopy)
if err != nil {
log.Log(log.ShimCacheTask).Error("failed to update task pod status", zap.Error(err))

} else {
log.Log(log.ShimCacheTask).Info("new pod status", zap.String("status", string(pod.Status.Phase)))
}
}

func (task *Task) isTerminated() bool {
for _, states := range TaskStates().Terminated {
if task.GetTaskState() == states {
@@ -457,16 +474,19 @@
}
}

func (task *Task) postTaskRejected() {
// currently, once task is rejected by scheduler, we directly move task to failed state.
// so this function simply triggers the state transition when it is rejected.
// but further, we can introduce retry mechanism if necessary.
func (task *Task) postTaskRejected(reason string) {
// if the task is rejected because of conflicting metadata, fail the pod with the reason
if strings.Contains(reason, constants.TaskPodInconsistentMetadataFailure) {
task.failTaskPodWithReasonAndMsg(constants.TaskRejectedFailure, reason)

Fail the pod if the task's rejection reason is inconsistent metadata.

}

// move task to failed state.
dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias)))
fmt.Sprintf("task %s failed because it is rejected", task.alias)))

events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeWarning, "TaskRejected", "TaskRejected",
"Task %s is rejected by the scheduler", task.alias)
"Task %s is rejected", task.alias)
}

// beforeTaskFail releases the allocation or ask from scheduler core
@@ -543,7 +563,56 @@
// some sanity checks before sending task for scheduling,
// this reduces the scheduling overhead by blocking such
// request away from the core scheduler.
func (task *Task) sanityCheckBeforeScheduling() error {
func (task *Task) sanityCheckBeforeScheduling() (error, bool) {
rejectTask := false

if err := task.checkPodPVCs(); err != nil {
return err, rejectTask

}

// only check pod labels and annotations consistency if pod is not already bound
// reject the task if pod metadata is conflicting
if !utils.PodAlreadyBound(task.pod) {
if err := task.checkPodMetadata(); err != nil {
rejectTask = true
return err, rejectTask

}
}

return nil, rejectTask
}

func (task *Task) checkPodMetadata() error {
// check application ID
appIdLabelKeys := []string{
constants.CanonicalLabelApplicationID,
constants.SparkLabelAppID,
constants.LabelApplicationID,
}
appIdAnnotationKeys := []string{
constants.AnnotationApplicationID,
}
if !utils.ValidatePodLabelAnnotationConsistency(task.pod, appIdLabelKeys, appIdAnnotationKeys) {
return fmt.Errorf("application ID is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure)
}

// check queue name
queueLabelKeys := []string{
constants.CanonicalLabelQueueName,
constants.LabelQueueName,
}

queueAnnotationKeys := []string{
constants.AnnotationQueueName,
}

if !utils.ValidatePodLabelAnnotationConsistency(task.pod, queueLabelKeys, queueAnnotationKeys) {
return fmt.Errorf("queue is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistMetadataFailure)
}
return nil
}
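utils.ValidatePodLabelAnnotationConsistency itself is not part of this diff. A rough sketch of its assumed semantics, simplified to a single map (the literal key strings below are illustrative assumptions, not the constants.* values):

package main

import "fmt"

// consistent returns true when every candidate key that is present in meta
// carries the same value; absent keys are ignored.
func consistent(meta map[string]string, keys []string) bool {
	first := ""
	for _, k := range keys {
		if v, ok := meta[k]; ok {
			if first == "" {
				first = v
			} else if v != first {
				return false
			}
		}
	}
	return true
}

func main() {
	labels := map[string]string{
		"yunikorn.apache.org/app-id": "app01", // canonical key (assumed literal)
		"applicationId":              "app01", // legacy key (assumed literal)
	}
	keys := []string{"yunikorn.apache.org/app-id", "applicationId"}
	fmt.Println(consistent(labels, keys)) // true: values agree
	labels["applicationId"] = "app02"
	fmt.Println(consistent(labels, keys)) // false: conflicting appId
}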

func (task *Task) checkPodPVCs() error {
// Check PVCs used by the pod
namespace := task.pod.Namespace
manifest := &(task.pod.Spec)
10 changes: 9 additions & 1 deletion pkg/cache/task_state.go
@@ -396,7 +396,15 @@
},
states.Rejected: func(_ context.Context, event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
task.postTaskRejected()
eventArgs := make([]string, 1)
reason := ""
if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil {
log.Log(log.ShimFSM).Error("failed to parse event arg", zap.Error(err))
reason = err.Error()

} else {
reason = eventArgs[0]
}
task.postTaskRejected(reason)
},
states.Failed: func(_ context.Context, event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
130 changes: 130 additions & 0 deletions pkg/cache/task_test.go
@@ -19,6 +19,7 @@
package cache

import (
"fmt"
"testing"
"time"

@@ -699,6 +700,135 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) {
assert.Equal(t, task1.GetTaskState(), TaskStates().Completed)
}

// nolint: funlen
func TestCheckPodMetadata(t *testing.T) {
const (
appID = "app01"
app2ID = "app02"
queueName = "root.sandbox1"
queue2Name = "root.sandbox2"
)
var appIdInconsistentErr = fmt.Errorf("application ID is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistentMetadataFailure)
var queueInconsistentErr = fmt.Errorf("queue is not consistently set in pod's labels and annotations. [%s]", constants.TaskPodInconsistentMetadataFailure)

testCases := []struct {
name string
podLabels map[string]string
podAnnotations map[string]string
expected error
}{
{
"empty label and empty annotation in pod", nil, nil, nil,
},
{
"appId and queueName have no conflict",
map[string]string{
constants.CanonicalLabelApplicationID: appID,
constants.SparkLabelAppID: appID,
constants.LabelApp: appID,
constants.CanonicalLabelQueueName: queueName,
constants.LabelQueueName: queueName,
}, map[string]string{
constants.AnnotationApplicationID: appID,
constants.AnnotationQueueName: queueName,
},
nil,
},
{
"conflicting appId in canonical label",
map[string]string{
constants.CanonicalLabelApplicationID: app2ID,
constants.SparkLabelAppID: appID,
constants.LabelApplicationID: appID,
}, map[string]string{
constants.AnnotationApplicationID: appID,
},
appIdInconsistentErr,
},
{
"conflicting appId in spark label",
map[string]string{
constants.CanonicalLabelApplicationID: appID,
constants.SparkLabelAppID: app2ID,
constants.LabelApplicationID: appID,
}, map[string]string{
constants.AnnotationApplicationID: appID,
},
appIdInconsistentErr,
},
{
"conflicting appId in legacy label",
map[string]string{
constants.CanonicalLabelApplicationID: appID,
constants.SparkLabelAppID: appID,
constants.LabelApplicationID: app2ID,
}, map[string]string{
constants.AnnotationApplicationID: appID,
},
appIdInconsistentErr,
},
{
"conflicting appId in annotation",
map[string]string{
constants.CanonicalLabelApplicationID: appID,
constants.SparkLabelAppID: appID,
constants.LabelApplicationID: appID,
}, map[string]string{
constants.AnnotationApplicationID: app2ID,
},
appIdInconsistentErr,
},
{
"conflicting queueName in canonical label",
map[string]string{
constants.CanonicalLabelQueueName: queue2Name,
constants.LabelQueueName: queueName,
}, map[string]string{
constants.AnnotationQueueName: queueName,
},
queueInconsistentErr,
},
{
"conflicting queueName in legacy label",
map[string]string{
constants.CanonicalLabelQueueName: queueName,
constants.LabelQueueName: queue2Name,
}, map[string]string{
constants.AnnotationQueueName: queueName,
},
queueInconsistentErr,
},
{
"conflicting queueName in annotation",
map[string]string{
constants.CanonicalLabelQueueName: queueName,
constants.LabelQueueName: queueName,
}, map[string]string{
constants.AnnotationQueueName: queue2Name,
},
queueInconsistentErr,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
app := NewApplication(appID, "root.default", "user", testGroups, map[string]string{}, nil)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: tc.podLabels,
Annotations: tc.podAnnotations,
},
}
task := NewTask("task01", app, nil, pod)
err := task.checkPodMetadata()
if err != nil {
assert.Equal(t, tc.expected.Error(), err.Error())
} else {
assert.NilError(t, err)
}
})
}
}

func TestUpdatePodCondition(t *testing.T) {
condition := v1.PodCondition{
Type: v1.ContainersReady,
2 changes: 2 additions & 0 deletions pkg/common/constants/constants.go
@@ -87,6 +87,8 @@ var SchedulingPolicyStyleParamValues = map[string]string{"Hard": "Hard", "Soft":

const ApplicationInsufficientResourcesFailure = "ResourceReservationTimeout"
const ApplicationRejectedFailure = "ApplicationRejected"
const TaskRejectedFailure = "TaskRejected"
const TaskPodInconsistentMetadataFailure = "PodInconsistentMetadata"
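As an illustration of how the new constant travels: the sanity check embeds it in the error text, and postTaskRejected later detects it with strings.Contains (a minimal sketch; only the tagging pattern is taken from this diff):

package main

import (
	"fmt"
	"strings"
)

const taskPodInconsistentMetadataFailure = "PodInconsistentMetadata"

func main() {
	// checkPodMetadata-style error, tagged with the failure constant.
	err := fmt.Errorf("queue is not consistently set in pod's labels and annotations. [%s]",
		taskPodInconsistentMetadataFailure)
	// postTaskRejected-style detection of the tag in the reason string.
	fmt.Println(strings.Contains(err.Error(), taskPodInconsistentMetadataFailure)) // true
}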

// namespace.max.* (Retaining for backwards compatibility. Need to be removed in next major release)
const CPUQuota = DomainYuniKorn + "namespace.max.cpu"