From e1a4445ede61f0083d18babc5967b23f0370f6d5 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 30 Apr 2024 14:41:00 +0100 Subject: [PATCH] added a test for xtrigger callback failure. --- tests/integration/test_xtrigger_mgr.py | 43 ++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index 612049163cc..3bf425650c4 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -188,3 +188,46 @@ def mytrig(*args, **kwargs): # check the DB to ensure no additional entries have been created assert db_select(schd, True, 'xtriggers') == db_xtriggers + + +async def test_error_in_xtrigger(flow, start, scheduler): + """Failure in an xtrigger is handled nicely. + """ + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'xtriggers': { + 'mytrig': 'mytrig()' + }, + 'graph': { + 'R1': '@mytrig => foo' + }, + } + }) + + # add a custom xtrigger to the workflow + run_dir = Path(get_workflow_run_dir(id_)) + xtrig_dir = run_dir / 'lib/python' + xtrig_dir.mkdir(parents=True) + (xtrig_dir / 'mytrig.py').write_text(dedent(''' + def mytrig(*args, **kwargs): + raise Exception('This Xtrigger is broken') + ''')) + + schd = scheduler(id_) + async with start(schd) as log: + foo = schd.pool.get_tasks()[0] + schd.xtrigger_mgr.call_xtriggers_async(foo) + for _ in range(50): + await asyncio.sleep(0.1) + schd.proc_pool.process() + if len(schd.proc_pool.runnings) == 0: + break + else: + raise Exception('Process pool did not clear') + + error = log.messages[-1].split('\n') + assert error[-2] == 'Exception: This Xtrigger is broken' + assert error[0] == 'ERROR in xtrigger mytrig()'