Skip to content

Commit

Permalink
[YUNIKORN-2578] Refactor SchedulerCache.GetPod() to remove returned boolean (apache#837)
Browse files Browse the repository at this point in the history

Closes: apache#837

Signed-off-by: Chia-Ping Tsai <[email protected]>
  • Loading branch information
ryankert01 authored and chia7712 committed May 13, 2024
1 parent 81058ad commit 3a231be
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 85 deletions.
25 changes: 12 additions & 13 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {

func (ctx *Context) updateForeignPod(pod *v1.Pod) {
podStatusBefore := ""
oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
if ok {
oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod != nil {
podStatusBefore = string(oldPod.Status.Phase)
}

Expand Down Expand Up @@ -439,8 +439,8 @@ func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
if !ok {
oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod == nil {
// if pod is not in scheduler cache, no node updates are needed
log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed",
zap.String("namespace", pod.Namespace),
Expand All @@ -452,7 +452,7 @@ func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
// 1. pod is already assigned to a node
// 2. pod was not in a terminal state before
// 3. pod references a known node
if oldPod != nil && !utils.IsPodTerminated(oldPod) {
if !utils.IsPodTerminated(oldPod) {
if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) {
log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update",
zap.String("namespace", pod.Namespace),
Expand Down Expand Up @@ -644,9 +644,8 @@ func (ctx *Context) EventsToRegister(queueingHintFn framework.QueueingHintFn) []
func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
var pod *v1.Pod
var ok bool
if pod, ok = ctx.schedulerCache.GetPod(name); !ok {
pod := ctx.schedulerCache.GetPod(name)
if pod == nil {

Check warning on line 648 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L647-L648

Added lines #L647 - L648 were not covered by tests
return ErrorPodNotFound
}
// if pod exists in cache, try to run predicates
Expand All @@ -672,7 +671,7 @@ func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations []s

ctx.lock.RLock()
defer ctx.lock.RUnlock()
if pod, _ := ctx.schedulerCache.GetPod(name); pod != nil {
if pod := ctx.schedulerCache.GetPod(name); pod != nil {

Check warning on line 674 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L674

Added line #L674 was not covered by tests
// if pod exists in cache, try to run predicates
if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil {
// need to lock cache here as predicates need a stable view into the cache
Expand All @@ -682,7 +681,7 @@ func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations []s
// look up each victim in the scheduler cache
victims := make([]*v1.Pod, len(allocations))
for index, uid := range allocations {
victim, _ := ctx.schedulerCache.GetPodNoLock(uid)
victim := ctx.schedulerCache.GetPodNoLock(uid)

Check warning on line 684 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L684

Added line #L684 was not covered by tests
victims[index] = victim
}

Expand All @@ -704,7 +703,7 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
// during the scheduling process, as they directly impact other scheduling processes.
// when assumePodVolumes was called, we cached in schedulerCache whether all pod volumes are bound,
// so here we just need to retrieve that value from the cache and skip binding if the volumes are already bound.
if assumedPod, exist := ctx.schedulerCache.GetPod(podKey); exist {
if assumedPod := ctx.schedulerCache.GetPod(podKey); assumedPod != nil {
if ctx.schedulerCache.ArePodVolumesAllBound(podKey) {
log.Log(log.ShimContext).Info("Binding Pod Volumes skipped: all volumes already bound",
zap.String("podName", pod.Name))
Expand Down Expand Up @@ -782,7 +781,7 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
func (ctx *Context) AssumePod(name, node string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if pod, ok := ctx.schedulerCache.GetPod(name); ok {
if pod := ctx.schedulerCache.GetPod(name); pod != nil {
// when adding an assumed pod, we make a copy of the pod to avoid
// modifying the original reference. otherwise, there may be a
// race when other goroutines access it in parallel.
Expand Down Expand Up @@ -844,7 +843,7 @@ func (ctx *Context) ForgetPod(name string) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

if pod, ok := ctx.schedulerCache.GetPod(name); ok {
if pod := ctx.schedulerCache.GetPod(name); pod != nil {

Check warning on line 846 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L846

Added line #L846 was not covered by tests
log.Log(log.ShimContext).Debug("forget pod", zap.String("pod", pod.Name))
ctx.schedulerCache.ForgetPod(pod)
return
Expand Down
94 changes: 47 additions & 47 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,10 @@ func TestAddPod(t *testing.T) {
context.AddPod(pod1) // should be added
context.AddPod(pod2) // should skip as pod is terminated

_, ok := context.schedulerCache.GetPod("UID-00001")
assert.Check(t, ok, "active pod was not added")
_, ok = context.schedulerCache.GetPod("UID-00002")
assert.Check(t, !ok, "terminated pod was added")
pod := context.schedulerCache.GetPod("UID-00001")
assert.Check(t, pod != nil, "active pod was not added")
pod = context.schedulerCache.GetPod("UID-00002")
assert.Check(t, pod == nil, "terminated pod was added")
}

func TestUpdatePod(t *testing.T) {
Expand Down Expand Up @@ -482,8 +482,8 @@ func TestUpdatePod(t *testing.T) {
}

context.AddPod(pod1)
_, ok := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, ok, "pod1 is not present after adding")
pod := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, pod != nil, "pod1 is not present after adding")

// these should not fail, but are no-ops
context.UpdatePod(nil, nil)
Expand All @@ -492,13 +492,13 @@ func TestUpdatePod(t *testing.T) {

// ensure a terminated pod is removed
context.UpdatePod(pod1, pod3)
_, ok = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, !ok, "pod still found after termination")
pod = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, pod == nil, "pod still found after termination")

// ensure a non-terminated pod is updated
context.UpdatePod(pod1, pod2)
found, ok := context.schedulerCache.GetPod("UID-00001")
if assert.Check(t, ok, "pod not found after update") {
found := context.schedulerCache.GetPod("UID-00001")
if assert.Check(t, found != nil, "pod not found after update") {
assert.Check(t, found.GetAnnotations()["test.state"] == "updated", "pod state not updated")
}
}
Expand Down Expand Up @@ -537,22 +537,22 @@ func TestDeletePod(t *testing.T) {

context.AddPod(pod1)
context.AddPod(pod2)
_, ok := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, ok, "pod1 is not present after adding")
_, ok = context.schedulerCache.GetPod("UID-00002")
assert.Assert(t, ok, "pod2 is not present after adding")
pod := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, pod != nil, "pod1 is not present after adding")
pod = context.schedulerCache.GetPod("UID-00002")
assert.Assert(t, pod != nil, "pod2 is not present after adding")

// these should not fail, but here for completeness
context.DeletePod(nil)
context.DeletePod(cache.DeletedFinalStateUnknown{Key: "UID-00000", Obj: nil})

context.DeletePod(pod1)
_, ok = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, !ok, "pod1 is still present")
pod = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, pod == nil, "pod1 is still present")

context.DeletePod(cache.DeletedFinalStateUnknown{Key: "UID-00002", Obj: pod2})
_, ok = context.schedulerCache.GetPod("UID-00002")
assert.Check(t, !ok, "pod2 is still present")
pod = context.schedulerCache.GetPod("UID-00002")
assert.Check(t, pod == nil, "pod2 is still present")
}

//nolint:funlen
Expand Down Expand Up @@ -622,16 +622,16 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = false
context.AddPod(pod1)
assert.Assert(t, !executed, "unexpected update")
_, ok := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, !ok, "unassigned pod found in cache")
pod := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, pod == nil, "unassigned pod found in cache")

// validate update
tc = "update-pod1"
executed = false
expectRemove = false
context.UpdatePod(nil, pod1)
assert.Assert(t, !executed, "unexpected update")
assert.Assert(t, !ok, "unassigned pod found in cache")
assert.Assert(t, pod == nil, "unassigned pod found in cache")

// pod is assigned to a node but still in pending state, should update
pod2 := foreignPod("pod2", "1G", "500m")
Expand All @@ -645,8 +645,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = false
context.AddPod(pod2)
assert.Assert(t, executed, "updated expected")
_, ok = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, ok, "pod not found in cache")
pod = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, pod != nil, "pod not found in cache")

// validate update
tc = "update-pod2"
Expand All @@ -655,8 +655,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = false
context.UpdatePod(nil, pod2)
assert.Assert(t, !executed, "unexpected update")
_, ok = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, ok, "pod not found in cache")
pod = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, pod != nil, "pod not found in cache")

// validate update when not already in cache
tc = "update-pod2-nocache-pre"
Expand All @@ -671,8 +671,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = false
context.UpdatePod(nil, pod2)
assert.Assert(t, executed, "expected update")
_, ok = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, ok, "pod not found in cache")
pod = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, pod != nil, "pod not found in cache")

// pod is failed, should trigger update if already in cache
pod3 := pod2.DeepCopy()
Expand All @@ -685,8 +685,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = true
context.AddPod(pod3)
assert.Assert(t, executed, "expected update")
_, ok = context.schedulerCache.GetPod(string(pod3.UID))
assert.Assert(t, !ok, "failed pod found in cache")
pod = context.schedulerCache.GetPod(string(pod3.UID))
assert.Assert(t, pod == nil, "failed pod found in cache")

// validate update when not already in cache
tc = "update-pod3-pre"
Expand All @@ -700,8 +700,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = true
context.UpdatePod(nil, pod3)
assert.Assert(t, executed, "expected update")
_, ok = context.schedulerCache.GetPod(string(pod3.UID))
assert.Assert(t, !ok, "failed pod found in cache")
pod = context.schedulerCache.GetPod(string(pod3.UID))
assert.Assert(t, pod == nil, "failed pod found in cache")
}

func TestDeletePodForeign(t *testing.T) {
Expand Down Expand Up @@ -777,8 +777,8 @@ func TestDeletePodForeign(t *testing.T) {
expectRemove = true
context.DeletePod(pod1)
assert.Assert(t, executed, "update not executed")
_, ok := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, !ok, "deleted pod found in cache")
pod := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, pod == nil, "deleted pod found in cache")

// validate delete when not already found
tc = "delete-pod1-again"
Expand All @@ -787,8 +787,8 @@ func TestDeletePodForeign(t *testing.T) {
expectRemove = false
context.DeletePod(pod1)
assert.Assert(t, !executed, "unexpected update")
_, ok = context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, !ok, "deleted pod found in cache")
pod = context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, pod == nil, "deleted pod found in cache")
}

func TestAddTask(t *testing.T) {
Expand Down Expand Up @@ -2142,8 +2142,8 @@ func TestAssumePod(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.NilError(t, err)
assert.Assert(t, context.schedulerCache.ArePodVolumesAllBound(pod1UID))
assumedPod, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
assumedPod := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, assumedPod != nil, "pod not found in cache")
assert.Equal(t, assumedPod.Spec.NodeName, fakeNodeName)
assert.Assert(t, context.schedulerCache.IsAssumedPod(pod1UID))
}
Expand All @@ -2159,8 +2159,8 @@ func TestAssumePod_GetPodVolumeClaimsError(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.Error(t, err, errMsg)
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil, "pod not found in cache")
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand All @@ -2175,8 +2175,8 @@ func TestAssumePod_FindPodVolumesError(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.Error(t, err, errMsg)
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil, "pod not found in cache")
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand All @@ -2190,8 +2190,8 @@ func TestAssumePod_ConflictingVolumes(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.Error(t, err, "pod my-pod-1 has conflicting volume claims: reason1, reason2")
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil, "pod not found in cache")
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand All @@ -2206,8 +2206,8 @@ func TestAssumePod_AssumePodVolumesError(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.Error(t, err, errMsg)
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil, "pod not found in cache")
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand All @@ -2219,8 +2219,8 @@ func TestAssumePod_PodNotFound(t *testing.T) {
err := context.AssumePod("nonexisting", fakeNodeName)
assert.NilError(t, err)
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok)
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil)
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/external/scheduler_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (cache *SchedulerCache) NotifyTaskSchedulerAction(taskID string) {
cache.lock.Lock()
defer cache.lock.Unlock()
// verify that the pod exists in the cache, otherwise ignore
if _, ok := cache.GetPodNoLock(taskID); !ok {
if pod := cache.GetPodNoLock(taskID); pod == nil {

Check warning on line 381 in pkg/cache/external/scheduler_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/external/scheduler_cache.go#L381

Added line #L381 was not covered by tests
return
}
cache.addSchedulingTask(taskID)
Expand Down Expand Up @@ -635,7 +635,7 @@ func (filter *taskBloomFilter) isTaskMaybePresent(taskID string) bool {
return true
}

func (cache *SchedulerCache) GetPod(uid string) (*v1.Pod, bool) {
func (cache *SchedulerCache) GetPod(uid string) *v1.Pod {
cache.lock.RLock()
defer cache.lock.RUnlock()
return cache.GetPodNoLock(uid)
Expand All @@ -648,11 +648,11 @@ func (cache *SchedulerCache) IsPodOrphaned(uid string) bool {
return ok
}

func (cache *SchedulerCache) GetPodNoLock(uid string) (*v1.Pod, bool) {
func (cache *SchedulerCache) GetPodNoLock(uid string) *v1.Pod {
if pod, ok := cache.podsMap[uid]; ok {
return pod, true
return pod
}
return nil, false
return nil
}

func (cache *SchedulerCache) AssumePod(pod *v1.Pod, allBound bool) {
Expand Down
Loading

0 comments on commit 3a231be

Please sign in to comment.