Skip to content

Commit

Permalink
broadcast: remove duplicate namespaces
Browse files Browse the repository at this point in the history
* Fix an issue that could cause issues when broadcasting "coerced"
  configurations to multiple namespaces.
* Specifying the same namesapce multiple times doesn't make sense,
  we should strip duplicates earlier on in the process.
* Closes cylc#6334
  • Loading branch information
oliver-sanders committed Aug 28, 2024
1 parent 347921f commit eeb3205
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 6 deletions.
1 change: 1 addition & 0 deletions changes.d/6335.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix an issue that could cause broadcasts made to multiple namespaces to fail.
13 changes: 10 additions & 3 deletions cylc/flow/broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,20 +298,27 @@ def put_broadcast(
elif not bad_point:
if namespace not in self.broadcasts[point_string]:
self.broadcasts[point_string][namespace] = {}

# NOTE: validating the setting "coerces" it
# modifying the setting object so we must copy it
# see https://github.com/cylc/cylc-flow/issues/6334
_setting = deepcopy(setting)

# Keep saved/reported setting as workflow
# config format.
modified_settings.append(
(point_string, namespace, deepcopy(setting))
(point_string, namespace, _setting)
)

# Coerce setting to cylc runtime object,
# i.e. str to DurationFloat.
BroadcastConfigValidator().validate(
setting,
_setting,
SPEC['runtime']['__MANY__']
)
addict(
self.broadcasts[point_string][namespace],
setting
_setting
)

# Log the broadcast
Expand Down
7 changes: 5 additions & 2 deletions cylc/flow/scripts/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ async def run(options: 'Values', workflow_id):
"""Implement cylc broadcast."""
pclient = get_client(workflow_id, timeout=options.comms_timeout)

# remove any duplicate namespaces
# see https://github.com/cylc/cylc-flow/issues/6334
namespaces = list(set(options.namespaces))

ret: Dict[str, Any] = {
'stdout': [],
'stderr': [],
Expand All @@ -337,7 +341,7 @@ async def run(options: 'Values', workflow_id):
'wFlows': [workflow_id],
'bMode': 'Set',
'cPoints': options.point_strings,
'nSpaces': options.namespaces,
'nSpaces': namespaces,
'bSettings': options.settings,
'bCutoff': options.expire,
}
Expand Down Expand Up @@ -382,7 +386,6 @@ async def run(options: 'Values', workflow_id):
mutation_kwargs['variables']['bMode'] = 'Expire'

# implement namespace and cycle point defaults here
namespaces = options.namespaces
if not namespaces:
namespaces = ["root"]
point_strings = options.point_strings
Expand Down
48 changes: 47 additions & 1 deletion tests/integration/scripts/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
BroadcastOptions = Options(get_option_parser())


async def test_broadcast_multi(
async def test_broadcast_multi_workflow(
one_conf,
flow,
scheduler,
Expand Down Expand Up @@ -77,3 +77,49 @@ async def test_broadcast_multi(
' settings are not compatible with the workflow'
) in out
assert err == ''


async def test_broadcast_multi_namespace(
flow,
scheduler,
start,
):
"""Test a multi-namespace broadcast command.
See https://github.com/cylc/cylc-flow/issues/6334
"""
id_ = flow(
{
'scheduling': {
'graph': {'R1': 'a & b & c & fin'},
},
'runtime': {
'root': {'execution time limit': 'PT1S'},
'VOWELS': {'execution time limit': 'PT2S'},
'CONSONANTS': {'execution time limit': 'PT3S'},
'a': {'inherit': 'VOWELS'},
'b': {'inherit': 'CONSONANTS'},
'c': {'inherit': 'CONSONANTS'},
},
}
)
schd = scheduler(id_)

async with start(schd):
# issue a broadcast to multiple namespaces
rets = await _main(
BroadcastOptions(
settings=['execution time limit = PT5S'],
namespaces=['root', 'VOWELS', 'CONSONANTS'],
),
schd.workflow,
)

# the broadcast should succeed
assert list(rets.values()) == [True]

# each task should now have the new "execution time limit"
for task in ['a', 'b', 'c', 'fin']:
assert schd.broadcast_mgr.get_broadcast(
schd.tokens.duplicate(cycle='1', task=task)
) == {'execution time limit': 5.0}

0 comments on commit eeb3205

Please sign in to comment.