diff --git a/src/lsst/cmservice/common/graph.py b/src/lsst/cmservice/common/graph.py index f2a1e71e8..184c20917 100644 --- a/src/lsst/cmservice/common/graph.py +++ b/src/lsst/cmservice/common/graph.py @@ -1,8 +1,11 @@ -from collections.abc import Mapping, Sequence +from collections.abc import Iterable, Mapping, MutableSet, Sequence +from typing import Literal import networkx as nx +from sqlalchemy import select from ..db import Script, ScriptDependency, Step, StepDependency +from ..db.campaigns_v2 import Edge, Node from ..parsing.string import parse_element_fullname from .types import AnyAsyncSession @@ -35,8 +38,159 @@ async def graph_from_edge_list( return g +async def graph_from_edge_list_v2( + edges: Sequence[Edge], + session: AnyAsyncSession, + node_type: type[Node] = Node, + node_view: Literal["simple", "model"] = "model", +) -> nx.DiGraph: + """Given a sequence of Edges, create a directed graph for these + edges with nodes derived from database lookups of the related objects. + + Parameters + ---------- + edges: Sequence[Edge] + The list of edges forming the graph + + node_type: type + The pydantic or sqlmodel class representing the graph node model + + node_view: "simple" or "model" + Whether the node metadata in the graph should be simplified (dict) or + using the full expunged model form. + + session + An async database session + """ + g = nx.DiGraph() + g.add_edges_from([(e.source, e.target) for e in edges]) + relabel_mapping = {} + + # The graph understands the nodes in terms of the IDs used in the edges, + # but we want to hydrate the entire Node model for subsequent users of this + # graph to reference without dipping back to the Database. + for node in g.nodes: + s = select(Node).where(Node.id == node) + db_node: Node = (await session.execute(s)).scalars().one() + + # This Node is going on an adventure where it does not need to drag its + # SQLAlchemy baggage along, so we expunge it from the session before + # adding it to the graph. + session.expunge(db_node) + if node_view == "simple": + # for the simple node view, the goal is to minimize the amount of + # data attached to the node and ensure that this data is json- + # serializable and otherwise appropriate for an API response + g.nodes[node]["id"] = str(db_node.id) + g.nodes[node]["status"] = db_node.status.name + g.nodes[node]["kind"] = db_node.kind.name + relabel_mapping[node] = db_node.name + else: + g.nodes[node]["model"] = db_node + + if relabel_mapping: + g = nx.relabel_nodes(g, mapping=relabel_mapping, copy=False) + + # TODO validate graph now raise exception, or leave it to the caller? + return g + + def graph_to_dict(g: nx.DiGraph) -> Mapping: """Renders a networkx directed graph to a mapping format suitable for JSON serialization. + + Notes + ----- + The "edges" attribute name in the node link data is "edges" instead of the + default "links". """ return nx.node_link_data(g, edges="edges") + + +def validate_graph(g: nx.DiGraph, source: str = "START", sink: str = "END") -> bool: + """Validates a graph by asserting by traversal that a complete and correct + path exists between `source` and `sink` nodes. + + "Correct" means that there are no cycles or isolate nodes (nodes with + degree 0) and no nodes with degree 1. + """ + try: + # Test that G is a directed graph with no cycles + is_valid = nx.is_directed_acyclic_graph(g) + assert is_valid + + # And that any path from source to sink exists + is_valid = nx.has_path(g, source, sink) + assert is_valid + + # Guard against bad graphs where START and/or END have been connected + # such that they are no longer the only source and sink + ... + + # Test that there are no isolated Nodes in the graph. A node becomes + # isolated if it was involved with an edge that has been removed from + # G with no replacement edge added, in which case the node should also + # be removed. + is_valid = nx.number_of_isolates(g) == 0 + assert is_valid + + # TODO Given the set of nodes in the graph, consider all paths in G + # from source to sink, making sure every node appears in a path? + + # Every node in G that is not the START/END node must have a degree + # of at least 2 (one inbound and one outbound edge). If G has any + # node with a degree of 1, it cannot be considered valid. + g_degree_view: Iterable = nx.degree(g, (n for n in g.nodes if n not in [source, sink])) + is_valid = min([d[1] for d in g_degree_view]) > 1 + assert is_valid + except (nx.exception.NodeNotFound, AssertionError): + return False + return True + + +def processable_graph_nodes(g: nx.DiGraph) -> Iterable[Node]: + """Traverse the graph G and produce an iterator of any nodes that are + candidates for processing, i.e., their status is waiting/prepared/running + and their ancestors are complete/successful. Graph nodes in a failed state + will block the graph and prevent candidacy for subsequent nodes. + + Yields + ------ + `lsst.cmservice.db.campaigns_v2.Node` + A Node ORM object that has been ``expunge``d from its ``Session``. + + Notes + ----- + This function operates only on valid graphs (see `validate_graph()`) that + have been built by the `graph_from_edge_list_v2()` function, where each + graph-node is decorated with a "model" attribute referring to an expunged + instance of ``Node``. This ``Node`` can be ``add``ed back to a ``Session`` + and manipulated in the usual way. + """ + processable_nodes: MutableSet[Node] = set() + + # A valid campaign graph will have only one source (START) with in_degree 0 + # and only one sink (END) with out_degree 0 + source = next(v for v, d in g.in_degree() if d == 0) + sink = next(v for v, d in g.out_degree() if d == 0) + + # For each path through the graph, evaluate the state of nodes to determine + # which nodes are up for processing. When there are multiple paths, we have + # parallelization and common ancestors may be evaluated more than once, + # which is an exercise in optimization left as a TODO + for path in nx.all_simple_paths(g, source, sink): + for n in path: + node: Node = g.nodes[n]["model"] + if node.status.is_processable_element(): + processable_nodes.add(node) + # We found a processable node in this path, stop traversal + break + elif node.status.is_bad(): + # We reached a failed node in this path, it is blocked + break + else: + # This node must be in a "successful" terminal state + continue + + # the inspection should stop when there are no more nodes to check + yield from processable_nodes diff --git a/src/lsst/cmservice/common/jsonpatch.py b/src/lsst/cmservice/common/jsonpatch.py index b57ce69af..5c0768060 100644 --- a/src/lsst/cmservice/common/jsonpatch.py +++ b/src/lsst/cmservice/common/jsonpatch.py @@ -3,7 +3,7 @@ """ import operator -from collections.abc import MutableMapping, MutableSequence +from collections.abc import Mapping, MutableMapping, MutableSequence from functools import reduce from typing import TYPE_CHECKING, Any, Literal @@ -51,7 +51,7 @@ def apply_json_patch[T: MutableMapping](op: JSONPatch, o: T) -> T: numeric, e.g., {"1": "first", "2": "second"} - Unsupported: JSON pointer values that refer to an entire object, e.g., "" -- the JSON Patch must have a root element ("/") per the model. - - Unsupported: JSON pointer values taht refer to a nameless object, e.g., + - Unsupported: JSON pointer values that refer to a nameless object, e.g., "/" -- JSON allows object keys to be the empty string ("") but this is disallowed by the application. """ @@ -222,3 +222,31 @@ def apply_json_patch[T: MutableMapping](op: JSONPatch, o: T) -> T: raise JSONPatchError(f"Unknown JSON Patch operation: {op.op}") return o + + +def apply_json_merge[T: MutableMapping](patch: Any, o: T) -> T: + """Applies a patch to a mapping object as per the RFC7396 JSON Merge Patch. + + Notably, this operation may only target a ``MutableMapping`` as an analogue + of a JSON object. This means that any keyed value in a Mapping may be + replaced, added, or removed by a JSON Merge. This is not appropriate for + patches that need to perform more tactical updates, such as modifying + elements of a ``Sequence``. + + This function does not allow setting a field value in the target to `None`; + instead, any `None` value in a patch is an instruction to remove that + field from the target completely. + + This function differs from the RFC in the following ways: it will not + replace the entire target object with a new mapping (i.e., the target must + be a Mapping). + """ + if isinstance(patch, Mapping): + for k, v in patch.items(): + if v is None: + _ = o.pop(k, None) + else: + o[k] = apply_json_merge(v, o.get(k, {})) + return o + else: + return patch diff --git a/src/lsst/cmservice/db/campaigns_v2.py b/src/lsst/cmservice/db/campaigns_v2.py index 655fa9112..8350f43aa 100644 --- a/src/lsst/cmservice/db/campaigns_v2.py +++ b/src/lsst/cmservice/db/campaigns_v2.py @@ -1,3 +1,5 @@ +"""ORM Models for v2 tables and objects.""" + from datetime import datetime from typing import Any from uuid import NAMESPACE_DNS, UUID, uuid5 @@ -43,15 +45,6 @@ def jsonb_column(name: str, aliases: list[str] | None = None) -> Any: ) -# NOTES -# - model validation is not triggered when table=True -# - Every object model needs to have three flavors: -# 1. the declarative model of the object's database table -# 2. the model of the manifest when creating a new object -# 3. the model of the manifest when updating an object -# 4. a response model for APIs related to the object - - class BaseSQLModel(SQLModel): __table_args__ = {"schema": config.db.table_schema} metadata = metadata @@ -71,10 +64,6 @@ class CampaignBase(BaseSQLModel): metadata_: dict = jsonb_column("metadata", aliases=["metadata", "metadata_"]) configuration: dict = jsonb_column("configuration", aliases=["configuration", "data", "spec"]) - -class CampaignModel(CampaignBase): - """model used for resource creation.""" - @model_validator(mode="before") @classmethod def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any: @@ -83,7 +72,7 @@ def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any: """ if isinstance(data, dict): if "name" not in data: - raise ValueError("'name' must be specified.") + raise ValueError(" name missing.") if "namespace" not in data: data["namespace"] = _default_campaign_namespace if "id" not in data: @@ -91,7 +80,7 @@ def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any: return data -class Campaign(CampaignModel, table=True): +class Campaign(CampaignBase, table=True): """Model used for database operations involving campaigns_v2 table rows""" __tablename__: str = "campaigns_v2" # type: ignore[misc] @@ -111,6 +100,12 @@ class CampaignUpdate(BaseSQLModel): class NodeBase(BaseSQLModel): """nodes_v2 db table""" + def __hash__(self) -> int: + """A Node is hashable according to its unique ID, so it can be used in + sets and other places hashable types are required. + """ + return self.id.int + id: UUID = Field(primary_key=True) name: str namespace: UUID @@ -119,33 +114,32 @@ class NodeBase(BaseSQLModel): default=ManifestKind.other, sa_column=Column("kind", Enum(ManifestKind, length=20, native_enum=False, create_constraint=False)), ) - status: StatusField | None = Field( + status: StatusField = Field( default=StatusEnum.waiting, sa_column=Column("status", Enum(StatusEnum, length=20, native_enum=False, create_constraint=False)), ) metadata_: dict = jsonb_column("metadata", aliases=["metadata", "metadata_"]) configuration: dict = jsonb_column("configuration", aliases=["configuration", "data", "spec"]) - -class NodeModel(NodeBase): - """model validating class for Nodes""" - @model_validator(mode="before") @classmethod def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any: + """Validates the model based on different types of raw inputs, + where some default non-optional fields can be auto-populated. + """ if isinstance(data, dict): - if "version" not in data: - data["version"] = 1 - if "name" not in data: - raise ValueError("'name' must be specified.") - if "namespace" not in data: - data["namespace"] = _default_campaign_namespace + if (node_name := data.get("name")) is None: + raise ValueError(" name missing.") + if (node_namespace := data.get("namespace")) is None: + raise ValueError(" namespace missing.") + if (node_version := data.get("version")) is None: + data["version"] = node_version = 1 if "id" not in data: - data["id"] = uuid5(namespace=data["namespace"], name=f"""{data["name"]}.{data["version"]}""") + data["id"] = uuid5(namespace=node_namespace, name=f"{node_name}.{node_version}") return data -class Node(NodeModel, table=True): +class Node(NodeBase, table=True): __tablename__: str = "nodes_v2" # type: ignore[misc] machine: UUID | None = Field(foreign_key="machines_v2.id", default=None, ondelete="CASCADE") @@ -163,28 +157,12 @@ class EdgeBase(BaseSQLModel): configuration: dict = jsonb_column("configuration", aliases=["configuration", "data", "spec"]) -class EdgeModel(EdgeBase): - """model validating class for Edges""" - - @model_validator(mode="before") - @classmethod - def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any: - if isinstance(data, dict): - if "name" not in data: - raise ValueError("'name' must be specified.") - if "namespace" not in data: - raise ValueError("Edges may only exist in a 'namespace'.") - if "id" not in data: - data["id"] = uuid5(namespace=data["namespace"], name=data["name"]) - return data - - -class EdgeResponseModel(EdgeModel): +class EdgeResponseModel(EdgeBase): source: Any target: Any -class Edge(EdgeModel, table=True): +class Edge(EdgeBase, table=True): __tablename__: str = "edges_v2" # type: ignore[misc] @@ -216,24 +194,6 @@ class ManifestBase(BaseSQLModel): spec: dict = jsonb_column("spec", aliases=["spec", "configuration", "data"]) -class ManifestModel(ManifestBase): - """model validating class for Manifests""" - - @model_validator(mode="before") - @classmethod - def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any: - if isinstance(data, dict): - if "version" not in data: - data["version"] = 1 - if "name" not in data: - raise ValueError("'name' must be specified.") - if "namespace" not in data: - data["namespace"] = _default_campaign_namespace - if "id" not in data: - data["id"] = uuid5(namespace=data["namespace"], name=f"""{data["name"]}.{data["version"]}""") - return data - - class Manifest(ManifestBase, table=True): __tablename__: str = "manifests_v2" # type: ignore[misc] diff --git a/src/lsst/cmservice/db/manifests_v2.py b/src/lsst/cmservice/db/manifests_v2.py index 0b080ba43..af3cfde28 100644 --- a/src/lsst/cmservice/db/manifests_v2.py +++ b/src/lsst/cmservice/db/manifests_v2.py @@ -5,6 +5,7 @@ """ from typing import Self +from uuid import uuid4 from pydantic import AliasChoices, BaseModel, ConfigDict, Field, ValidationInfo, model_validator @@ -99,3 +100,43 @@ def custom_model_validator(self, info: ValidationInfo) -> Self: raise ValueError("Campaigns may only be created from a manifest") return self + + +class EdgeMetadata(ManifestMetadata): + """Metadata model for an Edge Manifest. + + A default random alphanumeric 8-byte name is generated if no name provided. + """ + + name: str = Field(default_factory=lambda: uuid4().hex[:8]) + + +class EdgeSpec(ManifestSpec): + """Spec model for an Edge Manifest.""" + + source: str + target: str + + +class EdgeManifest(Manifest[EdgeMetadata, EdgeSpec]): + """validating model for Edges""" + + @model_validator(mode="after") + def custom_model_validator(self, info: ValidationInfo) -> Self: + """Validate an Edge Manifest after a model has been created.""" + if self.kind is not ManifestKind.edge: + raise ValueError("Edges may only be created from an manifest") + + return self + + +class NodeManifest(Manifest[VersionedMetadata, ManifestSpec]): + """validating model for Nodes""" + + @model_validator(mode="after") + def custom_model_validator(self, info: ValidationInfo) -> Self: + """Validate a Node Manifest after a model has been created.""" + if self.kind is not ManifestKind.node: + raise ValueError("Nodes may only be created from an manifest") + + return self diff --git a/src/lsst/cmservice/routers/v2/__init__.py b/src/lsst/cmservice/routers/v2/__init__.py index 94d0c1a86..938b69d99 100644 --- a/src/lsst/cmservice/routers/v2/__init__.py +++ b/src/lsst/cmservice/routers/v2/__init__.py @@ -2,7 +2,9 @@ from . import ( campaigns, + edges, manifests, + nodes, ) router = APIRouter( @@ -10,4 +12,6 @@ ) router.include_router(campaigns.router) +router.include_router(edges.router) router.include_router(manifests.router) +router.include_router(nodes.router) diff --git a/src/lsst/cmservice/routers/v2/campaigns.py b/src/lsst/cmservice/routers/v2/campaigns.py index dfcb80339..71ab74966 100644 --- a/src/lsst/cmservice/routers/v2/campaigns.py +++ b/src/lsst/cmservice/routers/v2/campaigns.py @@ -4,7 +4,7 @@ representing campaign objects within CM-Service. """ -from collections.abc import Sequence +from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Annotated from uuid import UUID, uuid5 @@ -13,8 +13,10 @@ from sqlmodel import col, select from sqlmodel.ext.asyncio.session import AsyncSession +from ...common.graph import graph_from_edge_list_v2, graph_to_dict from ...common.logging import LOGGER -from ...db.campaigns_v2 import Campaign, CampaignUpdate, Edge, Node +from ...common.timestamp import element_time +from ...db.campaigns_v2 import Campaign, CampaignUpdate, Edge, Manifest, Node from ...db.manifests_v2 import CampaignManifest from ...db.session import db_session_dependency @@ -40,9 +42,17 @@ async def read_campaign_collection( limit: Annotated[int, Query(le=100)] = 10, offset: Annotated[int, Query()] = 0, ) -> Sequence[Campaign]: - """...""" + """A paginated API returning a list of all Campaigns known to the + application, from newest to oldest. + """ try: - campaigns = await session.exec(select(Campaign).offset(offset).limit(limit)) + statement = ( + select(Campaign) + .order_by(Campaign.metadata_["crtime"].desc().nulls_last()) + .offset(offset) + .limit(limit) + ) + campaigns = await session.exec(statement) response.headers["Next"] = str( request.url_for("read_campaign_collection").include_query_params( @@ -92,6 +102,9 @@ async def read_campaign_resource( response.headers["Edges"] = str( request.url_for("read_campaign_edge_collection", campaign_name=campaign.id) ) + response.headers["Manifests"] = str( + request.url_for("read_campaign_manifest_collection", campaign_name=campaign.id) + ) return campaign else: raise HTTPException(status_code=404) @@ -187,26 +200,66 @@ async def read_campaign_node_collection( limit: Annotated[int, Query(le=100)] = 10, offset: Annotated[int, Query()] = 0, ) -> Sequence[Node]: - # This is a convenience api that could also be `/nodes?campaign=... + """A paginated API returning a list of all Nodes in the namespace of a + single Campaign. + """ # The input could be a campaign UUID or it could be a literal name. # TODO this could just as well be a campaign query with a join to nodes - s = select(Node) + statement = select(Node).order_by(Node.metadata_["crtime"].asc().nulls_last()) + try: if campaign_id := UUID(campaign_name): - s = s.where(Node.namespace == campaign_id) + statement = statement.where(Node.namespace == campaign_id) except ValueError: # FIXME get an id from a name raise HTTPException(status_code=422, detail="campaign_name must be a uuid") - s = s.offset(offset).limit(limit) - nodes = await session.exec(s) + statement = statement.offset(offset).limit(limit) + nodes = await session.exec(statement) response.headers["Next"] = str( request.url_for( "read_campaign_node_collection", - campaign_name=campaign_name, + campaign_name=campaign_id, ).include_query_params(offset=(offset + limit), limit=limit), ) - # TODO Previous + response.headers["Self"] = str(request.url_for("read_campaign_resource", campaign_name=campaign_id)) + return nodes.all() + + +@router.get( + "/{campaign_name}/manifests", + summary="Get campaign Manifests", +) +async def read_campaign_manifest_collection( + request: Request, + response: Response, + session: Annotated[AsyncSession, Depends(db_session_dependency)], + campaign_name: str, + limit: Annotated[int, Query(le=100)] = 10, + offset: Annotated[int, Query()] = 0, +) -> Sequence[Manifest]: + """A paginated API returning a list of all Manifests in the namespace of a + single Campaign. + """ + + # The input could be a campaign UUID or it could be a literal name. + statement = select(Manifest).order_by(Manifest.metadata_["crtime"].asc().nulls_last()) + + try: + if campaign_id := UUID(campaign_name): + statement = statement.where(Manifest.namespace == campaign_id) + except ValueError: + # FIXME get an id from a name + raise HTTPException(status_code=422, detail="campaign_name must be a uuid") + statement = statement.offset(offset).limit(limit) + nodes = await session.exec(statement) + response.headers["Next"] = str( + request.url_for( + "read_campaign_manifest_collection", + campaign_name=campaign_id, + ).include_query_params(offset=(offset + limit), limit=limit), + ) + response.headers["Self"] = str(request.url_for("read_campaign_resource", campaign_name=campaign_id)) return nodes.all() @@ -222,7 +275,10 @@ async def read_campaign_edge_collection( *, resolve_names: bool = False, ) -> Sequence[Edge]: - # This is a convenience api that could also be `/edges?campaign=... + """A paginated API returning a list of all Edges in the namespace of a + single Campaign. This list of Edges can be used to construct the Campaign + graph. + """ # The input could be a campaign UUID or it could be a literal name. # This is why raw SQL is better than ORMs @@ -244,7 +300,7 @@ async def read_campaign_edge_collection( .join_from(Edge, target_nodes, Edge.target == target_nodes.id) ) else: - s = select(Edge) + s = select(Edge).order_by(col(Edge.name).asc().nulls_last()) try: if campaign_id := UUID(campaign_name): s = s.where(Edge.namespace == campaign_id) @@ -252,6 +308,8 @@ async def read_campaign_edge_collection( # FIXME get an id from a name raise HTTPException(status_code=422, detail="campaign_name must be a uuid") edges = await session.exec(s) + + response.headers["Self"] = str(request.url_for("read_campaign_resource", campaign_name=campaign_id)) return edges.all() @@ -301,13 +359,16 @@ async def create_campaign_resource( session: Annotated[AsyncSession, Depends(db_session_dependency)], manifest: CampaignManifest, ) -> Campaign: + """An API to create a Campaign from an appropriate Manifest.""" # Create a campaign spec from the manifest, delegating the creation of new # dynamic fields to the model validation method, -OR- create new dynamic # fields here. + campaign_metadata = manifest.metadata_.model_dump() + campaign_metadata |= {"crtime": element_time()} campaign = Campaign.model_validate( dict( - name=manifest.metadata_.name, - metadata_=manifest.metadata_.model_dump(), + name=campaign_metadata.pop("name"), + metadata_=campaign_metadata, # owner = ... # TODO Get username from gafaelfawr # noqa: ERA001 ) ) @@ -333,3 +394,42 @@ async def create_campaign_resource( ) return campaign + + +@router.get( + "/{campaign_name_or_id}/graph", + status_code=200, + summary="Construct and return a Campaign's graph of nodes", +) +async def read_campaign_graph( + request: Request, + response: Response, + campaign_name_or_id: str, + session: Annotated[AsyncSession, Depends(db_session_dependency)], +) -> Mapping: + """Reads the graph resource for a campaign and returns its JSON represent- + ation as serialized by the ``networkx.node_link_data()` function, i.e, the + "node-link format". + """ + + # The input could be a campaign UUID or it could be a literal name. + campaign_id: UUID | None + try: + campaign_id = UUID(campaign_name_or_id) + except ValueError: + s = select(Campaign.id).where(Campaign.name == campaign_name_or_id) + campaign_id = (await session.exec(s)).one_or_none() + + if campaign_id is None: + raise HTTPException(status_code=404, detail="No such campaign found.") + + # Fetch the Edges for the campaign + statement = select(Edge).filter_by(namespace=campaign_id) + edges = (await session.exec(statement)).all() + + # Organize the edges into a graph. The graph nodes are annotated with their + # current database attributes according to the "simple" node view. + graph = await graph_from_edge_list_v2(edges=edges, node_type=Node, session=session, node_view="simple") + + response.headers["Self"] = str(request.url_for("read_campaign_resource", campaign_name=campaign_id)) + return graph_to_dict(graph) diff --git a/src/lsst/cmservice/routers/v2/edges.py b/src/lsst/cmservice/routers/v2/edges.py new file mode 100644 index 000000000..d96857966 --- /dev/null +++ b/src/lsst/cmservice/routers/v2/edges.py @@ -0,0 +1,211 @@ +"""http routers for managing Edges. + +The /edges endpoint supports a collection resource and single resources +representing edge objects within CM-Service. +""" + +from collections.abc import Sequence +from typing import Annotated +from uuid import UUID, uuid5 + +from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response +from sqlmodel import col, select +from sqlmodel.ext.asyncio.session import AsyncSession + +from ...common.logging import LOGGER +from ...db.campaigns_v2 import Campaign, Edge +from ...db.manifests_v2 import EdgeManifest +from ...db.session import db_session_dependency + +# TODO should probably bind a logger to the fastapi app or something +logger = LOGGER.bind(module=__name__) + + +# Build the router +router = APIRouter( + prefix="/edges", + tags=["edges", "v2"], +) + + +@router.get( + "/", + summary="Get a list of edges", +) +async def read_edges_collection( + request: Request, + response: Response, + session: Annotated[AsyncSession, Depends(db_session_dependency)], + offset: Annotated[int, Query(ge=0)] = 0, + limit: Annotated[int, Query(le=100)] = 10, +) -> Sequence[Edge]: + """Fetches and returns all edges known to the service. + + Notes + ----- + For campaign-scoped edges, one should use the /campaigns/{}/edges route. + """ + try: + statement = select(Edge).order_by(col(Edge.name).desc()).offset(offset).limit(limit) + edges = await session.exec(statement) + response.headers["Next"] = ( + request.url_for("read_edges_collection") + .include_query_params(offset=(offset + limit), limit=limit) + .__str__() + ) + response.headers["Previous"] = ( + request.url_for("read_edges_collection") + .include_query_params(offset=(offset - limit), limit=limit) + .__str__() + ) + return edges.all() + except Exception as msg: + logger.exception() + raise HTTPException(status_code=500, detail=f"{str(msg)}") from msg + + +@router.get( + "/{edge_name}", + summary="Get edge detail", +) +async def read_edge_resource( + request: Request, + response: Response, + edge_name: str, + session: Annotated[AsyncSession, Depends(db_session_dependency)], +) -> Edge: + """Fetch a single edge from the database given the edge name or id. + + The response headers include links to the connected nodes, the associated + campaign, and to the graph with which the edge is associated (i.e., the + collection of all campaign edges). + """ + s = select(Edge) + # The input could be UUID or a literal name. + try: + if edge_id := UUID(edge_name): + s = s.where(Edge.id == edge_id) + except ValueError: + s = s.where(Edge.name == edge_name) + + edge = (await session.exec(s)).one_or_none() + if edge is None: + raise HTTPException(status_code=404) + response.headers["Self"] = request.url_for("read_edge_resource", edge_name=edge.id).__str__() + response.headers["Source"] = request.url_for("read_node_resource", node_name=edge.source).__str__() + response.headers["Target"] = request.url_for("read_node_resource", node_name=edge.target).__str__() + response.headers["Campaign"] = request.url_for( + "read_campaign_resource", campaign_name=edge.namespace + ).__str__() + response.headers["Graph"] = request.url_for( + "read_campaign_edge_collection", campaign_name=edge.namespace + ).__str__() + return edge + + +@router.post( + "/", + summary="Add a edge resource", +) +async def create_edge_resource( + request: Request, + response: Response, + manifest: EdgeManifest, + session: Annotated[AsyncSession, Depends(db_session_dependency)], +) -> Edge: + """Creates a new edge from a Manifest. + + The Manifest must be of type "edge" and include a campaign namespace in its + metadata. If an edge name is not provided, a random name is assigned. + + ``` + --- + apiVersion: "io.lsst.cmservice/v1" + kind: edge + metadata: + name: {edge name} + namespace: {campaign uuid} + spec: + source: {node name or id} + target: {node name or id} + ``` + """ + edge_name = manifest.metadata_.name + source_node = manifest.spec.source + target_node = manifest.spec.target + + # A edge must exist in the namespace of an existing campaign + edge_namespace: str = manifest.metadata_.namespace + try: + edge_namespace_uuid: UUID | None = UUID(edge_namespace) + except ValueError: + # get the campaign ID by its name to use as a namespace + edge_namespace_uuid = ( + await session.exec(select(Campaign.id).where(Campaign.name == edge_namespace)) + ).one_or_none() + + # it is an error if the provided namespace (campaign) does not exist + if edge_namespace_uuid is None: + raise HTTPException(status_code=422, detail="Requested campaign namespace does not exist.") + + # an edge may specify the source and target nodes by name and version, + # which means the UUID of these nodes is deterministic, or we could go to + # the database to discover them + validate their existence. + # TODO the edge spec should support mappings for source/target nodes but + # for now assume the provided name has `.vN` appended to it already or + # default to v1 + # TODO support node id in spec + source_node = f"{source_node}.1" if "." not in source_node else str(source_node) + target_node = f"{target_node}.1" if "." not in target_node else str(target_node) + + # An edge's name is not necessarily deterministic, so for the ID we'll + # construct a UUID5 that involves the nodes instead + edge_id = uuid5(edge_namespace_uuid, f"{source_node}->{target_node}") + + edge = Edge( + id=edge_id, + name=edge_name, + namespace=edge_namespace_uuid, + source=uuid5(edge_namespace_uuid, source_node), + target=uuid5(edge_namespace_uuid, target_node), + configuration=manifest.spec.model_dump(), + ) + + # The merge operation is effectively an upsert should an edge matching the + # id already exist + edge = await session.merge(edge, load=True) + await session.commit() + + response.headers["Self"] = request.url_for("read_edge_resource", edge_name=edge.id).__str__() + response.headers["Source"] = request.url_for("read_node_resource", node_name=edge.source).__str__() + response.headers["Target"] = request.url_for("read_node_resource", node_name=edge.target).__str__() + response.headers["Campaign"] = request.url_for( + "read_campaign_resource", campaign_name=edge.namespace + ).__str__() + response.headers["Graph"] = request.url_for( + "read_campaign_edge_collection", campaign_name=edge.namespace + ).__str__() + return edge + + +@router.delete( + "/{edge_id}", + summary="Delete edge", + status_code=204, +) +async def delete_edge_resource( + request: Request, + response: Response, + session: Annotated[AsyncSession, Depends(db_session_dependency)], + edge_id: UUID, +) -> None: + """Delete an edge given its id.""" + s = select(Edge).where(Edge.id == edge_id) + edge_to_delete = (await session.exec(s)).one_or_none() + + if edge_to_delete is None: + raise HTTPException(status_code=404, detail="No such edge.") + + await session.delete(edge_to_delete) + await session.commit() + return None diff --git a/src/lsst/cmservice/routers/v2/manifests.py b/src/lsst/cmservice/routers/v2/manifests.py index 308f4a33e..32be3106b 100644 --- a/src/lsst/cmservice/routers/v2/manifests.py +++ b/src/lsst/cmservice/routers/v2/manifests.py @@ -14,6 +14,7 @@ from ...common.jsonpatch import JSONPatch, JSONPatchError, apply_json_patch from ...common.logging import LOGGER +from ...common.timestamp import element_time from ...db.campaigns_v2 import Campaign, Manifest, _default_campaign_namespace from ...db.manifests_v2 import ManifestModel from ...db.session import db_session_dependency @@ -51,7 +52,13 @@ async def read_manifest_collection( ) ) try: - nodes = await session.exec(select(Manifest).offset(offset).limit(limit)) + statement = ( + select(Manifest) + .order_by(Manifest.metadata_["crtime"].desc().nulls_last()) + .offset(offset) + .limit(limit) + ) + nodes = await session.exec(statement) return nodes.all() except Exception as msg: logger.exception() @@ -150,13 +157,16 @@ async def create_one_or_more_manifests( _previous = (await session.exec(s)).one_or_none() _version = _previous.version if _previous else manifest.metadata_.version _version += 1 + + _manifest_metadata = manifest.metadata_.model_dump() + _manifest_metadata |= {"crtime": element_time()} _manifest = Manifest( id=uuid5(_namespace_uuid, f"{_name}.{_version}"), - name=_name, + name=_manifest_metadata.pop("name"), namespace=_namespace_uuid, kind=manifest.kind, version=_version, - metadata_=manifest.metadata_.model_dump(), + metadata_=_manifest_metadata, spec=manifest.spec.model_dump(), ) @@ -240,6 +250,7 @@ async def update_manifest_resource( ) # create Manifest from new_manifest, add to session, and commit + new_manifest["metadata"] |= {"crtime": element_time()} new_manifest_db = Manifest.model_validate(new_manifest) session.add(new_manifest_db) await session.commit() diff --git a/src/lsst/cmservice/routers/v2/nodes.py b/src/lsst/cmservice/routers/v2/nodes.py new file mode 100644 index 000000000..59b95619e --- /dev/null +++ b/src/lsst/cmservice/routers/v2/nodes.py @@ -0,0 +1,248 @@ +"""http routers for managing Nodes. + +The /nodes endpoint supports a collection resource and single resources +representing node objects within CM-Service. +""" + +from collections.abc import Sequence +from typing import TYPE_CHECKING, Annotated +from uuid import UUID, uuid5 + +from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response +from sqlmodel import col, select +from sqlmodel.ext.asyncio.session import AsyncSession + +from ...common.jsonpatch import JSONPatch, JSONPatchError, apply_json_patch +from ...common.logging import LOGGER +from ...common.timestamp import element_time +from ...db.campaigns_v2 import Campaign, Node +from ...db.manifests_v2 import NodeManifest +from ...db.session import db_session_dependency + +# TODO should probably bind a logger to the fastapi app or something +logger = LOGGER.bind(module=__name__) + + +# Build the router +router = APIRouter( + prefix="/nodes", + tags=["nodes", "v2"], +) + + +@router.get( + "/", + summary="Get a list of nodes", +) +async def read_nodes_collection( + request: Request, + response: Response, + session: Annotated[AsyncSession, Depends(db_session_dependency)], + offset: Annotated[int, Query(ge=0)] = 0, + limit: Annotated[int, Query(le=100)] = 10, +) -> Sequence[Node]: + """Fetches and returns all nodes known to the service. + + Notes + ----- + For campaign-scoped nodes, one should use the /campaigns/{}/nodes route. + """ + try: + statement = ( + select(Node).order_by(Node.metadata_["crtime"].desc().nulls_last()).offset(offset).limit(limit) + ) + nodes = await session.exec(statement) + response.headers["Next"] = ( + request.url_for("read_nodes_collection") + .include_query_params(offset=(offset + limit), limit=limit) + .__str__() + ) + response.headers["Previous"] = ( + request.url_for("read_nodes_collection") + .include_query_params(offset=(offset - limit), limit=limit) + .__str__() + ) + return nodes.all() + except Exception as msg: + logger.exception() + raise HTTPException(status_code=500, detail=f"{str(msg)}") from msg + + +@router.get( + "/{node_name}", + summary="Get node detail", +) +async def read_node_resource( + request: Request, + response: Response, + node_name: str, + session: Annotated[AsyncSession, Depends(db_session_dependency)], +) -> Node: + """Fetch a single node from the database given either the node id + or its name. + """ + s = select(Node) + # The input could be a campaign UUID or it could be a literal name. + try: + if node_id := UUID(node_name): + s = s.where(Node.id == node_id) + except ValueError: + s = s.where(Node.name == node_name) + + node = (await session.exec(s)).one_or_none() + if node is None: + raise HTTPException(status_code=404) + response.headers["Self"] = request.url_for("read_node_resource", node_name=node.id).__str__() + response.headers["Campaign"] = request.url_for( + "read_campaign_resource", campaign_name=node.namespace + ).__str__() + return node + + +@router.post( + "/", + summary="Add a node resource", +) +async def create_node_resource( + request: Request, + response: Response, + manifest: NodeManifest, + session: Annotated[AsyncSession, Depends(db_session_dependency)], +) -> Node: + node_name = manifest.metadata_.name + node_namespace = manifest.metadata_.namespace + + try: + node_namespace_uuid: UUID | None = UUID(node_namespace) + except ValueError: + # get the campaign ID by its name to use as a namespace + node_namespace_uuid = ( + await session.exec(select(Campaign.id).where(Campaign.name == node_namespace)) + ).one_or_none() + + # it is an error if the provided namespace (campaign) does not exist + # FIXME but this could also be handled by FK constraints + if node_namespace_uuid is None: + raise HTTPException(status_code=404, detail="Requested campaign namespace does not exist.") + + # A node must be a new version if name+namespace already exists + # - check db for node as name+namespace, get current version and increment + node_version = int(manifest.metadata_.version) + + s = ( + select(Node) + .where(Node.name == node_name) + .where(Node.namespace == node_namespace_uuid) + .order_by(col(Node.version).desc()) + .limit(1) + ) + previous_node = (await session.exec(s)).one_or_none() + + node_version = previous_node.version if previous_node else node_version + node_version += 1 + node_metadata = manifest.metadata_.model_dump() + node_metadata |= {"crtime": element_time()} + node = Node( + id=uuid5(node_namespace_uuid, f"{node_name}.{node_version}"), + name=node_metadata.pop("name"), + namespace=node_namespace_uuid, + version=node_version, + configuration=manifest.spec.model_dump(), + metadata_=node_metadata, + ) + + # Put the node in the database + session.add(node) + await session.commit() + await session.refresh(node) + response.headers["Self"] = request.url_for("read_node_resource", node_name=node.id).__str__() + response.headers["Campaign"] = request.url_for( + "read_campaign_resource", campaign_name=node.namespace + ).__str__() + return node + + +@router.patch( + "/{node_name_or_id}", + summary="Update node detail", + status_code=202, +) +async def update_node_resource( + request: Request, + response: Response, + session: Annotated[AsyncSession, Depends(db_session_dependency)], + node_name_or_id: str, + patch_data: Sequence[JSONPatch], +) -> Node: + """Partial update method for nodes. + + A Nodes's spec or metadata may be updated with this PATCH operation. All + updates to a Node creates a new version of the Node instead of + updating an existing record in-place. This preserves history and keeps + previous node versions available. + + A Node's name, id, kind, or namespace may not be modified by this + method, and attempts to do so will produce a 4XX client error. + + This PATCH endpoint supports RFC6902 json-patch requests. + + Notes + ----- + - This API always targets the latest version of a manifest when applying + a patch. This requires and maintains a "linear" sequence of versions; + it is not permissible to "patch" a previous version and create a "tree"- + like history of manifests. For exmaple, every manifest may be diffed + against any previous version without having to consider branches. + """ + use_rfc6902 = False + if request.headers["Content-Type"] == "application/json-patch+json": + use_rfc6902 = True + else: + raise HTTPException(status_code=406, detail="Unsupported Content-Type") + + if TYPE_CHECKING: + assert use_rfc6902 + + s = select(Node) + # The input could be a UUID or it could be a literal name. + try: + if _id := UUID(node_name_or_id): + s = s.where(Node.id == _id) + except ValueError: + s = s.where(Node.name == node_name_or_id) + + # we want to order and sort by version, in descending order, so we always + # fetch only the most recent version of manifest + # FIXME this implies that when a node ID is provided, it should be an + # error if it is not the most recent version. + s = s.order_by(col(Node.version).desc()).limit(1) + + old_manifest = (await session.exec(s)).one_or_none() + if old_manifest is None: + raise HTTPException(status_code=404, detail="No such node") + + new_manifest = old_manifest.model_dump(by_alias=True) + new_manifest["version"] += 1 + new_manifest["id"] = uuid5(new_manifest["namespace"], f"{new_manifest['name']}.{new_manifest['version']}") + + for patch in patch_data: + try: + apply_json_patch(patch, new_manifest) + except JSONPatchError as e: + raise HTTPException( + status_code=422, + detail=f"Unable to process one or more patch operations: {e}", + ) + + # create Manifest from new_manifest, add to session, and commit + new_manifest["metadata"] |= {"crtime": element_time()} + new_manifest_db = Node.model_validate(new_manifest) + session.add(new_manifest_db) + await session.commit() + + response.headers["Self"] = request.url_for("read_node_resource", node_name=new_manifest_db.id).__str__() + response.headers["Campaign"] = request.url_for( + "read_campaign_resource", campaign_name=new_manifest_db.namespace + ).__str__() + + return new_manifest_db diff --git a/tests/common/test_jsonpatch.py b/tests/common/test_jsonpatch.py index 73e823914..912bdc3d9 100644 --- a/tests/common/test_jsonpatch.py +++ b/tests/common/test_jsonpatch.py @@ -2,7 +2,7 @@ import pytest -from lsst.cmservice.common.jsonpatch import JSONPatch, JSONPatchError, apply_json_patch +from lsst.cmservice.common.jsonpatch import JSONPatch, JSONPatchError, apply_json_merge, apply_json_patch @pytest.fixture @@ -148,3 +148,17 @@ def test_jsonpatch_test(target_object: dict[str, Any]) -> None: op = JSONPatch(op="test", path="/spec/a_list/-", value="bob_alice") with pytest.raises(JSONPatchError): _ = apply_json_patch(op, target_object) + + +def test_json_merge_patch(target_object: dict[str, Any]) -> None: + """Tests the RFC7396 JSON Merge patch function.""" + + patch = { + "metadata": {"owner": None, "pilot": "bob_loblaw"}, + "spec": {"new_key": {"new_key": "new_value"}}, + } + new_object = apply_json_merge(patch, target_object) + + assert new_object["metadata"]["pilot"] == "bob_loblaw" + assert "owner" not in new_object["metadata"] + assert new_object["spec"]["new_key"]["new_key"] == "new_value" diff --git a/tests/v2/test_edge_routes.py b/tests/v2/test_edge_routes.py new file mode 100644 index 000000000..1c8285dec --- /dev/null +++ b/tests/v2/test_edge_routes.py @@ -0,0 +1,142 @@ +"""Tests v2 fastapi edge routes""" + +from uuid import uuid4 + +import pytest +from httpx import AsyncClient + +pytestmark = pytest.mark.asyncio(loop_scope="module") +"""All tests in this module will run in the same event loop.""" + + +async def test_list_delete_no_edges(aclient: AsyncClient) -> None: + """Tests listing and deleting edges when there are no edges available.""" + x = await aclient.get("/cm-service/v2/edges") + assert x.is_success + assert len(x.json()) == 0 + + x = await aclient.get(f"/cm-service/v2/edges/{uuid4()}") + assert x.status_code == 404 + + x = await aclient.delete(f"/cm-service/v2/edges/{uuid4()}") + assert x.status_code == 404 + + +async def test_edge_negative(aclient: AsyncClient) -> None: + """Tests edge route negative outcomes.""" + + # negative test: failing request model validation + x = await aclient.post( + "cm-service/v2/edges", + json={ + "kind": "no_such_kind", + }, + ) + assert x.is_client_error + + # negative test: using wrong manifest kind + x = await aclient.post( + "cm-service/v2/edges", + json={ + "kind": "other", + }, + ) + assert x.is_client_error + + # negative test: missing edge source or target + x = await aclient.post( + "cm-service/v2/edges", + json={ + "kind": "edge", + "metadata": {"name": uuid4().hex[8:], "namespace": "test_campaign"}, + "spec": { + "source": "START", + }, + }, + ) + assert x.is_client_error + + x = await aclient.post( + "cm-service/v2/edges", + json={ + "kind": "edge", + "metadata": {"name": uuid4().hex[8:], "namespace": "test_campaign"}, + "spec": { + "target": "END", + }, + }, + ) + assert x.is_client_error + + # negative test: missing campaign namespace + x = await aclient.post( + "cm-service/v2/edges", + json={ + "kind": "edge", + "metadata": {"name": uuid4().hex[8:]}, + "spec": {"source": "START", "target": "END"}, + }, + ) + assert x.is_client_error + + # negative: fail to delete an edge using a non-uuid string (like a name) + x = await aclient.delete( + "/cm-service/v2/edges/not_an_edge_id", + ) + assert x.status_code == 422 + + +async def test_edge_lifecycle(aclient: AsyncClient) -> None: + """Tests edge lifecycle.""" + campaign_name = uuid4().hex[:8] + + # Create a campaign for edges. Campaigns come with START and END nodes. + x = await aclient.post( + "/cm-service/v2/campaigns", + json={ + "kind": "campaign", + "metadata": {"name": campaign_name}, + "spec": {}, + }, + ) + assert x.is_success + campaign_id = x.json()["id"] + + # Create an edge between START and END + x = await aclient.post( + "/cm-service/v2/edges", + json={ + "kind": "edge", + "metadata": { + "name": uuid4().hex[8:], + "namespace": campaign_id, + }, + "spec": { + "source": "START", + "target": "END", + }, + }, + ) + assert x.is_success + assert "Campaign" in x.headers + edge = x.json() + edge_name = edge["name"] + edge_self_url = x.headers["Self"] + target_node_url = x.headers["Target"] + source_node_url = x.headers["Source"] + + # Verify header links to source and target nodes + x = await aclient.get(source_node_url) + assert x.is_success + assert x.json()["name"] == "START" + + x = await aclient.get(target_node_url) + assert x.is_success + assert x.json()["name"] == "END" + + x = await aclient.get(f"/cm-service/v2/edges/{edge_name}") + assert x.is_success + edge = x.json() + + x = await aclient.delete(f"{edge_self_url}") + assert x.is_success diff --git a/tests/v2/test_graph.py b/tests/v2/test_graph.py new file mode 100644 index 000000000..3f57d3444 --- /dev/null +++ b/tests/v2/test_graph.py @@ -0,0 +1,230 @@ +"""Tests graph operations using v2 objects""" + +from collections.abc import AsyncGenerator +from uuid import uuid4 + +import networkx as nx +import pytest +import pytest_asyncio +from httpx import AsyncClient + +from lsst.cmservice.common.enums import StatusEnum +from lsst.cmservice.common.graph import graph_from_edge_list_v2, processable_graph_nodes, validate_graph +from lsst.cmservice.common.types import AnyAsyncSession +from lsst.cmservice.db.campaigns_v2 import Edge + +pytestmark = pytest.mark.asyncio(loop_scope="module") +"""All tests in this module will run in the same event loop.""" + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def test_campaign(aclient: AsyncClient) -> AsyncGenerator[str]: + """Fixture managing a test campaign with two (additional) nodes.""" + campaign_name = uuid4().hex[-8:] + node_ids = [] + + x = await aclient.post( + "/cm-service/v2/campaigns", + json={ + "apiVersion": "io.lsst.cmservice/v1", + "kind": "campaign", + "metadata": {"name": campaign_name}, + "spec": {}, + }, + ) + campaign_edge_url = x.headers["Edges"] + campaign = x.json() + + # create a trio of nodes for the campaign + for _ in range(3): + x = await aclient.post( + "/cm-service/v2/nodes", + json={ + "apiVersion": "io.lsst.cmservice/v1", + "kind": "node", + "metadata": {"name": uuid4().hex[-8:], "namespace": campaign["id"]}, + "spec": {}, + }, + ) + node = x.json() + node_ids.append(node["name"]) + + # Create edges between each campaign node with parallelization + _ = await aclient.post( + "/cm-service/v2/edges", + json={ + "apiVersion": "io.lsst.cmservice/v1", + "kind": "edge", + "metadata": {"name": uuid4().hex[-8:], "namespace": campaign["id"]}, + "spec": { + "source": "START", + "target": node_ids[0], + }, + }, + ) + _ = await aclient.post( + "/cm-service/v2/edges", + json={ + "apiVersion": "io.lsst.cmservice/v1", + "kind": "edge", + "metadata": {"name": uuid4().hex[-8:], "namespace": campaign["id"]}, + "spec": { + "source": node_ids[0], + "target": node_ids[1], + }, + }, + ) + _ = await aclient.post( + "/cm-service/v2/edges", + json={ + "apiVersion": "io.lsst.cmservice/v1", + "kind": "edge", + "metadata": {"name": uuid4().hex[-8:], "namespace": campaign["id"]}, + "spec": { + "source": node_ids[0], + "target": node_ids[2], + }, + }, + ) + _ = await aclient.post( + "/cm-service/v2/edges", + json={ + "apiVersion": "io.lsst.cmservice/v1", + "kind": "edge", + "metadata": {"name": uuid4().hex[-8:], "namespace": campaign["id"]}, + "spec": { + "source": node_ids[1], + "target": "END", + }, + }, + ) + _ = await aclient.post( + "/cm-service/v2/edges", + json={ + "apiVersion": "io.lsst.cmservice/v1", + "kind": "edge", + "metadata": {"name": uuid4().hex[-8:], "namespace": campaign["id"]}, + "spec": { + "source": node_ids[2], + "target": "END", + }, + }, + ) + yield campaign_edge_url + + +async def test_build_and_walk_graph( + aclient: AsyncClient, session: AnyAsyncSession, test_campaign: str +) -> None: + """Test the generation and traversal of a campaign graph as created in the + ``test_campaign`` fixture. + + Test that the graph is traversed from START to END in order, and that as + graph nodes are "processable" they can be handled. In this test, the status + of each node is set to "accepted" and updated in the databse. The campaign + graph is recreated between each stage of the mock graph processing. + + The test campaign is a set of 3 nodes arranged in a graph: + + ``` + START --> A --> B --> END + --> C --> + ``` + """ + edge_list = [Edge.model_validate(edge) for edge in (await aclient.get(test_campaign)).json()] + graph = await graph_from_edge_list_v2(edge_list, session) + + # the START node should be the only processable Node + for node in processable_graph_nodes(graph): + assert node.name == "START" + assert node.status is StatusEnum.waiting + # Add the Node back to the session and update its status + session.add(node) + await session.refresh(node) + node.status = StatusEnum.accepted + await session.commit() + + # Repeat the graph building and traversal, this time expecting a single + # node that is not "START" + graph = await graph_from_edge_list_v2(edge_list, session) + for node in processable_graph_nodes(graph): + assert node.name != "START" + assert node.status is StatusEnum.waiting + # Add the Node back to the session and update its status + session.add(node) + await session.refresh(node) + node.status = StatusEnum.accepted + await session.commit() + + # Repeat the graph building and traversal, this time expecting a pair of + # nodes processable in parallel + graph = await graph_from_edge_list_v2(edge_list, session) + count = 0 + for node in processable_graph_nodes(graph): + count += 1 + assert node.name != "START" + assert node.status is StatusEnum.waiting + # Add the Node back to the session and update its status + session.add(node) + await session.refresh(node) + node.status = StatusEnum.accepted + await session.commit() + assert count == 2 + + # Finally, expect the END node + graph = await graph_from_edge_list_v2(edge_list, session) + for node in processable_graph_nodes(graph): + assert node.name == "END" + assert node.status is StatusEnum.waiting + # Add the Node back to the session and update its status + session.add(node) + await session.refresh(node) + node.status = StatusEnum.accepted + await session.commit() + + +def test_validate_graph() -> None: + """Test basic graph validation operations using a simple DAG.""" + edge_list = [("A", "B"), ("B", "C"), ("C", "D"), ("C", "E"), ("D", "F"), ("E", "F")] + + g = nx.DiGraph() + g.add_edges_from(edge_list) + + # this is a valid graph + assert validate_graph(g, "A", "F") + + # add a new parallel node with no path to sink + g.add_edge("C", "CC") + assert not validate_graph(g, "A", "F") + + # create a cycle with the new node + g.add_edge("CC", "A") + assert not validate_graph(g, "A", "F") + + # correct the path + g.remove_edge("CC", "A") + g.add_edge("CC", "F") + assert validate_graph(g, "A", "F") + + # remove the edges from a node + g.remove_edge("CC", "F") + g.remove_edge("C", "CC") + # the graph is invalid because "CC" is now an isolate + assert not validate_graph(g, "A", "F") + + # remove the unneeded node + g.remove_node("CC") + assert validate_graph(g, "A", "F") + + +async def test_campaign_graph_route(aclient: AsyncClient, test_campaign: str) -> None: + """Tests the acquisition of a serialized graph from a REST endpoint and + the subsequent reconstruction of a valid graph from the node-link data. + """ + graph_url = test_campaign.replace("/edges", "/graph") + x = await aclient.get(graph_url) + assert x.is_success + + # Test reconstruction of the serialized graph + graph = nx.node_link_graph(x.json(), edges="edges") + assert validate_graph(graph) diff --git a/tests/v2/test_manifest_routes.py b/tests/v2/test_manifest_routes.py index 0c4144e11..8f3b60ee7 100644 --- a/tests/v2/test_manifest_routes.py +++ b/tests/v2/test_manifest_routes.py @@ -131,6 +131,13 @@ async def test_load_manifests(aclient: AsyncClient) -> None: assert len(manifests) == 3 assert manifests[-1]["spec"]["one"] == 1 + # Get all the loaded manifests from the campaign route + x = await aclient.get(f"/cm-service/v2/campaigns/{campaign_id}/manifests") + assert x.is_success + manifests = x.json() + assert len(manifests) == 2 + assert manifests[-1]["spec"]["one"] == 1 + async def test_patch_manifest(aclient: AsyncClient) -> None: """Tests partial update of manifests and single resource retrieval.""" diff --git a/tests/v2/test_node_routes.py b/tests/v2/test_node_routes.py new file mode 100644 index 000000000..4ee5a2ddb --- /dev/null +++ b/tests/v2/test_node_routes.py @@ -0,0 +1,130 @@ +"""Tests v2 fastapi node routes""" + +from uuid import uuid4 + +import pytest +from httpx import AsyncClient + +pytestmark = pytest.mark.asyncio(loop_scope="module") +"""All tests in this module will run in the same event loop.""" + + +async def test_list_delete_no_nodes(aclient: AsyncClient) -> None: + """Tests listing nodes when there are no nodes available.""" + x = await aclient.get("/cm-service/v2/nodes") + assert x.is_success + assert len(x.json()) == 0 + + x = await aclient.get(f"/cm-service/v2/nodes/{uuid4()}") + assert x.status_code == 404 + + # there is no delete api for nodes + x = await aclient.delete(f"/cm-service/v2/nodes/{uuid4()}") + assert x.status_code == 405 + + +async def test_node_negative(aclient: AsyncClient) -> None: + """Tests node route negative outcomes.""" + + # negative test: failing request model validation + x = await aclient.post( + "cm-service/v2/nodes", + json={ + "kind": "no_such_kind", + }, + ) + assert x.is_client_error + + # negative test: using wrong manifest kind + x = await aclient.post( + "cm-service/v2/nodes", + json={ + "kind": "other", + }, + ) + assert x.is_client_error + + # negative test: missing node name + x = await aclient.post( + "cm-service/v2/nodes", + json={ + "kind": "node", + "metadata": {}, + "spec": {}, + }, + ) + assert x.is_client_error + + # negative test: missing campaign namespace + x = await aclient.post( + "cm-service/v2/nodes", + json={ + "kind": "node", + "metadata": {"name": uuid4().hex[8:]}, + "spec": {}, + }, + ) + assert x.is_client_error + + +async def test_node_lifecycle(aclient: AsyncClient) -> None: + """Tests node lifecycle.""" + campaign_name = uuid4().hex[:8] + + # Create a campaign for edges. Campaigns come with START and END nodes. + x = await aclient.post( + "/cm-service/v2/campaigns", + json={ + "kind": "campaign", + "metadata": {"name": campaign_name}, + "spec": {}, + }, + ) + assert x.is_success + campaign_id = x.json()["id"] + + # Create a new campaign node + x = await aclient.post( + "/cm-service/v2/nodes", + json={ + "kind": "node", + "metadata": {"name": uuid4().hex[8:], "namespace": campaign_id}, + "spec": { + "handler": "lsst.cmservice.handlers.element_handler.ElementHandler", + "pipeline_yaml": "${DRP_PIPE_DIR}/pipelines/HSC/DRP-RC2.yaml#step1", + "query": [{"instrument": "HSC"}, {"skymap": "hsc_rings_v1"}], + "split": { + "dataset": "raw", + "field": "exposure", + "method": "query", + "minGroups": 3, + "maxInFlight": 3, + }, + }, + }, + ) + assert x.is_success + node = x.json() + assert node["version"] == 1 + node_url = x.headers["Self"] + + # Edit a Node using RFC6902 json-patch + x = await aclient.patch( + node_url, + headers={"Content-Type": "application/json-patch+json"}, + json=[ + {"op": "replace", "path": "/configuration/split/maxInFlight", "value": 4}, + { + "op": "add", + "path": "/configuration/query/-", + "value": {"dimension": "fact"}, + }, + ], + ) + assert x.is_success + node = x.json() + + # test that the updates have been made and the version has been bumped + assert node["configuration"]["split"]["maxInFlight"] == 4 + assert node["configuration"]["query"][-1]["dimension"] == "fact" + assert node["version"] == 2