From f20eda15ed808effac475a2495e44acfce4188b9 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 7 Sep 2023 21:32:50 -0400 Subject: [PATCH] fix duplicate requeues --- .../queuejob/queuejob_controller_ex.go | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index ae74370c..b74b0d83 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1642,6 +1642,40 @@ func (cc *XController) addQueueJob(obj interface{}) { klog.V(6).Infof("[Informer-addQJ] enqueue %s &qj=%p Version=%s Status=%+v", qj.Name, qj, qj.ResourceVersion, qj.Status) cc.enqueue(qj) + // Requeue the item to be processed again in 30 seconds. + //TODO: tune the frequency of reprocessing an AW + hasCompletionStatus := false + for _, genericItem := range qj.Spec.AggrResources.GenericItems { + if len(genericItem.CompletionStatus) > 0 { + hasCompletionStatus = true + } + } + //When an AW entrs a system with completionstatus keep checking the AW until completed + //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate + //on stale AWs. This has potential to improve performance at scale. + //if qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed && qj.Status.State != "" { + requeueInterval := 30 * time.Second + key, err := cache.MetaNamespaceKeyFunc(qj) + if err == nil { + go func() { + for { + time.Sleep(requeueInterval) + latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) + if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion { + klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) + break //Exit the loop + } + if err == nil && exists { + // Enqueue the latest copy of the AW. + if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { + cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper)) + klog.V(2).Infof("[Informer-addQJ] Finished requeing AW to determine completion status") + } + } + } + }() + } + //} } func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { @@ -1685,40 +1719,6 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { // cc.eventQueue.Delete(oldObj) if notBackedoff { cc.enqueue(newQJ) - - // Requeue the item to be processed again in 30 seconds. - //TODO: tune the frequency of reprocessing an AW - hasCompletionStatus := false - for _, genericItem := range newQJ.Spec.AggrResources.GenericItems { - if len(genericItem.CompletionStatus) > 0 { - hasCompletionStatus = true - } - } - //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate - //on stale AWs. This has potential to improve performance at scale. - if newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed && newQJ.Status.State != "" { - requeueInterval := 30 * time.Second - key, err := cache.MetaNamespaceKeyFunc(newQJ) - if err == nil { - go func() { - for { - time.Sleep(requeueInterval) - latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion || !exists { - klog.V(2).Infof("[Informer-updateQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) - break //Exit the loop - } - if err == nil && exists { - // Enqueue the latest copy of the AW. - if (newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { - cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper)) - klog.V(2).Infof("[Informer-updateQJ] Finished requeing AW to determine completion status") - } - } - } - }() - } - } } }