Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): fix unhandled exception in subscriber task #815

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions pubsub/gcloud/aio/pubsub/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,14 +542,33 @@ async def subscribe(

for task in producer_tasks:
task.cancel()
await asyncio.wait(producer_tasks, return_when=asyncio.ALL_COMPLETED)
producer_done, _ = await asyncio.wait(
producer_tasks,
return_when=asyncio.ALL_COMPLETED,
)

for task in consumer_tasks:
task.cancel()
await asyncio.wait(consumer_tasks, return_when=asyncio.ALL_COMPLETED)
consumer_done, _ = await asyncio.wait(
consumer_tasks,
return_when=asyncio.ALL_COMPLETED,
)

for task in acker_tasks:
task.cancel()
await asyncio.wait(acker_tasks, return_when=asyncio.ALL_COMPLETED)
acker_done, _ = await asyncio.wait(
acker_tasks,
return_when=asyncio.ALL_COMPLETED,
)

done = producer_done | consumer_done | acker_done
task_results = await asyncio.gather(*done, return_exceptions=True)
for result in task_results:
if isinstance(result, Exception) and not isinstance(
result, asyncio.CancelledError):
log.info(
'subscriber task exited with error',
exc_info=result,
)

raise asyncio.CancelledError('subscriber shut down')
49 changes: 49 additions & 0 deletions pubsub/tests/unit/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,3 +944,52 @@ async def test_subscribe_integrates_whole_chain(
# verify that the subscriber shuts down gracefully
with pytest.raises(asyncio.CancelledError):
await asyncio.wait_for(subscribe_task, 1)

@pytest.mark.asyncio
async def test_task_error_after_cancel(
subscriber_client,
application_callback,
):
def exception_handler(_loop, context) -> None:
pytest.fail(f"Unhandled exception: {context['message']}")

# Ensure the test fails on unhandled exceptions
asyncio.get_running_loop().set_exception_handler(exception_handler)

pull_ret = asyncio.Future()
pull_called = asyncio.Event()

async def pull(*_args, **_kwargs):
pull_called.set()
return await pull_ret

subscriber_client.pull = pull

subscribe_task = asyncio.ensure_future(
subscribe(
'fake_subscription', application_callback,
subscriber_client, num_producers=1,
max_messages_per_producer=100, ack_window=0.0,
ack_deadline_cache_timeout=1000,
num_tasks_per_consumer=1, enable_nack=True,
nack_window=0.0,
),
)

# Wait for the subscriber's producer task to call `pull()`
await pull_called.wait()

# Cancel the subscribe task
subscribe_task.cancel()

# Yield control to the event loop to allow the cancellation to be
# handled and graceful termination of the worker to occur
await asyncio.sleep(0)

# Cause the ongoing `pull()` invocation in the worker task to raise an
# error
pull_ret.set_exception(aiohttp.ServerConnectionError('pull error'))

# verify that the subscriber still shuts down gracefully
with pytest.raises(asyncio.CancelledError):
await asyncio.wait_for(subscribe_task, 1)