Skip to content

Commit c6cc2f4

Browse files
committed
UIS subscription changes
1 parent b83eb6c commit c6cc2f4

File tree

9 files changed

+116
-76
lines changed

9 files changed

+116
-76
lines changed

conda-environment.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ dependencies:
55
- ansimarkup >=1.0.0
66
- async-timeout>=3.0.0 # [py<3.11]
77
- colorama >=0.4,<1.0
8-
- graphene >=3.4.3,<4
8+
- graphene ==3.4.3
99
- graphviz # for static graphing
1010
# Note: can't pin jinja2 any higher than this until we give up on Cylc 7 back-compat
1111
- jinja2 >=3.0,<3.1

cylc/flow/network/graphql.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
visit,
3535
get_argument_values,
3636
get_named_type,
37-
introspection_types,
37+
is_introspection_type,
3838
)
3939
from graphql.pyutils import AwaitableOrValue, is_awaitable
4040

@@ -47,10 +47,6 @@
4747
NULL_VALUE = None
4848
EMPTY_VALUES: Tuple[list, dict] = ([], {})
4949
STRIP_OPS = {'query', 'subscription'}
50-
INTROSPECTS = {
51-
k.lower()
52-
for k in introspection_types
53-
}
5450

5551
T = TypeVar("T")
5652
U = TypeVar("U")
@@ -82,6 +78,17 @@ def grow_tree(tree, path, leaves=None):
8278
tree_loc[len(path) % 2].update({'leaves': leaves})
8379

8480

81+
def execution_result_to_dict(execution_result):
82+
result = {}
83+
if execution_result.data:
84+
result["data"] = execution_result.data
85+
if execution_result.errors:
86+
result["errors"] = [
87+
error.formatted for error in execution_result.errors
88+
]
89+
return result
90+
91+
8592
def instantiate_middleware(middlewares):
8693
"""Take iterable of middlewares and instantiate.
8794
@@ -154,21 +161,11 @@ def strip_null(data):
154161
return data
155162

156163

157-
def attr_strip_null(result):
158-
"""Work on the attribute/data of ExecutionResult if present."""
159-
if hasattr(result, 'data'):
160-
result.data = strip_null(result.data)
161-
return result
162-
return strip_null(result)
163-
164-
165164
def null_stripper(exe_result):
166165
"""Strip nulls in accordance with type of execution result."""
167166
if is_awaitable(exe_result):
168-
return async_next(attr_strip_null, exe_result)
169-
if getattr(exe_result, 'errors', None) is None:
170-
return attr_strip_null(exe_result)
171-
return exe_result
167+
return async_next(strip_null, exe_result)
168+
return strip_null(exe_result)
172169

173170

174171
class CylcVisitor(Visitor):
@@ -260,12 +257,13 @@ def __init__(self):
260257
def resolve(self, next_, root, info, **args):
261258
"""Middleware resolver; handles field according to operation."""
262259
# GraphiQL introspection is 'query' but not async
263-
if INTROSPECTS.intersection({f'{p}' for p in info.path.as_list()}):
260+
if is_introspection_type(get_named_type(info.return_type)):
264261
return next_(root, info, **args)
265262

266263
if info.operation.operation.value in STRIP_OPS:
267264
path_list = info.path.as_list()
268265
path_string = f'{path_list}'
266+
parent_path_string = f'{path_list[:-1:]}'
269267
# Needed for child fields that resolve without args.
270268
# Store arguments of parents as leaves of schema tree from path
271269
# to respective field.
@@ -301,7 +299,6 @@ def resolve(self, next_, root, info, **args):
301299
):
302300

303301
# Gather fields set in root
304-
parent_path_string = f'{path_list[:-1:]}'
305302
stamp = getattr(root, 'stamp', '')
306303
if (
307304
parent_path_string not in self.field_sets

cylc/flow/network/resolvers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,8 +557,9 @@ async def subscribe_delta(
557557
# NOTE: we don't expect workflows to be returned in definition order
558558
# so it is ok to use `set` here
559559
workflow_ids = set(args.get('workflows', args.get('ids', ())))
560+
560561
sub_id = uuid4()
561-
info.variable_values['backend_sub_id'] = sub_id
562+
info.context['sub_id'] = sub_id
562563
self.delta_store[sub_id] = {}
563564

564565
op_id = root

cylc/flow/network/schema.py

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
)
4949
from graphene.types.generic import GenericScalar
5050
from graphene.utils.str_converters import to_snake_case
51+
from graphene.types.schema import identity_resolve
52+
5153
from graphql.type.definition import get_named_type
5254

5355
from cylc.flow import LOG_LEVELS
@@ -70,8 +72,7 @@
7072
FLOW_NONE,
7173
)
7274
from cylc.flow.id import Tokens
73-
from cylc.flow.run_modes import (
74-
TASK_CONFIG_RUN_MODES, WORKFLOW_RUN_MODES, RunMode)
75+
from cylc.flow.run_modes import RunMode
7576
from cylc.flow.task_outputs import SORT_ORDERS
7677
from cylc.flow.task_state import (
7778
TASK_STATUS_DESC,
@@ -309,8 +310,8 @@ def process_resolver_info(
309310
"""Set and gather info for resolver."""
310311
# Add the subscription id to the resolver context
311312
# to know which delta-store to use."""
312-
if 'backend_sub_id' in info.variable_values:
313-
args['sub_id'] = info.variable_values['backend_sub_id']
313+
if 'sub_id' in info.context:
314+
args['sub_id'] = info.context['sub_id']
314315

