Skip to content

Commit 17922fe

Browse files
broadcast: report invalid settings
* We already report invalid cycle points and namespaces. * However, we have been relying on client-side validation for settings (which doesn't apply to the GraphQL mutation). * This also raises the potential for inter-version compatibility issues going unreported. * This commit explicitly handles invalid settings in the same way as invalid cycle points and namespaces so that they are reported back to the user. * Closes the issue part of #6429. * Additionally: * This change also strips duplicate entries from broadcast reports. * And fixes the CLI options in broadcast report to match `cylc broadcast`.
1 parent f413ebb commit 17922fe

10 files changed

Lines changed: 282 additions & 93 deletions

File tree

changes.d/6574.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Broadcast: Report any settings that are not compatible with the scheduler Cylc version.

cylc/flow/broadcast_mgr.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
fail_if_platform_and_host_conflict,
3838
PlatformLookupError,
3939
)
40-
40+
from cylc.flow.util import uniq
4141

4242
if TYPE_CHECKING:
4343
from cylc.flow.id import Tokens
@@ -151,7 +151,7 @@ def clear_broadcast(
151151
LOG.error(get_broadcast_bad_options_report(bad_options))
152152
if modified_settings:
153153
self.data_store_mgr.delta_broadcast()
154-
return modified_settings, bad_options
154+
return uniq(modified_settings), bad_options
155155

156156
def expire_broadcast(self, cutoff=None, **kwargs):
157157
"""Clear all broadcasts targeting cycle points earlier than cutoff."""
@@ -284,18 +284,24 @@ def put_broadcast(
284284
bad_options is as described in the docstring for self.clear().
285285
"""
286286
modified_settings = []
287-
bad_point_strings = []
288-
bad_namespaces = []
287+
bad_settings = []
288+
bad_point_strings = set()
289+
bad_namespaces = set()
289290

290291
with self.lock:
291292
for setting in settings or []:
292293
# Coerce setting to cylc runtime object,
293294
# i.e. str to DurationFloat.
294295
coerced_setting = deepcopy(setting)
295-
BroadcastConfigValidator().validate(
296-
coerced_setting,
297-
SPEC['runtime']['__MANY__'],
298-
)
296+
try:
297+
BroadcastConfigValidator().validate(
298+
coerced_setting,
299+
SPEC['runtime']['__MANY__'],
300+
)
301+
except Exception as exc:
302+
LOG.error(exc)
303+
bad_settings.append(setting)
304+
continue
299305

300306
# Skip and warn if a run mode is broadcast to a workflow
301307
# running in simulation or dummy mode.
@@ -308,22 +314,24 @@ def put_broadcast(
308314
f' running in {self.workflow_run_mode.value} mode'
309315
' will have no effect, and will not be actioned.'
310316
)
317+
bad_settings.append(setting)
311318
continue
312319

313320
for point_string in point_strings or []:
314321
# Standardise the point and check its validity.
315322
bad_point = False
316323
try:
317324
point_string = standardise_point_string(point_string)
325+
318326
except PointParsingError:
319327
if point_string != '*':
320-
bad_point_strings.append(point_string)
328+
bad_point_strings.add(point_string)
321329
bad_point = True
322330
if not bad_point and point_string not in self.broadcasts:
323331
self.broadcasts[point_string] = {}
324332
for namespace in namespaces or []:
325333
if namespace not in self.linearized_ancestors:
326-
bad_namespaces.append(namespace)
334+
bad_namespaces.add(namespace)
327335
elif not bad_point:
328336
# Check broadcast against config and against
329337
# existing broadcasts:
@@ -340,6 +348,7 @@ def put_broadcast(
340348
namespace,
341349
coerced_setting,
342350
):
351+
bad_settings.append(setting)
343352
continue
344353

345354
if namespace not in self.broadcasts[point_string]:
@@ -362,13 +371,15 @@ def put_broadcast(
362371
LOG.info(get_broadcast_change_report(modified_settings))
363372

364373
bad_options = {}
374+
if bad_settings:
375+
bad_options["settings"] = uniq(bad_settings)
365376
if bad_point_strings:
366-
bad_options["point_strings"] = bad_point_strings
377+
bad_options["point_strings"] = sorted(bad_point_strings)
367378
if bad_namespaces:
368-
bad_options["namespaces"] = bad_namespaces
379+
bad_options["namespaces"] = sorted(bad_namespaces)
369380
if modified_settings:
370381
self.data_store_mgr.delta_broadcast()
371-
return modified_settings, bad_options
382+
return uniq(modified_settings), bad_options
372383

373384
@staticmethod
374385
def bc_mixes_old_and_new_platform_settings(

cylc/flow/broadcast_report.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@
2626
CHANGE_TITLE_CANCEL = "Broadcast cancelled:"
2727
CHANGE_TITLE_SET = "Broadcast set:"
2828

29+
CLI_OPT_MAP = {
30+
# broadcast field: cli option
31+
'point_strings': 'point',
32+
'namespaces': 'namespace',
33+
'settings': 'set',
34+
}
35+
2936

3037
def get_broadcast_bad_options_report(bad_options, is_set=False):
3138
"""Return a string to report bad options for broadcast cancel/clear."""
@@ -48,7 +55,11 @@ def get_broadcast_bad_options_report(bad_options, is_set=False):
4855
value_str += val
4956
else:
5057
value_str = value
51-
msg += BAD_OPTIONS_FMT % (key, value_str)
58+
if isinstance(value, dict):
59+
value_str = ', '.join(
60+
f'{key}={val}' for key, val in value.items()
61+
)
62+
msg += BAD_OPTIONS_FMT % (CLI_OPT_MAP.get(key, key), value_str)
5263
return msg
5364

5465

cylc/flow/network/resolvers.py

Lines changed: 26 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
runtime_schema_to_cfg,
5656
sort_elements,
5757
)
58+
from cylc.flow.util import uniq, iter_uniq
5859

5960
if TYPE_CHECKING:
6061
from uuid import UUID
@@ -112,45 +113,6 @@ def collate_workflow_atts(workflow):
112113
}
113114

114115

115-
def uniq(iterable):
116-
"""Return a unique collection of the provided items preserving item order.
117-
118-
Useful for unhashable things like dicts, relies on __eq__ for testing
119-
equality.
120-
121-
Examples:
122-
>>> uniq([1, 1, 2, 3, 5, 8, 1])
123-
[1, 2, 3, 5, 8]
124-
125-
"""
126-
ret = []
127-
for item in iterable:
128-
if item not in ret:
129-
ret.append(item)
130-
return ret
131-
132-
133-
def iter_uniq(iterable):
134-
"""Iterate over an iterable omitting any duplicate entries.
135-
136-
Useful for unhashable things like dicts, relies on __eq__ for testing
137-
equality.
138-
139-
Note:
140-
More efficient than "uniq" for iteration use cases.
141-
142-
Examples:
143-
>>> list(iter_uniq([1, 1, 2, 3, 5, 8, 1]))
144-
[1, 2, 3, 5, 8]
145-
146-
"""
147-
cache = set()
148-
for item in iterable:
149-
if item not in cache:
150-
cache.add(item)
151-
yield item
152-
153-
154116
def workflow_ids_filter(workflow_tokens, items) -> bool:
155117
"""Match id arguments with workflow attributes.
156118
@@ -787,24 +749,31 @@ def broadcast(
787749
cutoff: Any = None
788750
):
789751
"""Put or clear broadcasts."""
790-
if settings is not None:
791-
# Convert schema field names to workflow config setting names if
792-
# applicable:
793-
for i, dict_ in enumerate(settings):
794-
settings[i] = runtime_schema_to_cfg(dict_)
795-
796-
if mode == 'put_broadcast':
797-
return self.schd.task_events_mgr.broadcast_mgr.put_broadcast(
798-
cycle_points, namespaces, settings)
799-
if mode == 'clear_broadcast':
800-
return self.schd.task_events_mgr.broadcast_mgr.clear_broadcast(
801-
point_strings=cycle_points,
802-
namespaces=namespaces,
803-
cancel_settings=settings)
804-
if mode == 'expire_broadcast':
805-
return self.schd.task_events_mgr.broadcast_mgr.expire_broadcast(
806-
cutoff)
807-
raise ValueError(f"Unsupported broadcast mode: '{mode}'")
752+
try:
753+
if settings is not None:
754+
# Convert schema field names to workflow config setting names
755+
# if applicable:
756+
for i, dict_ in enumerate(settings):
757+
settings[i] = runtime_schema_to_cfg(dict_)
758+
759+
if mode == 'put_broadcast':
760+
return self.schd.task_events_mgr.broadcast_mgr.put_broadcast(
761+
cycle_points, namespaces, settings)
762+
if mode == 'clear_broadcast':
763+
return self.schd.task_events_mgr.broadcast_mgr.clear_broadcast(
764+
point_strings=cycle_points,
765+
namespaces=namespaces,
766+
cancel_settings=settings)
767+
if mode == 'expire_broadcast':
768+
return (
769+
self.schd.task_events_mgr.broadcast_mgr.expire_broadcast(
770+
cutoff
771+
)
772+
)
773+
raise ValueError(f'Unsupported broadcast mode: {mode}')
774+
except Exception as exc:
775+
LOG.error(exc)
776+
return {'error': {'message': str(exc)}}
808777

809778
def put_ext_trigger(
810779
self,

cylc/flow/network/schema.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1795,17 +1795,6 @@ class Arguments:
17951795
)
17961796
)
17971797

1798-
# TODO: work out how to implement this feature, it needs to be
1799-
# handled client-side which makes it slightly awkward in
1800-
# api-on-the-fly land
1801-
1802-
# files = graphene.List(
1803-
# String,
1804-
# description=sstrip('''
1805-
# File with config to broadcast. Can be used multiple times
1806-
# ''')
1807-
# )
1808-
18091798
result = GenericScalar()
18101799

18111800

cylc/flow/util.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,45 @@
4545
_NAT_SORT_SPLIT = re.compile(r'([\d\.]+)')
4646

4747

48+
def uniq(iterable):
49+
"""Return a unique collection of the provided items preserving item order.
50+
51+
Useful for unhashable things like dicts, relies on __eq__ for testing
52+
equality.
53+
54+
Examples:
55+
>>> uniq([1, 1, 2, 3, 5, 8, 1])
56+
[1, 2, 3, 5, 8]
57+
58+
"""
59+
ret = []
60+
for item in iterable:
61+
if item not in ret:
62+
ret.append(item)
63+
return ret
64+
65+
66+
def iter_uniq(iterable):
67+
"""Iterate over an iterable omitting any duplicate entries.
68+
69+
Useful for unhashable things like dicts, relies on __eq__ for testing
70+
equality.
71+
72+
Note:
73+
More efficient than "uniq" for iteration use cases.
74+
75+
Examples:
76+
>>> list(iter_uniq([1, 1, 2, 3, 5, 8, 1]))
77+
[1, 2, 3, 5, 8]
78+
79+
"""
80+
cache = set()
81+
for item in iterable:
82+
if item not in cache:
83+
cache.add(item)
84+
yield item
85+
86+
4887
def sstrip(text):
4988
"""Simple function to dedent and strip text.
5089
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
ERROR: No broadcast to cancel/clear for these options:
22
--cancel=[environment]BCAST
3-
--namespaces=m4
4-
--point_strings=20100809T00
3+
--namespace=m4
4+
--point=20100809T00
55
ERROR: No broadcast to cancel/clear for these options:
66
--cancel=[environment]BCAST
7-
--namespaces=m5
8-
--point_strings=*
7+
--namespace=m5
8+
--point=*
99
ERROR: No broadcast to cancel/clear for these options:
10-
--namespaces=m6
10+
--namespace=m6
1111
ERROR: No broadcast to cancel/clear for these options:
12-
--namespaces=m7
12+
--namespace=m7
1313
ERROR: No broadcast to cancel/clear for these options:
14-
--namespaces=ENS3
14+
--namespace=ENS3

tests/integration/run_modes/test_simulation.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,10 +424,12 @@ async def test_settings_broadcast(
424424

425425
# Assert that setting run mode on a simulation mode task fails with
426426
# warning:
427-
schd.broadcast_mgr.put_broadcast(
427+
good, bad = schd.broadcast_mgr.put_broadcast(
428428
['1066'], ['one'], [{
429429
'run mode': 'live',
430430
}])
431+
assert good == []
432+
assert bad == {'settings': [{'run mode': 'live'}]}
431433
record = log_filter(contains='will not be actioned')[0]
432434
assert record[0] == logging.WARNING
433435
assert 'run mode' not in schd.broadcast_mgr.broadcasts

0 commit comments

Comments
 (0)