Skip to content

Commit 42a61e2

Browse files
authored
Merge pull request #185 from lsst-dm/tickets/DM-51505/v2_nodes_edges
DM-51505 : Implement v2 Node and Edge routes
2 parents bcc2a96 + 99b7b13 commit 42a61e2

14 files changed

Lines changed: 1366 additions & 86 deletions

File tree

src/lsst/cmservice/common/graph.py

Lines changed: 155 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
from collections.abc import Mapping, Sequence
1+
from collections.abc import Iterable, Mapping, MutableSet, Sequence
2+
from typing import Literal
23

34
import networkx as nx
5+
from sqlalchemy import select
46

57
from ..db import Script, ScriptDependency, Step, StepDependency
8+
from ..db.campaigns_v2 import Edge, Node
69
from ..parsing.string import parse_element_fullname
710
from .types import AnyAsyncSession
811

@@ -35,8 +38,159 @@ async def graph_from_edge_list(
3538
return g
3639

3740

41+
async def graph_from_edge_list_v2(
42+
edges: Sequence[Edge],
43+
session: AnyAsyncSession,
44+
node_type: type[Node] = Node,
45+
node_view: Literal["simple", "model"] = "model",
46+
) -> nx.DiGraph:
47+
"""Given a sequence of Edges, create a directed graph for these
48+
edges with nodes derived from database lookups of the related objects.
49+
50+
Parameters
51+
----------
52+
edges: Sequence[Edge]
53+
The list of edges forming the graph
54+
55+
node_type: type
56+
The pydantic or sqlmodel class representing the graph node model
57+
58+
node_view: "simple" or "model"
59+
Whether the node metadata in the graph should be simplified (dict) or
60+
using the full expunged model form.
61+
62+
session
63+
An async database session
64+
"""
65+
g = nx.DiGraph()
66+
g.add_edges_from([(e.source, e.target) for e in edges])
67+
relabel_mapping = {}
68+
69+
# The graph understands the nodes in terms of the IDs used in the edges,
70+
# but we want to hydrate the entire Node model for subsequent users of this
71+
# graph to reference without dipping back to the Database.
72+
for node in g.nodes:
73+
s = select(Node).where(Node.id == node)
74+
db_node: Node = (await session.execute(s)).scalars().one()
75+
76+
# This Node is going on an adventure where it does not need to drag its
77+
# SQLAlchemy baggage along, so we expunge it from the session before
78+
# adding it to the graph.
79+
session.expunge(db_node)
80+
if node_view == "simple":
81+
# for the simple node view, the goal is to minimize the amount of
82+
# data attached to the node and ensure that this data is json-
83+
# serializable and otherwise appropriate for an API response
84+
g.nodes[node]["id"] = str(db_node.id)
85+
g.nodes[node]["status"] = db_node.status.name
86+
g.nodes[node]["kind"] = db_node.kind.name
87+
relabel_mapping[node] = db_node.name
88+
else:
89+
g.nodes[node]["model"] = db_node
90+
91+
if relabel_mapping:
92+
g = nx.relabel_nodes(g, mapping=relabel_mapping, copy=False)
93+
94+
# TODO validate graph now raise exception, or leave it to the caller?
95+
return g
96+
97+
3898
def graph_to_dict(g: nx.DiGraph) -> Mapping:
3999
"""Renders a networkx directed graph to a mapping format suitable for JSON
40100
serialization.
101+
102+
Notes
103+
-----
104+
The "edges" attribute name in the node link data is "edges" instead of the
105+
default "links".
41106
"""
42107
return nx.node_link_data(g, edges="edges")
108+
109+
110+
def validate_graph(g: nx.DiGraph, source: str = "START", sink: str = "END") -> bool:
111+
"""Validates a graph by asserting by traversal that a complete and correct
112+
path exists between `source` and `sink` nodes.
113+
114+
"Correct" means that there are no cycles or isolate nodes (nodes with
115+
degree 0) and no nodes with degree 1.
116+
"""
117+
try:
118+
# Test that G is a directed graph with no cycles
119+
is_valid = nx.is_directed_acyclic_graph(g)
120+
assert is_valid
121+
122+
# And that any path from source to sink exists
123+
is_valid = nx.has_path(g, source, sink)
124+
assert is_valid
125+
126+
# Guard against bad graphs where START and/or END have been connected
127+
# such that they are no longer the only source and sink
128+
...
129+
130+
# Test that there are no isolated Nodes in the graph. A node becomes
131+
# isolated if it was involved with an edge that has been removed from
132+
# G with no replacement edge added, in which case the node should also
133+
# be removed.
134+
is_valid = nx.number_of_isolates(g) == 0
135+
assert is_valid
136+
137+
# TODO Given the set of nodes in the graph, consider all paths in G
138+
# from source to sink, making sure every node appears in a path?
139+
140+
# Every node in G that is not the START/END node must have a degree
141+
# of at least 2 (one inbound and one outbound edge). If G has any
142+
# node with a degree of 1, it cannot be considered valid.
143+
g_degree_view: Iterable = nx.degree(g, (n for n in g.nodes if n not in [source, sink]))
144+
is_valid = min([d[1] for d in g_degree_view]) > 1
145+
assert is_valid
146+
except (nx.exception.NodeNotFound, AssertionError):
147+
return False
148+
return True
149+
150+
151+
def processable_graph_nodes(g: nx.DiGraph) -> Iterable[Node]:
152+
"""Traverse the graph G and produce an iterator of any nodes that are
153+
candidates for processing, i.e., their status is waiting/prepared/running
154+
and their ancestors are complete/successful. Graph nodes in a failed state
155+
will block the graph and prevent candidacy for subsequent nodes.
156+
157+
Yields
158+
------
159+
`lsst.cmservice.db.campaigns_v2.Node`
160+
A Node ORM object that has been ``expunge``d from its ``Session``.
161+
162+
Notes
163+
-----
164+
This function operates only on valid graphs (see `validate_graph()`) that
165+
have been built by the `graph_from_edge_list_v2()` function, where each
166+
graph-node is decorated with a "model" attribute referring to an expunged
167+
instance of ``Node``. This ``Node`` can be ``add``ed back to a ``Session``
168+
and manipulated in the usual way.
169+
"""
170+
processable_nodes: MutableSet[Node] = set()
171+
172+
# A valid campaign graph will have only one source (START) with in_degree 0
173+
# and only one sink (END) with out_degree 0
174+
source = next(v for v, d in g.in_degree() if d == 0)
175+
sink = next(v for v, d in g.out_degree() if d == 0)
176+
177+
# For each path through the graph, evaluate the state of nodes to determine
178+
# which nodes are up for processing. When there are multiple paths, we have
179+
# parallelization and common ancestors may be evaluated more than once,
180+
# which is an exercise in optimization left as a TODO
181+
for path in nx.all_simple_paths(g, source, sink):
182+
for n in path:
183+
node: Node = g.nodes[n]["model"]
184+
if node.status.is_processable_element():
185+
processable_nodes.add(node)
186+
# We found a processable node in this path, stop traversal
187+
break
188+
elif node.status.is_bad():
189+
# We reached a failed node in this path, it is blocked
190+
break
191+
else:
192+
# This node must be in a "successful" terminal state
193+
continue
194+
195+
# the inspection should stop when there are no more nodes to check
196+
yield from processable_nodes

src/lsst/cmservice/common/jsonpatch.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44

55
import operator
6-
from collections.abc import MutableMapping, MutableSequence
6+
from collections.abc import Mapping, MutableMapping, MutableSequence
77
from functools import reduce
88
from typing import TYPE_CHECKING, Any, Literal
99

@@ -51,7 +51,7 @@ def apply_json_patch[T: MutableMapping](op: JSONPatch, o: T) -> T:
5151
numeric, e.g., {"1": "first", "2": "second"}
5252
- Unsupported: JSON pointer values that refer to an entire object, e.g.,
5353
"" -- the JSON Patch must have a root element ("/") per the model.
54-
- Unsupported: JSON pointer values taht refer to a nameless object, e.g.,
54+
- Unsupported: JSON pointer values that refer to a nameless object, e.g.,
5555
"/" -- JSON allows object keys to be the empty string ("") but this is
5656
disallowed by the application.
5757
"""
@@ -222,3 +222,31 @@ def apply_json_patch[T: MutableMapping](op: JSONPatch, o: T) -> T:
222222
raise JSONPatchError(f"Unknown JSON Patch operation: {op.op}")
223223

224224
return o
225+
226+
227+
def apply_json_merge[T: MutableMapping](patch: Any, o: T) -> T:
228+
"""Applies a patch to a mapping object as per the RFC7396 JSON Merge Patch.
229+
230+
Notably, this operation may only target a ``MutableMapping`` as an analogue
231+
of a JSON object. This means that any keyed value in a Mapping may be
232+
replaced, added, or removed by a JSON Merge. This is not appropriate for
233+
patches that need to perform more tactical updates, such as modifying
234+
elements of a ``Sequence``.
235+
236+
This function does not allow setting a field value in the target to `None`;
237+
instead, any `None` value in a patch is an instruction to remove that
238+
field from the target completely.
239+
240+
This function differs from the RFC in the following ways: it will not
241+
replace the entire target object with a new mapping (i.e., the target must
242+
be a Mapping).
243+
"""
244+
if isinstance(patch, Mapping):
245+
for k, v in patch.items():
246+
if v is None:
247+
_ = o.pop(k, None)
248+
else:
249+
o[k] = apply_json_merge(v, o.get(k, {}))
250+
return o
251+
else:
252+
return patch

src/lsst/cmservice/db/campaigns_v2.py

Lines changed: 24 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
"""ORM Models for v2 tables and objects."""
2+
13
from datetime import datetime
24
from typing import Any
35
from uuid import NAMESPACE_DNS, UUID, uuid5
@@ -43,15 +45,6 @@ def jsonb_column(name: str, aliases: list[str] | None = None) -> Any:
4345
)
4446

4547

46-
# NOTES
47-
# - model validation is not triggered when table=True
48-
# - Every object model needs to have three flavors:
49-
# 1. the declarative model of the object's database table
50-
# 2. the model of the manifest when creating a new object
51-
# 3. the model of the manifest when updating an object
52-
# 4. a response model for APIs related to the object
53-
54-
5548
class BaseSQLModel(SQLModel):
5649
__table_args__ = {"schema": config.db.table_schema}
5750
metadata = metadata
@@ -71,10 +64,6 @@ class CampaignBase(BaseSQLModel):
7164
metadata_: dict = jsonb_column("metadata", aliases=["metadata", "metadata_"])
7265
configuration: dict = jsonb_column("configuration", aliases=["configuration", "data", "spec"])
7366

74-
75-
class CampaignModel(CampaignBase):
76-
"""model used for resource creation."""
77-
7867
@model_validator(mode="before")
7968
@classmethod
8069
def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any:
@@ -83,15 +72,15 @@ def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any:
8372
"""
8473
if isinstance(data, dict):
8574
if "name" not in data:
86-
raise ValueError("'name' must be specified.")
75+
raise ValueError("<campaign> name missing.")
8776
if "namespace" not in data:
8877
data["namespace"] = _default_campaign_namespace
8978
if "id" not in data:
9079
data["id"] = uuid5(namespace=data["namespace"], name=data["name"])
9180
return data
9281

9382

94-
class Campaign(CampaignModel, table=True):
83+
class Campaign(CampaignBase, table=True):
9584
"""Model used for database operations involving campaigns_v2 table rows"""
9685

9786
__tablename__: str = "campaigns_v2" # type: ignore[misc]
@@ -111,6 +100,12 @@ class CampaignUpdate(BaseSQLModel):
111100
class NodeBase(BaseSQLModel):
112101
"""nodes_v2 db table"""
113102

103+
def __hash__(self) -> int:
104+
"""A Node is hashable according to its unique ID, so it can be used in
105+
sets and other places hashable types are required.
106+
"""
107+
return self.id.int
108+
114109
id: UUID = Field(primary_key=True)
115110
name: str
116111
namespace: UUID
@@ -119,33 +114,32 @@ class NodeBase(BaseSQLModel):
119114
default=ManifestKind.other,
120115
sa_column=Column("kind", Enum(ManifestKind, length=20, native_enum=False, create_constraint=False)),
121116
)
122-
status: StatusField | None = Field(
117+
status: StatusField = Field(
123118
default=StatusEnum.waiting,
124119
sa_column=Column("status", Enum(StatusEnum, length=20, native_enum=False, create_constraint=False)),
125120
)
126121
metadata_: dict = jsonb_column("metadata", aliases=["metadata", "metadata_"])
127122
configuration: dict = jsonb_column("configuration", aliases=["configuration", "data", "spec"])
128123

129-
130-
class NodeModel(NodeBase):
131-
"""model validating class for Nodes"""
132-
133124
@model_validator(mode="before")
134125
@classmethod
135126
def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any:
127+
"""Validates the model based on different types of raw inputs,
128+
where some default non-optional fields can be auto-populated.
129+
"""
136130
if isinstance(data, dict):
137-
if "version" not in data:
138-
data["version"] = 1
139-
if "name" not in data:
140-
raise ValueError("'name' must be specified.")
141-
if "namespace" not in data:
142-
data["namespace"] = _default_campaign_namespace
131+
if (node_name := data.get("name")) is None:
132+
raise ValueError("<node> name missing.")
133+
if (node_namespace := data.get("namespace")) is None:
134+
raise ValueError("<node> namespace missing.")
135+
if (node_version := data.get("version")) is None:
136+
data["version"] = node_version = 1
143137
if "id" not in data:
144-
data["id"] = uuid5(namespace=data["namespace"], name=f"""{data["name"]}.{data["version"]}""")
138+
data["id"] = uuid5(namespace=node_namespace, name=f"{node_name}.{node_version}")
145139
return data
146140

147141

148-
class Node(NodeModel, table=True):
142+
class Node(NodeBase, table=True):
149143
__tablename__: str = "nodes_v2" # type: ignore[misc]
150144

151145
machine: UUID | None = Field(foreign_key="machines_v2.id", default=None, ondelete="CASCADE")
@@ -163,28 +157,12 @@ class EdgeBase(BaseSQLModel):
163157
configuration: dict = jsonb_column("configuration", aliases=["configuration", "data", "spec"])
164158

165159

166-
class EdgeModel(EdgeBase):
167-
"""model validating class for Edges"""
168-
169-
@model_validator(mode="before")
170-
@classmethod
171-
def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any:
172-
if isinstance(data, dict):
173-
if "name" not in data:
174-
raise ValueError("'name' must be specified.")
175-
if "namespace" not in data:
176-
raise ValueError("Edges may only exist in a 'namespace'.")
177-
if "id" not in data:
178-
data["id"] = uuid5(namespace=data["namespace"], name=data["name"])
179-
return data
180-
181-
182-
class EdgeResponseModel(EdgeModel):
160+
class EdgeResponseModel(EdgeBase):
183161
source: Any
184162
target: Any
185163

186164

187-
class Edge(EdgeModel, table=True):
165+
class Edge(EdgeBase, table=True):
188166
__tablename__: str = "edges_v2" # type: ignore[misc]
189167

190168

@@ -216,24 +194,6 @@ class ManifestBase(BaseSQLModel):
216194
spec: dict = jsonb_column("spec", aliases=["spec", "configuration", "data"])
217195

218196

219-
class ManifestModel(ManifestBase):
220-
"""model validating class for Manifests"""
221-
222-
@model_validator(mode="before")
223-
@classmethod
224-
def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any:
225-
if isinstance(data, dict):
226-
if "version" not in data:
227-
data["version"] = 1
228-
if "name" not in data:
229-
raise ValueError("'name' must be specified.")
230-
if "namespace" not in data:
231-
data["namespace"] = _default_campaign_namespace
232-
if "id" not in data:
233-
data["id"] = uuid5(namespace=data["namespace"], name=f"""{data["name"]}.{data["version"]}""")
234-
return data
235-
236-
237197
class Manifest(ManifestBase, table=True):
238198
__tablename__: str = "manifests_v2" # type: ignore[misc]
239199

0 commit comments

Comments
 (0)