diff --git a/libs/libcommon/src/libcommon/queue/jobs.py b/libs/libcommon/src/libcommon/queue/jobs.py
index 886369ac9..fd4b16f03 100644
--- a/libs/libcommon/src/libcommon/queue/jobs.py
+++ b/libs/libcommon/src/libcommon/queue/jobs.py
@@ -33,6 +33,7 @@ from libcommon.queue.lock import lock, release_lock, release_locks
 from libcommon.queue.metrics import (
     decrease_metric,
+    decrease_worker_size_metrics,
     increase_metric,
     update_metrics_for_type,
 )
@@ -126,6 +127,7 @@ class JobQueryFilters(TypedDict, total=False):
         "priority": pa.string(),
         "status": pa.string(),
         "created_at": pa.timestamp("ms"),
+        "difficulty": pa.int64(),
     }
 )
@@ -368,7 +370,7 @@ def delete_waiting_jobs_by_job_id(self, job_ids: list[str]) -> int:
         try:
             existing = JobDocument.objects(pk__in=job_ids, status=Status.WAITING)
             for job in existing.all():
-                decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
+                decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
             deleted_jobs = existing.delete()
             return 0 if deleted_jobs is None else deleted_jobs
         except Exception:
@@ -685,9 +687,10 @@ def finish_job(self, job_id: str) -> Optional[Priority]:
         except StartedJobError as e:
             logging.error(f"job {job_id} has not the expected format for a started job. Aborting: {e}")
             return None
-        decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
+        decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+        was_blocked = False
         if job.started_at is not None:
-            create_past_job(
+            was_blocked = create_past_job(
                 dataset=job.dataset,
                 started_at=pytz.UTC.localize(job.started_at),
                 finished_at=get_datetime(),
@@ -696,6 +699,11 @@ def finish_job(self, job_id: str) -> Optional[Priority]:
         job.delete()
         release_locks(owner=job_id)
         # ^ bug: the lock owner is not set to the job id anymore when calling start_job()!
+        if was_blocked:
+            pending_jobs = self.get_pending_jobs_df(dataset=job.dataset)
+            for _, pending_job in pending_jobs.iterrows():
+                decrease_worker_size_metrics(pending_job["difficulty"])
+
         return job_priority

     def delete_dataset_waiting_jobs(self, dataset: str) -> int:
@@ -710,7 +718,7 @@ def delete_dataset_waiting_jobs(self, dataset: str) -> int:
         """
         existing_waiting_jobs = JobDocument.objects(dataset=dataset, status=Status.WAITING)
         for job in existing_waiting_jobs.no_cache():
-            decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
+            decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
             release_lock(key=job.unicity_id)
         num_deleted_jobs = existing_waiting_jobs.delete()
         return 0 if num_deleted_jobs is None else num_deleted_jobs
diff --git a/libs/libcommon/src/libcommon/queue/metrics.py b/libs/libcommon/src/libcommon/queue/metrics.py
index edff91a39..dfddfe0fe 100644
--- a/libs/libcommon/src/libcommon/queue/metrics.py
+++ b/libs/libcommon/src/libcommon/queue/metrics.py
@@ -105,41 +105,47 @@ def get_worker_size(difficulty: int) -> WorkerSize:
     objects = QuerySetManager["WorkerSizeJobsCountDocument"]()


-def _update_metrics(dataset: str, job_type: str, status: str, increase_by: int, difficulty: int) -> None:
+def _update_metrics(job_type: str, status: str, increase_by: int) -> None:
     JobTotalMetricDocument.objects(job_type=job_type, status=status).update(
         upsert=True,
         write_concern={"w": "majority", "fsync": True},
         read_concern={"level": "majority"},
         inc__total=increase_by,
     )
-    if status == Status.WAITING:
-        # Do not consider blocked datasets for auto-scaling metrics
-        if not is_blocked(dataset):
-            worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty)
-            WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update(
-                upsert=True,
-                write_concern={"w": "majority", "fsync": True},
-                read_concern={"level": "majority"},
-                inc__jobs_count=increase_by,
-            )


-def increase_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
-    _update_metrics(
-        dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT, difficulty=difficulty
+def _update_worker_size_metrics(increase_by: int, difficulty: int) -> None:
+    worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty)
+    WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update(
+        upsert=True,
+        write_concern={"w": "majority", "fsync": True},
+        read_concern={"level": "majority"},
+        inc__jobs_count=increase_by,
     )


-def decrease_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
-    _update_metrics(
-        dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT, difficulty=difficulty
-    )
+def increase_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
+    _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT)
+
+    # Do not consider blocked datasets for auto-scaling metrics
+    if status == Status.WAITING and not is_blocked(dataset):
+        _update_worker_size_metrics(DEFAULT_INCREASE_AMOUNT, difficulty)
+
+
+def decrease_metric(job_type: str, status: str, difficulty: int) -> None:
+    _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT)
+    if status == Status.WAITING:
+        _update_worker_size_metrics(DEFAULT_DECREASE_AMOUNT, difficulty)
+
+
+def decrease_worker_size_metrics(difficulty: int) -> None:
+    _update_worker_size_metrics(DEFAULT_DECREASE_AMOUNT, difficulty)


 def update_metrics_for_type(
     dataset: str, job_type: str, previous_status: str, new_status: str, difficulty: int
 ) -> None:
     if job_type is not None:
-        decrease_metric(dataset=dataset, job_type=job_type, status=previous_status, difficulty=difficulty)
+        decrease_metric(job_type=job_type, status=previous_status, difficulty=difficulty)
         increase_metric(dataset=dataset, job_type=job_type, status=new_status, difficulty=difficulty)
         # ^ this does not affect WorkerSizeJobsCountDocument, so we don't pass the job difficulty
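
Note: the WorkerSize bucket for a given difficulty comes from WorkerSizeJobsCountDocument.get_worker_size, the method named in the hunk header above. A minimal sketch of that mapping, with assumed thresholds (the real cutoffs are defined in metrics.py and may differ); difficulty 50 landing in "medium" is why the test added below asserts on WorkerSize.medium:

def get_worker_size(difficulty: int) -> str:
    # assumed thresholds, for illustration only; not the real values
    if difficulty <= 40:
        return "light"
    if difficulty <= 70:
        return "medium"
    return "heavy"
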
diff --git a/libs/libcommon/src/libcommon/queue/past_jobs.py b/libs/libcommon/src/libcommon/queue/past_jobs.py
index dd4effede..4268f4685 100644
--- a/libs/libcommon/src/libcommon/queue/past_jobs.py
+++ b/libs/libcommon/src/libcommon/queue/past_jobs.py
@@ -75,7 +75,7 @@ class PastJobDocument(Document):
     objects = QuerySetManager["PastJobDocument"]()


-def create_past_job(dataset: str, started_at: datetime, finished_at: datetime) -> None:
+def create_past_job(dataset: str, started_at: datetime, finished_at: datetime) -> bool:
     """Create a past job in the mongoDB database.

     After creating the entry, we check if it should be rate-limited (if it isn't yet), and if so, we block
@@ -85,12 +85,17 @@ def create_past_job(dataset: str, started_at: datetime, finished_at: datetime) -> None:
         dataset (`str`): The dataset on which to apply the job.
         started_at (`datetime`): The date the job has started.
         finished_at (`datetime`): The date the job has finished.
+
+    Returns:
+        `bool`: Whether the dataset was blocked (rate-limited) as a result of this job.
     """
     duration = int((finished_at - started_at).total_seconds())
     if duration < JOB_DURATION_MIN_SECONDS:
-        return
+        return False
     PastJobDocument(dataset=dataset, duration=duration, finished_at=finished_at).save()

     if not is_blocked(dataset) and duration > JOB_DURATION_CHECK_MIN_SECONDS:
         if PastJobDocument.objects(dataset=dataset).sum("duration") > DATASET_BLOCKAGE_THRESHOLD_SECONDS:
             block_dataset(dataset)
+            return True
+    return False
diff --git a/libs/libcommon/tests/queue/test_jobs.py b/libs/libcommon/tests/queue/test_jobs.py
index c763e0baf..eb2a1d341 100644
--- a/libs/libcommon/tests/queue/test_jobs.py
+++ b/libs/libcommon/tests/queue/test_jobs.py
@@ -141,6 +141,30 @@ def test_add_job() -> None:
     assert_past_jobs_number(1)


+def test_finish_job_blocked() -> None:
+    test_type = "test_type"
+    test_dataset = "test_dataset"
+    test_revision = "test_revision"
+    test_difficulty = 50
+
+    queue = Queue()
+    assert WorkerSizeJobsCountDocument.objects().count() == 0
+
+    queue.add_job(job_type=test_type, dataset=test_dataset, revision=test_revision, difficulty=test_difficulty)
+    queue.add_job(job_type="test_type2", dataset=test_dataset, revision=test_revision, difficulty=test_difficulty)
+    queue.add_job(job_type="test_type3", dataset=test_dataset, revision=test_revision, difficulty=test_difficulty)
+    queue.add_job(job_type="test_type4", dataset=test_dataset, revision=test_revision, difficulty=test_difficulty)
+
+    assert_metric_jobs_per_worker(worker_size=WorkerSize.medium, jobs_count=4)
+
+    job_info = queue.start_job()
+    assert_metric_jobs_per_worker(worker_size=WorkerSize.medium, jobs_count=3)
+
+    with patch("libcommon.queue.jobs.create_past_job", return_value=True):
+        queue.finish_job(job_id=job_info["job_id"])
+    assert_metric_jobs_per_worker(worker_size=WorkerSize.medium, jobs_count=0)
+
+
 @pytest.mark.parametrize(
     "jobs_ids,job_ids_to_delete,expected_deleted_number",
     [
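
The new test patches create_past_job to return True, forcing the "dataset was blocked" branch of finish_job: the medium count drops 4 -> 3 when start_job() moves one job out of WAITING, then 3 -> 0 when finish_job() purges the dataset's remaining pending jobs from the worker-size counts. The test relies on a few names outside the hunk's context lines; a sketch of the imports it assumes (module paths are a best guess, and assert_metric_jobs_per_worker is taken to be the helper already used by neighboring tests to check WorkerSizeJobsCountDocument counts):

from unittest.mock import patch

from libcommon.queue.jobs import Queue
from libcommon.queue.metrics import WorkerSize, WorkerSizeJobsCountDocument
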
diff --git a/libs/libcommon/tests/test_orchestrator.py b/libs/libcommon/tests/test_orchestrator.py
index e35e67602..f835a10da 100644
--- a/libs/libcommon/tests/test_orchestrator.py
+++ b/libs/libcommon/tests/test_orchestrator.py
@@ -266,7 +266,7 @@ def populate_queue() -> None:
 def test_get_pending_jobs_df() -> None:
     populate_queue()
     pending_jobs_df = Queue().get_pending_jobs_df(dataset=DATASET_NAME)
-    assert pending_jobs_df.shape == (250, 9)
+    assert pending_jobs_df.shape == (250, 10)


 @pytest.mark.parametrize(
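
For reference, the rate-limiting decision that create_past_job now reports can be summarized in a standalone sketch (constant values are placeholders, not the real ones from libcommon):

JOB_DURATION_MIN_SECONDS = 30
JOB_DURATION_CHECK_MIN_SECONDS = 300
DATASET_BLOCKAGE_THRESHOLD_SECONDS = 6 * 60 * 60

def would_block(past_durations_sum: int, duration: int, already_blocked: bool) -> bool:
    # Mirrors the branches in create_past_job above: jobs shorter than
    # JOB_DURATION_MIN_SECONDS are not recorded at all, already-blocked
    # datasets are never re-blocked, and blocking triggers only when the
    # accumulated duration (including this job) crosses the threshold.
    if duration < JOB_DURATION_MIN_SECONDS:
        return False
    if already_blocked or duration <= JOB_DURATION_CHECK_MIN_SECONDS:
        return False
    return past_durations_sum + duration > DATASET_BLOCKAGE_THRESHOLD_SECONDS
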