Skip to content

Commit e92be34

Browse files
committed
WIP: nodes and edges
1 parent 61e39cd commit e92be34

3 files changed

Lines changed: 243 additions & 19 deletions

File tree

src/lsst/cmservice/routers/v2/nodes.py

Lines changed: 114 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
"""
66

77
from collections.abc import Sequence
8-
from typing import Annotated
8+
from typing import TYPE_CHECKING, Annotated
99
from uuid import UUID, uuid5
1010

1111
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
1212
from sqlmodel import col, select
1313
from sqlmodel.ext.asyncio.session import AsyncSession
1414

1515
from ...common.enums import ManifestKind
16+
from ...common.jsonpatch import JSONPatch, JSONPatchError, apply_json_patch
1617
from ...common.logging import LOGGER
1718
from ...db.campaigns_v2 import Campaign, Node
1819
from ...db.manifests_v2 import ManifestWrapper
@@ -33,18 +34,31 @@
3334
"/",
3435
summary="Get a list of nodes",
3536
)
36-
async def read_node_collection(
37+
async def read_nodes_collection(
3738
request: Request,
3839
response: Response,
3940
session: Annotated[AsyncSession, Depends(db_session_dependency)],
4041
offset: Annotated[int, Query(ge=0)] = 0,
4142
limit: Annotated[int, Query(le=100)] = 10,
4243
) -> Sequence[Node]:
43-
"""Gets all nodes"""
44-
# TODO add paginated links to response header
45-
response.headers["Link"] = ""
44+
"""Fetches and returns all nodes known to the service.
45+
46+
Notes
47+
-----
48+
For campaign-scoped nodes, one should use the /campaigns/{}/nodes route.
49+
"""
4650
try:
4751
nodes = await session.exec(select(Node).offset(offset).limit(limit))
52+
response.headers["Next"] = (
53+
request.url_for("read_nodes_collection")
54+
.include_query_params(offset=(offset + limit), limit=limit)
55+
.__str__()
56+
)
57+
response.headers["Previous"] = (
58+
request.url_for("read_nodes_collection")
59+
.include_query_params(offset=(offset - limit), limit=limit)
60+
.__str__()
61+
)
4862
return nodes.all()
4963
except Exception as msg:
5064
logger.exception()
@@ -72,10 +86,14 @@ async def read_node_resource(
7286
except ValueError:
7387
s = s.where(Node.name == node_name)
7488

75-
campaign = (await session.exec(s)).one_or_none()
76-
if campaign is None:
89+
node = (await session.exec(s)).one_or_none()
90+
if node is None:
7791
raise HTTPException(status_code=404)
78-
return campaign
92+
response.headers["Self"] = request.url_for("read_node_resource", node_name=node.id).__str__()
93+
response.headers["Campaign"] = request.url_for(
94+
"read_campaign_resource", campaign_name=node.namespace
95+
).__str__()
96+
return node
7997

8098

8199
@router.post(
@@ -88,26 +106,19 @@ async def create_node_resource(
88106
manifest: ManifestWrapper,
89107
session: Annotated[AsyncSession, Depends(db_session_dependency)],
90108
) -> Node:
91-
# TODO should support query parameters that scope the namespace, such that
92-
# response headers from a campaign-create operation can immediately
93-
# follow a link to node-create for that campaign.
94-
95109
# Validate the input by checking the "kind" of manifest is a node
96110
if manifest.kind is not ManifestKind.node:
97111
raise HTTPException(status_code=422, detail="Nodes may only be created from a 'node' manifest")
98112
# and that the manifest includes any required fields, though this could
99113
# just as well be a try/except ValueError around `_.model_validate()`
100-
elif (node_name := manifest.metadata_.pop("name")) is None:
114+
elif (node_name := manifest.metadata_.pop("name", None)) is None:
101115
raise HTTPException(status_code=400, detail="Nodes must have a name set in '.metadata.name'")
102116

103-
# A node's spec must be a valid node spec
104-
# TODO match node with jsonschema and validate
105-
106117
# A node must exist in the namespace of an existing campaign
107-
node_namespace: str = manifest.metadata_.pop("namespace")
118+
node_namespace: str = manifest.metadata_.pop("namespace", None)
108119
if node_namespace is None:
109120
raise HTTPException(
110-
status_code=400, detail="Nodes must have a namespace set in '.metadata.namespace'"
121+
status_code=400, detail="Nodes must have a campaign namespace set in '.metadata.namespace'"
111122
)
112123

113124
try:
@@ -150,4 +161,89 @@ async def create_node_resource(
150161
session.add(node)
151162
await session.commit()
152163
await session.refresh(node)
164+
response.headers["Self"] = request.url_for("read_node_resource", node_name=node.id).__str__()
165+
response.headers["Campaign"] = request.url_for(
166+
"read_campaign_resource", campaign_name=node.namespace
167+
).__str__()
153168
return node
169+
170+
171+
@router.patch(
172+
"/{node_name_or_id}",
173+
summary="Update node detail",
174+
status_code=202,
175+
)
176+
async def update_node_resource(
177+
request: Request,
178+
response: Response,
179+
session: Annotated[AsyncSession, Depends(db_session_dependency)],
180+
node_name_or_id: str,
181+
patch_data: Sequence[JSONPatch],
182+
) -> Node:
183+
"""Partial update method for nodes.
184+
185+
A Nodes's spec or metadata may be updated with this PATCH operation. All
186+
updates to a Node creates a new version of the Node instead of
187+
updating an existing record in-place. This preserves history and keeps
188+
previous node versions available.
189+
190+
A Node's name, id, kind, or namespace may not be modified by this
191+
method, and attempts to do so will produce a 4XX client error.
192+
193+
This PATCH endpoint supports RFC6902 json-patch and RFCrequests.
194+
195+
Notes
196+
-----
197+
- This API always targets the latest version of a manifest when applying
198+
a patch. This requires and maintains a "linear" sequence of versions;
199+
it is not permissible to "patch" a previous version and create a "tree"-
200+
like history of manifests. For exmaple, every manifest may be diffed
201+
against any previous version without having to consider branches.
202+
"""
203+
use_rfc6902 = False
204+
if request.headers["Content-Type"] == "application/json-patch+json":
205+
use_rfc6902 = True
206+
else:
207+
raise HTTPException(status_code=406, detail="Unsupported Content-Type")
208+
209+
if TYPE_CHECKING:
210+
assert use_rfc6902
211+
212+
s = select(Node)
213+
# The input could be a UUID or it could be a literal name.
214+
try:
215+
if _id := UUID(node_name_or_id):
216+
s = s.where(Node.id == _id)
217+
except ValueError:
218+
s = s.where(Node.name == node_name_or_id)
219+
220+
# we want to order and sort by version, in descending order, so we always
221+
# fetch only the most recent version of manifest
222+
# FIXME this implies that when a node ID is provided, it should be an
223+
# error if it is not the most recent version.
224+
s = s.order_by(col(Node.version).desc()).limit(1)
225+
226+
old_manifest = (await session.exec(s)).one_or_none()
227+
if old_manifest is None:
228+
raise HTTPException(status_code=404, detail="No such node")
229+
230+
new_manifest = old_manifest.model_dump(by_alias=True)
231+
new_manifest["version"] += 1
232+
new_manifest["id"] = uuid5(new_manifest["namespace"], f"{new_manifest['name']}.{new_manifest['version']}")
233+
234+
for patch in patch_data:
235+
try:
236+
apply_json_patch(patch, new_manifest)
237+
except JSONPatchError as e:
238+
raise HTTPException(
239+
status_code=422,
240+
detail=f"Unable to process one or more patch operations: {e}",
241+
)
242+
243+
# create Manifest from new_manifest, add to session, and commit
244+
new_manifest_db = Node.model_validate(new_manifest)
245+
session.add(new_manifest_db)
246+
await session.commit()
247+
248+
# TODO response headers
249+
return new_manifest_db

tests/v2/test_edge_routes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Tests v2 fastapi manifest routes"""
1+
"""Tests v2 fastapi edge routes"""
22