315316
field_name: str = to_snake_case(info.field_name)
316317
# root is the parent data object.
@@ -620,19 +621,18 @@ class Meta:
620621
string_extended = String()
621622

622623

623-
# The run mode for the workflow.
624-
WorkflowRunMode = graphene.Enum(
625-
'WorkflowRunMode',
626-
[(m.capitalize(), m) for m in WORKFLOW_RUN_MODES],
627-
description=lambda x: RunMode(x.value).describe() if x else None,
628-
)
624+
class TaskRunMode(graphene.Enum):
625+
"""The mode used to run a task."""
629626

630-
# The run mode for the task.
631-
TaskRunMode = graphene.Enum(
632-
'TaskRunMode',
633-
[(m.capitalize(), m) for m in TASK_CONFIG_RUN_MODES],
634-
description=lambda x: RunMode(x.value).describe() if x else None,
635-
)
627+
# NOTE: using a different enum because:
628+
# * We only want to offer a subset of run modes (REQUEST_* only).
629+
630+
Live = cast('Enum', RunMode.LIVE) # type: graphene.Enum
631+
Skip = cast('Enum', RunMode.SKIP) # type: graphene.Enum
632+
633+
@property
634+
def description(self):
635+
return RunMode(self.value).describe()
636636

637637

638638
class Workflow(ObjectType):
@@ -862,7 +862,7 @@ class Meta:
862862
directives = graphene.List(RuntimeSetting, resolver=resolve_json_dump)
863863
environment = graphene.List(RuntimeSetting, resolver=resolve_json_dump)
864864
outputs = graphene.List(RuntimeSetting, resolver=resolve_json_dump)
865-
run_mode = TaskRunMode(default_value=TaskRunMode.Live.name)
865+
run_mode = TaskRunMode(default_value=TaskRunMode.Live)
866866

867867

868868
RUNTIME_FIELD_TO_CFG_MAP = {
@@ -1694,6 +1694,21 @@ class TimePoint(String):
16941694
)
16951695

16961696

1697+
class WorkflowRunMode(graphene.Enum):
1698+
"""The mode used to run a workflow."""
1699+
1700+
# NOTE: using a different enum because:
1701+
# * We only want to offer a subset of run modes (REQUEST_* only).
1702+
1703+
Live = cast('Enum', RunMode.LIVE) # type: graphene.Enum
1704+
Dummy = cast('Enum', RunMode.DUMMY) # type: graphene.Enum
1705+
Simulation = cast('Enum', RunMode.SIMULATION) # type: graphene.Enum
1706+
1707+
@property
1708+
def description(self):
1709+
return RunMode(self.value).describe()
1710+
1711+
16971712
class WorkflowStopMode(graphene.Enum):
16981713
"""The mode used to stop a running workflow."""
16991714

