Skip to content

Commit

Permalink
[YUNIKORN-2503] Use internal annotation prefix for placeholder pod
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyulin0719 committed Mar 28, 2024
1 parent 8be3910 commit 2ca2372
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 103 deletions.
7 changes: 3 additions & 4 deletions pkg/cache/placeholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup TaskGrou
// Here the owner reference is always the originator pod
ownerRefs := app.getPlaceholderOwnerReferences()
annotations := utils.MergeMaps(taskGroup.Annotations, map[string]string{
constants.AnnotationPlaceholderFlag: "true",
constants.AnnotationPlaceholderFlag: constants.True,
constants.AnnotationTaskGroupName: taskGroup.Name,
})

Expand Down Expand Up @@ -90,9 +90,8 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup TaskGrou
Name: placeholderName,
Namespace: app.tags[constants.AppTagNamespace],
Labels: utils.MergeMaps(taskGroup.Labels, map[string]string{
constants.LabelApplicationID: app.GetApplicationID(),
constants.LabelQueueName: app.GetQueue(),
constants.LabelPlaceholderFlag: "true",
constants.LabelApplicationID: app.GetApplicationID(),
constants.LabelQueueName: app.GetQueue(),
}),
Annotations: annotations,
OwnerReferences: ownerRefs,
Expand Down
53 changes: 17 additions & 36 deletions pkg/cache/placeholder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func TestNewPlaceholder(t *testing.T) {
testGroups, map[string]string{constants.AppTagNamespace: namespace, constants.AppTagImagePullSecrets: "secret1,secret2"},
mockedSchedulerAPI)
app.setTaskGroups(taskGroups)
marshalledTaskGroups, err := json.Marshal(taskGroups)
assert.NilError(t, err, "taskGroups marshalling failed")
app.setTaskGroupsDefinition(string(marshalledTaskGroups))

assert.Equal(t, app.placeholderAsk.Resources[siCommon.CPU].Value, int64(10*500))
assert.Equal(t, app.placeholderAsk.Resources[siCommon.Memory].Value, int64(10*1024*1000*1000))
Expand All @@ -108,12 +111,21 @@ func TestNewPlaceholder(t *testing.T) {
assert.Equal(t, holder.pod.Spec.SchedulerName, constants.SchedulerName)
assert.Equal(t, holder.pod.Name, "ph-name")
assert.Equal(t, holder.pod.Namespace, namespace)
assert.Equal(t, len(holder.pod.Labels), 5, "unexpected number of labels")
assert.Equal(t, holder.pod.Labels[constants.LabelApplicationID], appID)
assert.Equal(t, holder.pod.Labels[constants.LabelQueueName], queue)
assert.Equal(t, holder.pod.Labels["placeholder"], "true")
assert.Equal(t, len(holder.pod.Annotations), 5, "unexpected number of annotations")
assert.DeepEqual(t, holder.pod.Labels, map[string]string{
constants.LabelApplicationID: appID,
constants.LabelQueueName: queue,
"labelKey0": "labelKeyValue0",
"labelKey1": "labelKeyValue1",
})
assert.Equal(t, len(holder.pod.Annotations), 6, "unexpected number of annotations")
assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], app.taskGroups[0].Name)
assert.Equal(t, holder.pod.Annotations[constants.AnnotationPlaceholderFlag], constants.True)
assert.Equal(t, holder.pod.Annotations["annotationKey0"], "annotationValue0")
assert.Equal(t, holder.pod.Annotations["annotationKey1"], "annotationValue1")
assert.Equal(t, holder.pod.Annotations["annotationKey2"], "annotationValue2")
var taskGroupsDef []TaskGroup
err = json.Unmarshal([]byte(holder.pod.Annotations[siCommon.DomainYuniKorn+"task-groups"]), &taskGroupsDef)
assert.NilError(t, err, "taskGroupsDef unmarshal failed")
assert.Equal(t, common.GetPodResource(holder.pod).Resources[siCommon.CPU].Value, int64(500))
assert.Equal(t, common.GetPodResource(holder.pod).Resources[siCommon.Memory].Value, int64(1024*1000*1000))
assert.Equal(t, common.GetPodResource(holder.pod).Resources["pods"].Value, int64(1))
Expand All @@ -130,37 +142,6 @@ func TestNewPlaceholder(t *testing.T) {
assert.Equal(t, "", holder.pod.Spec.PriorityClassName)
}

