From eeb320564c3e9b08b5cc7bc532dc756ac26ab7ca Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Wed, 28 Aug 2024 10:39:08 +0100 Subject: [PATCH] broadcast: remove duplicate namespaces * Fix an issue that could cause issues when broadcasting "coerced" configurations to multiple namespaces. * Specifying the same namesapce multiple times doesn't make sense, we should strip duplicates earlier on in the process. * Closes #6334 --- changes.d/6335.fix.md | 1 + cylc/flow/broadcast_mgr.py | 13 ++++-- cylc/flow/scripts/broadcast.py | 7 ++- tests/integration/scripts/test_broadcast.py | 48 ++++++++++++++++++++- 4 files changed, 63 insertions(+), 6 deletions(-) create mode 100644 changes.d/6335.fix.md diff --git a/changes.d/6335.fix.md b/changes.d/6335.fix.md new file mode 100644 index 00000000000..2057bae0f7c --- /dev/null +++ b/changes.d/6335.fix.md @@ -0,0 +1 @@ +Fix an issue that could cause broadcasts made to multiple namespaces to fail. diff --git a/cylc/flow/broadcast_mgr.py b/cylc/flow/broadcast_mgr.py index 6cd007ee25a..29379fd8420 100644 --- a/cylc/flow/broadcast_mgr.py +++ b/cylc/flow/broadcast_mgr.py @@ -298,20 +298,27 @@ def put_broadcast( elif not bad_point: if namespace not in self.broadcasts[point_string]: self.broadcasts[point_string][namespace] = {} + + # NOTE: validating the setting "coerces" it + # modifying the setting object so we must copy it + # see https://github.com/cylc/cylc-flow/issues/6334 + _setting = deepcopy(setting) + # Keep saved/reported setting as workflow # config format. modified_settings.append( - (point_string, namespace, deepcopy(setting)) + (point_string, namespace, _setting) ) + # Coerce setting to cylc runtime object, # i.e. str to DurationFloat. BroadcastConfigValidator().validate( - setting, + _setting, SPEC['runtime']['__MANY__'] ) addict( self.broadcasts[point_string][namespace], - setting + _setting ) # Log the broadcast diff --git a/cylc/flow/scripts/broadcast.py b/cylc/flow/scripts/broadcast.py index c7e5d2a4f3b..2cf068309b7 100755 --- a/cylc/flow/scripts/broadcast.py +++ b/cylc/flow/scripts/broadcast.py @@ -325,6 +325,10 @@ async def run(options: 'Values', workflow_id): """Implement cylc broadcast.""" pclient = get_client(workflow_id, timeout=options.comms_timeout) + # remove any duplicate namespaces + # see https://github.com/cylc/cylc-flow/issues/6334 + namespaces = list(set(options.namespaces)) + ret: Dict[str, Any] = { 'stdout': [], 'stderr': [], @@ -337,7 +341,7 @@ async def run(options: 'Values', workflow_id): 'wFlows': [workflow_id], 'bMode': 'Set', 'cPoints': options.point_strings, - 'nSpaces': options.namespaces, + 'nSpaces': namespaces, 'bSettings': options.settings, 'bCutoff': options.expire, } @@ -382,7 +386,6 @@ async def run(options: 'Values', workflow_id): mutation_kwargs['variables']['bMode'] = 'Expire' # implement namespace and cycle point defaults here - namespaces = options.namespaces if not namespaces: namespaces = ["root"] point_strings = options.point_strings diff --git a/tests/integration/scripts/test_broadcast.py b/tests/integration/scripts/test_broadcast.py index 67a79448041..cad23cdf386 100644 --- a/tests/integration/scripts/test_broadcast.py +++ b/tests/integration/scripts/test_broadcast.py @@ -21,7 +21,7 @@ BroadcastOptions = Options(get_option_parser()) -async def test_broadcast_multi( +async def test_broadcast_multi_workflow( one_conf, flow, scheduler, @@ -77,3 +77,49 @@ async def test_broadcast_multi( ' settings are not compatible with the workflow' ) in out assert err == '' + + +async def test_broadcast_multi_namespace( + flow, + scheduler, + start, +): + """Test a multi-namespace broadcast command. + + See https://github.com/cylc/cylc-flow/issues/6334 + """ + id_ = flow( + { + 'scheduling': { + 'graph': {'R1': 'a & b & c & fin'}, + }, + 'runtime': { + 'root': {'execution time limit': 'PT1S'}, + 'VOWELS': {'execution time limit': 'PT2S'}, + 'CONSONANTS': {'execution time limit': 'PT3S'}, + 'a': {'inherit': 'VOWELS'}, + 'b': {'inherit': 'CONSONANTS'}, + 'c': {'inherit': 'CONSONANTS'}, + }, + } + ) + schd = scheduler(id_) + + async with start(schd): + # issue a broadcast to multiple namespaces + rets = await _main( + BroadcastOptions( + settings=['execution time limit = PT5S'], + namespaces=['root', 'VOWELS', 'CONSONANTS'], + ), + schd.workflow, + ) + + # the broadcast should succeed + assert list(rets.values()) == [True] + + # each task should now have the new "execution time limit" + for task in ['a', 'b', 'c', 'fin']: + assert schd.broadcast_mgr.get_broadcast( + schd.tokens.duplicate(cycle='1', task=task) + ) == {'execution time limit': 5.0}