Skip to content

Commit

Permalink
fix duplicate requeues
Browse files Browse the repository at this point in the history
  • Loading branch information
asm582 committed Sep 8, 2023
1 parent 9135383 commit f20eda1
Showing 1 changed file with 34 additions and 34 deletions.
68 changes: 34 additions & 34 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -1642,6 +1642,40 @@ func (cc *XController) addQueueJob(obj interface{}) {

klog.V(6).Infof("[Informer-addQJ] enqueue %s &qj=%p Version=%s Status=%+v", qj.Name, qj, qj.ResourceVersion, qj.Status)
cc.enqueue(qj)
// Requeue the item to be processed again in 30 seconds.
//TODO: tune the frequency of reprocessing an AW
hasCompletionStatus := false
for _, genericItem := range qj.Spec.AggrResources.GenericItems {
if len(genericItem.CompletionStatus) > 0 {
hasCompletionStatus = true
}
}
//When an AW enters the system with a completion status, keep checking the AW until it has completed.
//UpdateQueueJobs now runs as part of the informer machinery. The optimization here is to avoid pulling submitted AWs out of etcd and operating
//on stale AWs. This has the potential to improve performance at scale.
//if qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed && qj.Status.State != "" {
requeueInterval := 30 * time.Second
key, err := cache.MetaNamespaceKeyFunc(qj)
if err == nil {
go func() {
for {
time.Sleep(requeueInterval)
latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key)
if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion {
klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State)
break //Exit the loop
}
if err == nil && exists {
// Enqueue the latest copy of the AW.
if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus {
cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper))
klog.V(2).Infof("[Informer-addQJ] Finished requeing AW to determine completion status")
}
}
}
}()
}
//}
}

func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -1685,40 +1719,6 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
// cc.eventQueue.Delete(oldObj)
if notBackedoff {
cc.enqueue(newQJ)

// Requeue the item to be processed again in 30 seconds.
//TODO: tune the frequency of reprocessing an AW
hasCompletionStatus := false
for _, genericItem := range newQJ.Spec.AggrResources.GenericItems {
if len(genericItem.CompletionStatus) > 0 {
hasCompletionStatus = true
}
}
//UpdateQueueJobs now runs as part of the informer machinery. The optimization here is to avoid pulling submitted AWs out of etcd and operating
//on stale AWs. This has the potential to improve performance at scale.
if newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed && newQJ.Status.State != "" {
requeueInterval := 30 * time.Second
key, err := cache.MetaNamespaceKeyFunc(newQJ)
if err == nil {
go func() {
for {
time.Sleep(requeueInterval)
latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key)
if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion || !exists {
klog.V(2).Infof("[Informer-updateQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State)
break //Exit the loop
}
if err == nil && exists {
// Enqueue the latest copy of the AW.
if (newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus {
cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper))
klog.V(2).Infof("[Informer-updateQJ] Finished requeing AW to determine completion status")
}
}
}
}()
}
}
}

}
Expand Down

0 comments on commit f20eda1

Please sign in to comment.