// TestNewPlaceholderWithLabelsAndAnnotations builds a placeholder for the
// first task group of a test application and verifies the labels and
// annotations on the resulting pod, and that the pod spec has neither a
// priority nor a priority class name set.
func TestNewPlaceholderWithLabelsAndAnnotations(t *testing.T) {
mockedSchedulerAPI := newMockSchedulerAPI()
app := NewApplication(appID, queue,
"bob", testGroups, map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups(taskGroups)
// Register the JSON form of the task groups on the application.
// NOTE(review): presumably this is what ends up in the "task-groups"
// annotation that is unmarshalled further down — confirm in newPlaceholder.
marshalledTaskGroups, err := json.Marshal(taskGroups)
assert.NilError(t, err, "taskGroups marshalling failed")
app.setTaskGroupsDefinition(string(marshalledTaskGroups))

holder := newPlaceholder("ph-name", app, app.taskGroups[0])

// The label set is compared exactly, so any extra or missing label fails.
// NOTE(review): labelKey0/labelKey1 presumably come from the task group
// fixture; applicationId, queue and placeholder are expected in addition.
assert.DeepEqual(t, holder.pod.Labels, map[string]string{
"applicationId": "app01",
"labelKey0": "labelKeyValue0",
"labelKey1": "labelKeyValue1",
"placeholder": "true",
"queue": "root.default",
})

// Only the total annotation count and three named annotations are checked
// by value; the remaining annotations are covered by the count alone.
assert.Equal(t, len(holder.pod.Annotations), 6)
assert.Equal(t, holder.pod.Annotations["annotationKey0"], "annotationValue0")
assert.Equal(t, holder.pod.Annotations["annotationKey1"], "annotationValue1")
assert.Equal(t, holder.pod.Annotations["annotationKey2"], "annotationValue2")
// The task-groups annotation must be valid JSON for a []TaskGroup; the
// decoded content itself is not inspected.
var taskGroupsDef []TaskGroup
err = json.Unmarshal([]byte(holder.pod.Annotations[siCommon.DomainYuniKorn+"task-groups"]), &taskGroupsDef)
assert.NilError(t, err, "taskGroupsDef unmarshal failed")
// A nil *int32 is used as the expected value, i.e. Spec.Priority must be unset.
var priority *int32
assert.Equal(t, priority, holder.pod.Spec.Priority)
assert.Equal(t, "", holder.pod.Spec.PriorityClassName)
}

func TestNewPlaceholderWithNodeSelectors(t *testing.T) {
mockedSchedulerAPI := newMockSchedulerAPI()
app := NewApplication(appID, queue,
Expand Down
4 changes: 2 additions & 2 deletions pkg/common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const DefaultNodeAttributeRackNameKey = "si.io/rackname"
const DefaultNodeInstanceTypeNodeLabelKey = "node.kubernetes.io/instance-type"
const DefaultRackName = "/rack-default"
const DomainYuniKorn = siCommon.DomainYuniKorn
const DomainYuniKornInternal = siCommon.DomainYuniKornInternal

// Application
const LabelApp = "app"
Expand Down Expand Up @@ -65,8 +66,7 @@ const DaemonSetType = "DaemonSet"
const PlaceholderContainerImage = "registry.k8s.io/pause:3.7"
const PlaceholderContainerName = "pause"
const PlaceholderPodRestartPolicy = "Never"
const LabelPlaceholderFlag = "placeholder"
const AnnotationPlaceholderFlag = DomainYuniKorn + "placeholder"
const AnnotationPlaceholderFlag = DomainYuniKornInternal + "placeholder"
const AnnotationTaskGroupName = DomainYuniKorn + "task-group-name"
const AnnotationTaskGroups = DomainYuniKorn + "task-groups"
const AnnotationSchedulingPolicyParam = DomainYuniKorn + "schedulingPolicyParameters"
Expand Down
6 changes: 0 additions & 6 deletions pkg/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,5 @@ func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
return v
}
}

if value := GetPodLabelValue(pod, constants.LabelPlaceholderFlag); value != "" {
if v, err := strconv.ParseBool(value); err == nil {
return v
}
}
return false
}
34 changes: 3 additions & 31 deletions pkg/common/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,34 +983,7 @@ func TestGetPlaceholderFlagFromPodSpec(t *testing.T) {
pod *v1.Pod
expectedPlaceholderFlag bool
}{

{"Setting by annotation", &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "pod-01",
UID: "UID-01",
Annotations: map[string]string{
constants.AnnotationPlaceholderFlag: "true",
},
},
}, true},
{"Setting by label", &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "pod-01",
UID: "UID-01",
Labels: map[string]string{
constants.LabelPlaceholderFlag: "true",
},
},
}, true},
{"Setting both annotation and label, annotation has higher priority", &v1.Pod{
{"Pod with placeholder annotation", &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
Expand All @@ -1019,12 +992,11 @@ func TestGetPlaceholderFlagFromPodSpec(t *testing.T) {
Name: "pod-01",
UID: "UID-01",
Annotations: map[string]string{
constants.AnnotationPlaceholderFlag: "true",
constants.LabelPlaceholderFlag: "false",
constants.AnnotationPlaceholderFlag: constants.True,
},
},
}, true},
{"No setting both annotation and label", &v1.Pod{
{"Pod without placeholder annotation", &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
Expand Down
36 changes: 17 additions & 19 deletions test/e2e/framework/helpers/k8s/k8s_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
resourcehelper "k8s.io/kubectl/pkg/util/resource"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"

"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
Expand Down Expand Up @@ -1241,12 +1242,12 @@ func (k *KubeCtl) WaitForPlaceholders(namespace string, podPrefix string, numPod

func (k *KubeCtl) ListPlaceholders(namespace string, podPrefix string) ([]v1.Pod, error) {
pods := make([]v1.Pod, 0)
podList, lstErr := k.ListPods(namespace, "placeholder=true")
podList, lstErr := k.ListPods(namespace, "")
if lstErr != nil {
return pods, lstErr
}
for _, pod := range podList.Items {
if strings.HasPrefix(pod.Name, podPrefix) {
if strings.HasPrefix(pod.Name, podPrefix) && pod.Annotations[constants.AnnotationPlaceholderFlag] == constants.True {
pods = append(pods, pod)
}
}
Expand All @@ -1263,18 +1264,17 @@ func (k *KubeCtl) WaitForPlaceholdersStableState(namespace string, podPrefix str

func (k *KubeCtl) isNumPlaceholdersRunning(namespace string, podPrefix string, num int, podPhase *v1.PodPhase) wait.ConditionFunc {
return func() (bool, error) {
jobPods, lstErr := k.ListPods(namespace, "placeholder=true")
phPods, lstErr := k.ListPlaceholders(namespace, podPrefix)
if lstErr != nil {
return false, lstErr
}

var count int
for _, pod := range jobPods.Items {
if strings.HasPrefix(pod.Name, podPrefix) && (podPhase == nil || *podPhase == pod.Status.Phase) {
for _, pod := range phPods {
if podPhase == nil || *podPhase == pod.Status.Phase {
count++
}
}

return count == num, nil
}
}
Expand All @@ -1283,26 +1283,24 @@ func (k *KubeCtl) isNumPlaceholdersRunning(namespace string, podPrefix string, n
func (k *KubeCtl) arePlaceholdersStable(namespace string, podPrefix string, samePhases *int,
maxAttempts int, phases map[string]v1.PodPhase) wait.ConditionFunc {
return func() (bool, error) {
jobPods, lstErr := k.ListPods(namespace, "placeholder=true")
jobPods, lstErr := k.ListPlaceholders(namespace, podPrefix)
if lstErr != nil {
return false, lstErr
}

needRetry := false
for _, pod := range jobPods.Items {
if strings.HasPrefix(pod.Name, podPrefix) {
currentPhase := pod.Status.Phase
prevPhase, ok := phases[pod.Name]
if !ok {
phases[pod.Name] = currentPhase
needRetry = true
continue
}
if prevPhase != currentPhase {
needRetry = true
}
for _, pod := range jobPods {
currentPhase := pod.Status.Phase
prevPhase, ok := phases[pod.Name]
if !ok {
phases[pod.Name] = currentPhase
needRetry = true
continue
}
if prevPhase != currentPhase {
needRetry = true
}
phases[pod.Name] = currentPhase
}

if needRetry {
Expand Down
1 change: 0 additions & 1 deletion test/e2e/framework/helpers/k8s/pod_annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type PodAnnotation struct {
const (
TaskGroupName = constants.DomainYuniKorn + "task-group-name"
TaskGroups = constants.DomainYuniKorn + "task-groups"
PlaceHolder = constants.DomainYuniKorn + "placeholder"
SchedulingPolicyParams = constants.DomainYuniKorn + "schedulingPolicyParameters"

MaxCPU = constants.DomainYuniKorn + "namespace.max.cpu"
Expand Down
9 changes: 5 additions & 4 deletions test/e2e/gang_scheduling/gang_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,15 @@ var _ = Describe("", func() {
// Wait for placeholders to become running
stateRunning := v1.PodRunning
By("Wait for all placeholders running")
phErr := kClient.WaitForPlaceholders(ns, "tg-"+appID+"-", 15, 2*time.Minute, &stateRunning)
phPodPrefix := "tg-" + appID + "-"
phErr := kClient.WaitForPlaceholders(ns, phPodPrefix, 15, 2*time.Minute, &stateRunning)
Ω(phErr).NotTo(HaveOccurred())

// Check placeholder node distribution is same as real pods'
phPods, phListErr := kClient.ListPods(ns, "placeholder=true")
phPods, phListErr := kClient.ListPlaceholders(ns, phPodPrefix)
Ω(phListErr).NotTo(HaveOccurred())
taskGroupNodes := map[string]map[string]int{}
for _, ph := range phPods.Items {
for _, ph := range phPods {
tg, ok := ph.Annotations[constants.AnnotationTaskGroupName]
if !ok {
continue
Expand All @@ -174,7 +175,7 @@ var _ = Describe("", func() {
}

By("Wait for all placeholders terminated")
phTermErr := kClient.WaitForPlaceholders(ns, "tg-"+appID+"-", 0, 3*time.Minute, nil)
phTermErr := kClient.WaitForPlaceholders(ns, phPodPrefix, 0, 3*time.Minute, nil)
Ω(phTermErr).NotTo(HaveOccurred())

// Check real gang members now running on same node distribution
Expand Down

0 comments on commit 2ca2372

Please sign in to comment.