|
1 | 1 | from collections.abc import Iterable, Mapping, MutableSet, Sequence |
2 | | -from typing import Literal |
3 | | -from uuid import UUID |
| 2 | +from typing import TYPE_CHECKING, Literal |
| 3 | +from uuid import UUID, uuid4, uuid5 |
4 | 4 |
|
5 | 5 | import networkx as nx |
| 6 | +from sqlmodel import select |
| 7 | +from sqlmodel.ext.asyncio.session import AsyncSession |
6 | 8 |
|
| 9 | +from ..common.timestamp import element_time |
7 | 10 | from ..db import Script, ScriptDependency, Step, StepDependency |
8 | 11 | from ..db.campaigns_v2 import Edge, Node |
9 | 12 | from ..parsing.string import parse_element_fullname |
@@ -192,3 +195,89 @@ def processable_graph_nodes(g: nx.DiGraph) -> Iterable[Node]: |
192 | 195 |
|
193 | 196 | # the inspection should stop when there are no more nodes to check |
194 | 197 | yield from processable_nodes |
| 198 | + |
| 199 | + |
| 200 | +async def insert_node_to_graph( |
| 201 | + node_0: UUID, node_1: UUID, namespace: UUID, session: AsyncSession | None = None |
| 202 | +) -> None: |
| 203 | + """Apply an insert operation to a graph by adding a new node_1 immediately |
| 204 | + adjacent to node_0 where all downstream node_0 edges are moved to node_1. |
| 205 | +
|
| 206 | + ``` |
| 207 | + A --> B becomes A --> X --> B |
| 208 | +
|
| 209 | + A --> B becomes A --> X --> B |
| 210 | + `-> C `-> C |
| 211 | + ``` |
| 212 | + """ |
| 213 | + if TYPE_CHECKING: |
| 214 | + assert session is not None |
| 215 | + |
| 216 | + s = select(Edge).where(Edge.source == node_0) |
| 217 | + adjacent_edges = (await session.exec(s)).all() |
| 218 | + |
| 219 | + # Move all the adjacent edges from node_0 to node_1 |
| 220 | + for edge in adjacent_edges: |
| 221 | + edge.source = node_1 |
| 222 | + edge.metadata_["mtime"] = element_time() |
| 223 | + |
| 224 | + # Make node_0 and node_1 adjacent |
| 225 | + new_adjacency_name = uuid4() |
| 226 | + new_adjacency = Edge( |
| 227 | + id=uuid5(namespace, new_adjacency_name.bytes), |
| 228 | + name=new_adjacency_name.hex, |
| 229 | + namespace=namespace, |
| 230 | + source=node_0, |
| 231 | + target=node_1, |
| 232 | + ) |
| 233 | + session.add(new_adjacency) |
| 234 | + await session.commit() |
| 235 | + |
| 236 | + |
| 237 | +async def append_node_to_graph( |
| 238 | + node_0: UUID, node_1: UUID, namespace: UUID, session: AsyncSession | None = None |
| 239 | +) -> None: |
| 240 | + """Apply an append operation to a graph by adding a new node_1 parallel |
| 241 | + to an existing node_0 where all adjacencies are copied to node_1. |
| 242 | +
|
| 243 | + ``` |
| 244 | + A --> B --> C becomes A --> B --> C |
| 245 | + `-> X -/ |
| 246 | + ``` |
| 247 | + """ |
| 248 | + if TYPE_CHECKING: |
| 249 | + assert session is not None |
| 250 | + |
| 251 | + # create new "downstream" edges with node_1 |
| 252 | + s = select(Edge).where(Edge.source == node_0) |
| 253 | + downstream_edges = (await session.exec(s)).all() |
| 254 | + |
| 255 | + # create new "upstream" edges with node_1 |
| 256 | + s = select(Edge).where(Edge.target == node_0) |
| 257 | + upstream_edges = (await session.exec(s)).all() |
| 258 | + |
| 259 | + for edge in downstream_edges: |
| 260 | + session.expunge(edge) |
| 261 | + new_adjacency_name = uuid4() |
| 262 | + new_adjacency = Edge( |
| 263 | + id=uuid5(namespace, new_adjacency_name.bytes), |
| 264 | + name=new_adjacency_name.hex, |
| 265 | + namespace=namespace, |
| 266 | + source=node_1, |
| 267 | + target=edge.target, |
| 268 | + ) |
| 269 | + session.add(new_adjacency) |
| 270 | + |
| 271 | + for edge in upstream_edges: |
| 272 | + session.expunge(edge) |
| 273 | + new_adjacency_name = uuid4() |
| 274 | + new_adjacency = Edge( |
| 275 | + id=uuid5(namespace, new_adjacency_name.bytes), |
| 276 | + name=new_adjacency_name.hex, |
| 277 | + namespace=namespace, |
| 278 | + source=edge.source, |
| 279 | + target=node_1, |
| 280 | + ) |
| 281 | + session.add(new_adjacency) |
| 282 | + |
| 283 | + await session.commit() |
0 commit comments