@@ -1984,7 +1999,7 @@ class Meta:
19841999
submitted immediately if the workflow is restarted.
19852000
Remaining task event handlers, job poll and kill commands, will
19862001
be executed prior to shutdown, unless
1987-
the stop mode is `{WorkflowStopMode.Now.name}`.
2002+
the stop mode is `{WorkflowStopMode.Now}`.
19882003
19892004
Valid for: paused, running, stopping workflows.
19902005
''')
@@ -2509,6 +2524,27 @@ class Meta:
25092524
)
25102525

25112526

2527+
# TODO: Change to use subscribe arg/default graphql-core has a subscribe field
2528+
# for both Meta and Field, graphene at v3.4.3 does not.. As a workaround
2529+
# the subscribe function is looked up via the following mapping:
2530+
SUB_RESOLVER_MAPPING = {
2531+
'deltas': delta_subs,
2532+
'workflows': delta_subs,
2533+
'job': delta_subs,
2534+
'jobs': delta_subs,
2535+
'task': delta_subs,
2536+
'tasks': delta_subs,
2537+
'taskProxy': delta_subs,
2538+
'taskProxies': delta_subs,
2539+
'family': delta_subs,
2540+
'families': delta_subs,
2541+
'familyProxy': delta_subs,
2542+
'familyProxies': delta_subs,
2543+
'edges': delta_subs,
2544+
'nodesEdges': delta_subs,
2545+
}
2546+
2547+
25122548
class Subscriptions(ObjectType):
25132549
"""Defines the subscriptions available in the schema."""
25142550
class Meta:
@@ -2523,7 +2559,7 @@ class Meta:
25232559
strip_null=Boolean(default_value=False),
25242560
initial_burst=Boolean(default_value=True),
25252561
ignore_interval=Float(default_value=0.0),
2526-
resolver=delta_subs
2562+
resolver=identity_resolve
25272563
)
25282564
workflows = graphene.List(
25292565
Workflow,
@@ -2536,7 +2572,7 @@ class Meta:
25362572
delta_type=String(default_value=DELTA_ADDED),
25372573
initial_burst=Boolean(default_value=True),
25382574
ignore_interval=Float(default_value=2.5),
2539-
resolver=delta_subs
2575+
resolver=identity_resolve
25402576
)
25412577
job = Field(
25422578
Job,
@@ -2547,7 +2583,7 @@ class Meta:
25472583
delta_type=String(default_value=DELTA_ADDED),
25482584
initial_burst=Boolean(default_value=True),
25492585
ignore_interval=Float(default_value=0.0),
2550-
resolver=delta_subs
2586+
resolver=identity_resolve
25512587
)
25522588
jobs = graphene.List(
25532589
Job,
@@ -2558,7 +2594,7 @@ class Meta:
25582594
delta_type=String(default_value=DELTA_ADDED),
25592595
initial_burst=Boolean(default_value=True),
25602596
ignore_interval=Float(default_value=0.0),
2561-
resolver=delta_subs
2597+
resolver=identity_resolve
25622598
)
25632599
task = Field(
25642600
Task,
@@ -2569,7 +2605,7 @@ class Meta:
25692605
delta_type=String(default_value=DELTA_ADDED),
25702606
initial_burst=Boolean(default_value=True),
25712607
ignore_interval=Float(default_value=0.0),
2572-
resolver=delta_subs
2608+
resolver=identity_resolve
25732609
)
25742610
tasks = graphene.List(
25752611
Task,
@@ -2580,7 +2616,7 @@ class Meta:
25802616
delta_type=String(default_value=DELTA_ADDED),
25812617
initial_burst=Boolean(default_value=True),
25822618
ignore_interval=Float(default_value=0.0),
2583-
resolver=delta_subs
2619+
resolver=identity_resolve
25842620
)
25852621
task_proxy = Field(
25862622
TaskProxy,
@@ -2591,7 +2627,7 @@ class Meta:
25912627
delta_type=String(default_value=DELTA_ADDED),
25922628
initial_burst=Boolean(default_value=True),
25932629
ignore_interval=Float(default_value=0.0),
2594-
resolver=delta_subs
2630+
resolver=identity_resolve
25952631
)
25962632
task_proxies = graphene.List(
25972633
TaskProxy,
@@ -2602,7 +2638,7 @@ class Meta:
26022638
delta_type=String(default_value=DELTA_ADDED),
26032639
initial_burst=Boolean(default_value=True),
26042640
ignore_interval=Float(default_value=0.0),
2605-
resolver=delta_subs
2641+
resolver=identity_resolve
26062642
)
26072643
family = Field(
26082644
Family,
@@ -2613,7 +2649,7 @@ class Meta:
26132649
delta_type=String(default_value=DELTA_ADDED),
26142650
initial_burst=Boolean(default_value=True),
26152651
ignore_interval=Float(default_value=0.0),
2616-
resolver=delta_subs
2652+
resolver=identity_resolve
26172653
)
26182654
families = graphene.List(
26192655
Family,
@@ -2624,7 +2660,7 @@ class Meta:
26242660
delta_type=String(default_value=DELTA_ADDED),
26252661
initial_burst=Boolean(default_value=True),
26262662
ignore_interval=Float(default_value=0.0),
2627-
resolver=delta_subs
2663+
resolver=identity_resolve
26282664
)
26292665
family_proxy = Field(
26302666
FamilyProxy,
@@ -2635,7 +2671,7 @@ class Meta:
26352671
delta_type=String(default_value=DELTA_ADDED),
26362672
initial_burst=Boolean(default_value=True),
26372673
ignore_interval=Float(default_value=0.0),
2638-
resolver=delta_subs
2674+
resolver=identity_resolve
26392675
)
26402676
family_proxies = graphene.List(
26412677
FamilyProxy,
@@ -2646,7 +2682,7 @@ class Meta:
26462682
delta_type=String(default_value=DELTA_ADDED),
26472683
initial_burst=Boolean(default_value=True),
26482684
ignore_interval=Float(default_value=0.0),
2649-
resolver=delta_subs
2685+
resolver=identity_resolve
26502686
)
26512687
edges = graphene.List(
26522688
Edge,
@@ -2657,7 +2693,7 @@ class Meta:
26572693
delta_type=String(default_value=DELTA_ADDED),
26582694
initial_burst=Boolean(default_value=True),
26592695
ignore_interval=Float(default_value=0.0),
2660-
resolver=delta_subs
2696+
resolver=identity_resolve
26612697
)
26622698
nodes_edges = Field(
26632699
NodesEdges,
@@ -2668,7 +2704,7 @@ class Meta:
26682704
delta_type=String(default_value=DELTA_ADDED),
26692705
initial_burst=Boolean(default_value=True),
26702706
ignore_interval=Float(default_value=0.0),
2671-
resolver=delta_subs
2707+
resolver=identity_resolve
26722708
)
26732709

26742710

cylc/flow/network/server.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@
4545
from cylc.flow.network.graphql import (
4646
CylcExecutionContext,
4747
IgnoreFieldMiddleware,
48-
instantiate_middleware
48+
execution_result_to_dict,
49+
instantiate_middleware,
4950
)
5051
from cylc.flow.network.publisher import WorkflowPublisher
5152
from cylc.flow.network.replier import WorkflowReplier
@@ -414,14 +415,15 @@ def graphql(
414415
execution_context_class=CylcExecutionContext,
415416
)
416417
if is_awaitable(executed):
417-
result = self.loop.run_until_complete(executed)
418-
if result.errors:
419-
for error in result.errors:
418+
executed = self.loop.run_until_complete(executed)
419+
result = execution_result_to_dict(executed)
420+
if result.get('errors'):
421+
# If there are execution errors log and return the errors,
422+
# don't raise them and end the server over a bad query.
423+
for error in result['errors']:
420424
LOG.warning(f"GraphQL: {error}")
421-
# If there are execution errors, it means there was an unexpected
422-
# error, so fail the command.
423-
raise Exception(*result.errors)
424-
return result.data
425+
return result['errors']
426+
return result.get('data')
425427

426428
# UIServer Data Commands
427429
@expose

0 commit comments

Comments
 (0)