Skip to content

Commit

Permalink
fix(tests): missing worker coverage - test idling
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Oct 22, 2024
1 parent 8ec81a3 commit 58138ca
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
22 changes: 22 additions & 0 deletions tests/unit/mazepa/test_end_to_end_workflow_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ def queues_with_worker(task_queue, outcome_queue):
yield task_queue, outcome_queue, worker


@pytest.fixture
def queues_with_idle_worker(task_queue, outcome_queue):
max_runtime = 10.0
worker = partial(
mazepa.run_worker,
task_queue=task_queue,
outcome_queue=outcome_queue,
sleep_sec=0.2,
max_runtime=max_runtime,
debug=True,
idle_timeout=0.5,
)
yield worker, max_runtime


def return_false_fn(*args, **kwargs):
return False

Expand Down Expand Up @@ -146,3 +161,10 @@ def test_worker_task_pull_error(queues_with_worker, mocker) -> None:
exc = outcomes[0].payload.outcome.exception
assert isinstance(exc, RuntimeError)
assert "hola" in str(exc)


def test_idling_worker(queues_with_idle_worker) -> None:
worker, max_runtime = queues_with_idle_worker
start = time.time()
worker()
assert (time.time() - start) < max_runtime
2 changes: 2 additions & 0 deletions zetta_utils/run/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def _delete_k8s_resource(resource_id: str, resource: Resource) -> bool:
namespace="default",
propagation_policy="Foreground",
)
deregister_resource(resource_id)
except k8s_client.ApiException as exc:
if exc.status == 404:
success = True
Expand Down Expand Up @@ -174,6 +175,7 @@ def _delete_sqs_queues(resources: dict[str, Resource]) -> bool: # pragma: no co
logger.info(f"Deleting SQS queue `{resource.name}`")
queue_url = sqs_client.get_queue_url(QueueName=resource.name)["QueueUrl"]
sqs_client.delete_queue(QueueUrl=queue_url)
deregister_resource(resource_id)
except sqs_client.exceptions.QueueDoesNotExist as exc:
logger.info(f"Queue does not exist: `{resource.name}`: {exc}")
deregister_resource(resource_id)
Expand Down

0 comments on commit 58138ca

Please sign in to comment.