Skip to content

Commit

Permalink
Only apply the "unstick" task pending workaround to V2 queues
Browse files Browse the repository at this point in the history
  • Loading branch information
sambles committed Jan 20, 2025
1 parent 9e7e7c4 commit 0d9733d
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions kubernetes/worker-controller/src/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,15 @@ async def parse_queued_pending(self, msg) -> [RunningAnalysis]:

# Check for pending analyses
if (queue_name not in ['celery', 'celery-v2', 'task-controller']):
queued_task_count = entry.get('queue', {}).get('queued_count', 0) # sub-task queued (API DB status)
queue_message_count = entry.get('queue', {}).get('queue_message_count', 0) # queue has messages
queued_count = max(queued_task_count, queue_message_count)

if (queued_count > 0) and not analyses_list:
# a task is queued, but no analyses are running.
# worker-controller might have missed the analysis displatch
pending_analyses[f'pending-task_{queue_name}'] = RunningAnalysis(id=None, tasks=1, queue_names=[queue_name], priority=4)
if queue_name.endswith('v2'):
queued_task_count = entry.get('queue', {}).get('queued_count', 0) # sub-task queued (API DB status)
queue_message_count = entry.get('queue', {}).get('queue_message_count', 0) # queue has messages
queued_count = max(queued_task_count, queue_message_count)

if (queued_count > 0) and not analyses_list:
# a task is queued, but no analyses are running.
# worker-controller might have missed the analysis displatch
pending_analyses[f'pending-task_{queue_name}'] = RunningAnalysis(id=None, tasks=1, queue_names=[queue_name], priority=4)

return pending_analyses

Expand Down

0 comments on commit 0d9733d

Please sign in to comment.