Skip to content

Commit

Permalink
added a test for xtrigger callback failure.
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed May 1, 2024
1 parent 551f0bd commit e1a4445
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,46 @@ def mytrig(*args, **kwargs):

# check the DB to ensure no additional entries have been created
assert db_select(schd, True, 'xtriggers') == db_xtriggers


async def test_error_in_xtrigger(flow, start, scheduler):
"""Failure in an xtrigger is handled nicely.
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'xtriggers': {
'mytrig': 'mytrig()'
},
'graph': {
'R1': '@mytrig => foo'
},
}
})

# add a custom xtrigger to the workflow
run_dir = Path(get_workflow_run_dir(id_))
xtrig_dir = run_dir / 'lib/python'
xtrig_dir.mkdir(parents=True)
(xtrig_dir / 'mytrig.py').write_text(dedent('''
def mytrig(*args, **kwargs):
raise Exception('This Xtrigger is broken')
'''))

schd = scheduler(id_)
async with start(schd) as log:
foo = schd.pool.get_tasks()[0]
schd.xtrigger_mgr.call_xtriggers_async(foo)
for _ in range(50):
await asyncio.sleep(0.1)
schd.proc_pool.process()
if len(schd.proc_pool.runnings) == 0:
break
else:
raise Exception('Process pool did not clear')

error = log.messages[-1].split('\n')
assert error[-2] == 'Exception: This Xtrigger is broken'
assert error[0] == 'ERROR in xtrigger mytrig()'

0 comments on commit e1a4445

Please sign in to comment.