33
from uuid import uuid4
44

tests/v2/test_node_routes.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
"""Tests v2 fastapi node routes"""
2+
3+
from uuid import uuid4
4+
5+
import pytest
6+
from httpx import AsyncClient
7+
8+
pytestmark = pytest.mark.asyncio(loop_scope="module")
9+
"""All tests in this module will run in the same event loop."""
10+
11+
12+
async def test_list_delete_no_nodes(aclient: AsyncClient) -> None:
13+
"""Tests listing nodes when there are no nodes available."""
14+
x = await aclient.get("/cm-service/v2/nodes")
15+
assert x.is_success
16+
assert len(x.json()) == 0
17+
18+
x = await aclient.get(f"/cm-service/v2/nodes/{uuid4()}")
19+
assert x.status_code == 404
20+
21+
# there is no delete api for nodes
22+
x = await aclient.delete(f"/cm-service/v2/nodes/{uuid4()}")
23+
assert x.status_code == 405
24+
25+
26+
async def test_node_negative(aclient: AsyncClient) -> None:
27+
"""Tests node route negative outcomes."""
28+
29+
# negative test: failing request model validation
30+
x = await aclient.post(
31+
"cm-service/v2/nodes",
32+
json={
33+
"kind": "no_such_kind",
34+
},
35+
)
36+
assert x.is_client_error
37+
38+
# negative test: using wrong manifest kind
39+
x = await aclient.post(
40+
"cm-service/v2/nodes",
41+
json={
42+
"kind": "other",
43+
},
44+
)
45+
assert x.is_client_error
46+
47+
# negative test: missing node name
48+
x = await aclient.post(
49+
"cm-service/v2/nodes",
50+
json={
51+
"kind": "node",
52+
"metadata": {},
53+
"spec": {},
54+
},
55+
)
56+
assert x.is_client_error
57+
58+
# negative test: missing campaign namespace
59+
x = await aclient.post(
60+
"cm-service/v2/nodes",
61+
json={
62+
"kind": "node",
63+
"metadata": {"name": uuid4().hex[8:]},
64+
"spec": {},
65+
},
66+
)
67+
assert x.is_client_error
68+
assert "campaign namespace" in x.text
69+
70+
71+
async def test_node_lifecycle(aclient: AsyncClient) -> None:
72+
"""Tests node lifecycle."""
73+
campaign_name = uuid4().hex[:8]
74+
75+
# Create a campaign for edges. Campaigns come with START and END nodes.
76+
x = await aclient.post(
77+
"/cm-service/v2/campaigns",
78+
json={
79+
"kind": "campaign",
80+
"metadata": {"name": campaign_name},
81+
},
82+
)
83+
assert x.is_success
84+
campaign_id = x.json()["id"]
85+
86+
# Create a new campaign node
87+
x = await aclient.post(
88+
"/cm-service/v2/nodes",
89+
json={
90+
"kind": "node",
91+
"metadata": {"name": uuid4().hex[8:], "namespace": campaign_id},
92+
"spec": {
93+
"handler": "lsst.cmservice.handlers.element_handler.ElementHandler",
94+
"pipeline_yaml": "${DRP_PIPE_DIR}/pipelines/HSC/DRP-RC2.yaml#step1",
95+
"query": [{"instrument": "HSC"}, {"skymap": "hsc_rings_v1"}],
96+
"split": {
97+
"dataset": "raw",
98+
"field": "exposure",
99+
"method": "query",
100+
"minGroups": 3,
101+
"maxInFlight": 3,
102+
},
103+
},
104+
},
105+
)
106+
assert x.is_success
107+
node = x.json()
108+
assert node["version"] == 1
109+
node_url = x.headers["Self"]
110+
111+
# Edit a Node using RFC6902 json-patch
112+
x = await aclient.patch(
113+
node_url,
114+
headers={"Content-Type": "application/json-patch+json"},
115+
json=[
116+
{"op": "replace", "path": "/configuration/split/maxInFlight", "value": 4},
117+
{
118+
"op": "add",
119+
"path": "/configuration/query/-",
120+
"value": {"dimension": "fact"},
121+
},
122+
],
123+
)
124+
assert x.is_success
125+
node = x.json()
126+
127+
# Edit a Node using RFCxxxx json-merge
128+
...

0 commit comments

Comments
 (0)