Decrease worker size metrics when dataset is blocked (#2972)
* Decrease worker size metrics when dataset is blocked

* Apply suggestions from code review

Co-authored-by: Sylvain Lesage <[email protected]>

---------

Co-authored-by: Sylvain Lesage <[email protected]>
AndreaFrancis and severo authored Jul 8, 2024
1 parent fe605b0 commit 97b3fd9
Showing 5 changed files with 69 additions and 26 deletions.
16 changes: 12 additions & 4 deletions libs/libcommon/src/libcommon/queue/jobs.py
@@ -33,6 +33,7 @@
 from libcommon.queue.lock import lock, release_lock, release_locks
 from libcommon.queue.metrics import (
     decrease_metric,
+    decrease_worker_size_metrics,
     increase_metric,
     update_metrics_for_type,
 )
@@ -126,6 +127,7 @@ class JobQueryFilters(TypedDict, total=False):
         "priority": pa.string(),
         "status": pa.string(),
         "created_at": pa.timestamp("ms"),
+        "difficulty": pa.int64(),
     }
 )

@@ -368,7 +370,7 @@ def delete_waiting_jobs_by_job_id(self, job_ids: list[str]) -> int:
         try:
             existing = JobDocument.objects(pk__in=job_ids, status=Status.WAITING)
             for job in existing.all():
-                decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
+                decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
             deleted_jobs = existing.delete()
             return 0 if deleted_jobs is None else deleted_jobs
         except Exception:
@@ -685,9 +687,10 @@ def finish_job(self, job_id: str) -> Optional[Priority]:
         except StartedJobError as e:
             logging.error(f"job {job_id} has not the expected format for a started job. Aborting: {e}")
             return None
-        decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
+        decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+        was_blocked = False
         if job.started_at is not None:
-            create_past_job(
+            was_blocked = create_past_job(
                 dataset=job.dataset,
                 started_at=pytz.UTC.localize(job.started_at),
                 finished_at=get_datetime(),
@@ -696,6 +699,11 @@ def finish_job(self, job_id: str) -> Optional[Priority]:
         job.delete()
         release_locks(owner=job_id)
         # ^ bug: the lock owner is not set to the job id anymore when calling start_job()!
+        if was_blocked:
+            pending_jobs = self.get_pending_jobs_df(dataset=job.dataset)
+            for _, pending_job in pending_jobs.iterrows():
+                decrease_worker_size_metrics(pending_job["difficulty"])
+
         return job_priority

@@ -710,7 +718,7 @@ def delete_dataset_waiting_jobs(self, dataset: str) -> int:
"""
existing_waiting_jobs = JobDocument.objects(dataset=dataset, status=Status.WAITING)
for job in existing_waiting_jobs.no_cache():
decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
release_lock(key=job.unicity_id)
num_deleted_jobs = existing_waiting_jobs.delete()
return 0 if num_deleted_jobs is None else num_deleted_jobs
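
Taken together, the jobs.py changes make finish_job responsible for sweeping a newly blocked dataset's pending jobs out of the autoscaling counters. A minimal, self-contained sketch of that bookkeeping (in-memory counters and illustrative difficulty buckets stand in for the Mongo-backed WorkerSizeJobsCountDocument):

from collections import Counter

# In-memory stand-in for WorkerSizeJobsCountDocument:
# worker size -> number of waiting jobs counted for autoscaling.
jobs_per_worker_size: Counter = Counter()

def get_worker_size(difficulty: int) -> str:
    # Illustrative buckets only; the real mapping is
    # WorkerSizeJobsCountDocument.get_worker_size.
    if difficulty > 70:
        return "heavy"
    return "medium" if difficulty > 40 else "light"

def on_job_enqueued(difficulty: int, dataset_is_blocked: bool) -> None:
    # increase_metric only counts waiting jobs of non-blocked datasets.
    if not dataset_is_blocked:
        jobs_per_worker_size[get_worker_size(difficulty)] += 1

def on_dataset_blocked(pending_difficulties: list) -> None:
    # Mirrors the new loop in finish_job: one decrease_worker_size_metrics
    # call per still-pending job of the newly blocked dataset.
    for difficulty in pending_difficulties:
        jobs_per_worker_size[get_worker_size(difficulty)] -= 1

# Usage: four waiting jobs of difficulty 50, then the dataset gets blocked.
for _ in range(4):
    on_job_enqueued(difficulty=50, dataset_is_blocked=False)
on_dataset_blocked([50, 50, 50, 50])
assert jobs_per_worker_size["medium"] == 0
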
44 changes: 25 additions & 19 deletions libs/libcommon/src/libcommon/queue/metrics.py
@@ -105,41 +105,47 @@ def get_worker_size(difficulty: int) -> WorkerSize:
     objects = QuerySetManager["WorkerSizeJobsCountDocument"]()


-def _update_metrics(dataset: str, job_type: str, status: str, increase_by: int, difficulty: int) -> None:
+def _update_metrics(job_type: str, status: str, increase_by: int) -> None:
     JobTotalMetricDocument.objects(job_type=job_type, status=status).update(
         upsert=True,
         write_concern={"w": "majority", "fsync": True},
         read_concern={"level": "majority"},
         inc__total=increase_by,
     )
-    if status == Status.WAITING:
-        # Do not consider blocked datasets for auto-scaling metrics
-        if not is_blocked(dataset):
-            worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty)
-            WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update(
-                upsert=True,
-                write_concern={"w": "majority", "fsync": True},
-                read_concern={"level": "majority"},
-                inc__jobs_count=increase_by,
-            )


-def increase_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
-    _update_metrics(
-        dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT, difficulty=difficulty
-    )
+def _update_worker_size_metrics(increase_by: int, difficulty: int) -> None:
+    worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty)
+    WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update(
+        upsert=True,
+        write_concern={"w": "majority", "fsync": True},
+        read_concern={"level": "majority"},
+        inc__jobs_count=increase_by,
+    )


-def decrease_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
-    _update_metrics(
-        dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT, difficulty=difficulty
-    )
+def increase_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
+    _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT)
+
+    # Do not consider blocked datasets for auto-scaling metrics
+    if status == Status.WAITING and not is_blocked(dataset):
+        _update_worker_size_metrics(DEFAULT_INCREASE_AMOUNT, difficulty)
+
+
+def decrease_metric(job_type: str, status: str, difficulty: int) -> None:
+    _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT)
+    if status == Status.WAITING:
+        _update_worker_size_metrics(DEFAULT_DECREASE_AMOUNT, difficulty)
+
+
+def decrease_worker_size_metrics(difficulty: int) -> None:
+    _update_worker_size_metrics(DEFAULT_DECREASE_AMOUNT, difficulty)


 def update_metrics_for_type(
     dataset: str, job_type: str, previous_status: str, new_status: str, difficulty: int
 ) -> None:
     if job_type is not None:
-        decrease_metric(dataset=dataset, job_type=job_type, status=previous_status, difficulty=difficulty)
+        decrease_metric(job_type=job_type, status=previous_status, difficulty=difficulty)
         increase_metric(dataset=dataset, job_type=job_type, status=new_status, difficulty=difficulty)
+        # ^ this does not affect WorkerSizeJobsCountDocument, so we don't pass the job difficulty
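
For readers unfamiliar with the MongoEngine calls above: both helpers are plain upsert-increments. A rough pymongo equivalent of the two updates (connection string, database, and collection names here are assumptions for illustration, not the project's actual ones; the real code additionally sets majority write and read concerns):

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # assumed connection string
db = client["queue"]  # assumed database name

def update_total(job_type: str, status: str, increase_by: int) -> None:
    # One counter document per (job_type, status), created on first use;
    # mirrors _update_metrics above.
    db["jobTotalMetric"].update_one(
        {"job_type": job_type, "status": status},
        {"$inc": {"total": increase_by}},
        upsert=True,
    )

def update_worker_size(worker_size: str, increase_by: int) -> None:
    # One counter document per worker size; mirrors _update_worker_size_metrics.
    db["workerSizeJobsCount"].update_one(
        {"worker_size": worker_size},
        {"$inc": {"jobs_count": increase_by}},
        upsert=True,
    )
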
9 changes: 7 additions & 2 deletions libs/libcommon/src/libcommon/queue/past_jobs.py
@@ -75,7 +75,7 @@ class PastJobDocument(Document):
     objects = QuerySetManager["PastJobDocument"]()


-def create_past_job(dataset: str, started_at: datetime, finished_at: datetime) -> None:
+def create_past_job(dataset: str, started_at: datetime, finished_at: datetime) -> bool:
     """Create a past job in the mongoDB database.

     After creating the entry, we check if it should be rate-limited (if it isn't yet), and if so, we block
@@ -85,12 +85,17 @@ def create_past_job(dataset: str, started_at: datetime, finished_at: datetime) -
         dataset (`str`): The dataset on which to apply the job.
         started_at (`datetime`): The date the job has started.
         finished_at (`datetime`): The date the job has finished.
+
+    Returns:
+        `bool`: If the dataset was blocked.
     """
     duration = int((finished_at - started_at).total_seconds())
     if duration < JOB_DURATION_MIN_SECONDS:
-        return
+        return False
     PastJobDocument(dataset=dataset, duration=duration, finished_at=finished_at).save()

     if not is_blocked(dataset) and duration > JOB_DURATION_CHECK_MIN_SECONDS:
         if PastJobDocument.objects(dataset=dataset).sum("duration") > DATASET_BLOCKAGE_THRESHOLD_SECONDS:
             block_dataset(dataset)
+            return True
+    return False
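
The return value makes the blockage decision observable to the caller. Isolated as a pure function, the branch structure of create_past_job looks like this (constant values are placeholders; only the names come from the code above):

JOB_DURATION_MIN_SECONDS = 30  # placeholder value
JOB_DURATION_CHECK_MIN_SECONDS = 300  # placeholder value
DATASET_BLOCKAGE_THRESHOLD_SECONDS = 6000  # placeholder value

def blocks_dataset(duration: int, already_blocked: bool, past_durations_sum: int) -> bool:
    # Pure-function mirror of create_past_job's branches.
    if duration < JOB_DURATION_MIN_SECONDS:
        return False  # job too short to record; nothing changes
    # the job is recorded first, so the checked total includes this duration
    total = past_durations_sum + duration
    if already_blocked or duration <= JOB_DURATION_CHECK_MIN_SECONDS:
        return False  # recorded, but no blockage check is performed
    return total > DATASET_BLOCKAGE_THRESHOLD_SECONDS

# Example: a 400 s job on an unblocked dataset with 5800 s already accumulated
# crosses the 6000 s placeholder threshold and triggers blocking.
assert blocks_dataset(duration=400, already_blocked=False, past_durations_sum=5800)
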
24 changes: 24 additions & 0 deletions libs/libcommon/tests/queue/test_jobs.py
@@ -141,6 +141,30 @@ def test_add_job() -> None:
     assert_past_jobs_number(1)


+def test_finish_job_blocked() -> None:
+    test_type = "test_type"
+    test_dataset = "test_dataset"
+    test_revision = "test_revision"
+    test_difficulty = 50
+
+    queue = Queue()
+    assert WorkerSizeJobsCountDocument.objects().count() == 0
+
+    queue.add_job(job_type=test_type, dataset=test_dataset, revision=test_revision, difficulty=test_difficulty)
+    queue.add_job(job_type="test_type2", dataset=test_dataset, revision=test_revision, difficulty=test_difficulty)
+    queue.add_job(job_type="test_type3", dataset=test_dataset, revision=test_revision, difficulty=test_difficulty)
+    queue.add_job(job_type="test_type4", dataset=test_dataset, revision=test_revision, difficulty=test_difficulty)
+
+    assert_metric_jobs_per_worker(worker_size=WorkerSize.medium, jobs_count=4)
+
+    job_info = queue.start_job()
+    assert_metric_jobs_per_worker(worker_size=WorkerSize.medium, jobs_count=3)
+
+    with patch("libcommon.queue.jobs.create_past_job", return_value=True):
+        queue.finish_job(job_id=job_info["job_id"])
+    assert_metric_jobs_per_worker(worker_size=WorkerSize.medium, jobs_count=0)
+
+
 @pytest.mark.parametrize(
     "jobs_ids,job_ids_to_delete,expected_deleted_number",
     [
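
The test forces the blocked branch of finish_job by patching create_past_job at the name finish_job actually looks up (libcommon.queue.jobs), not in the module where it is defined, so no real past-job durations have to accumulate. A standalone illustration of this unittest.mock.patch pattern (json.dumps serves purely as a stand-in target):

import json
from unittest.mock import patch

# Patch a callable for the duration of the with-block and force a fixed
# return value, exactly as the test does with create_past_job.
with patch("json.dumps", return_value="{}") as mock_dumps:
    assert json.dumps({"a": 1}) == "{}"  # the mock answers, not the real dumps
mock_dumps.assert_called_once()
assert json.dumps({"a": 1}) == '{"a": 1}'  # the real function is restored
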
2 changes: 1 addition & 1 deletion libs/libcommon/tests/test_orchestrator.py
@@ -266,7 +266,7 @@ def populate_queue() -> None:
 def test_get_pending_jobs_df() -> None:
     populate_queue()
     pending_jobs_df = Queue().get_pending_jobs_df(dataset=DATASET_NAME)
-    assert pending_jobs_df.shape == (250, 9)
+    assert pending_jobs_df.shape == (250, 10)


 @pytest.mark.parametrize(
