diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 9e78c618f0..f9ccc9a073 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -108,6 +108,7 @@ Mark Watts Mark van der Pas Martin Wendt Martin van der Werff +Matt Goldberg Matthias Urban Maurizio Nagni Maxim Kolchin diff --git a/rdflib/extras/pathfinding.py b/rdflib/extras/pathfinding.py new file mode 100644 index 0000000000..473d9946cd --- /dev/null +++ b/rdflib/extras/pathfinding.py @@ -0,0 +1,1184 @@ +""" +Dijkstra-based pathfinding utility for RDFLib graphs. + +[`find_paths`][rdflib.extras.pathfinding.find_paths] is the +primary entry point. It performs Dijkstra-style graph +traversal to find paths between start and end nodes in any rdflib +[`Graph`][rdflib.graph.Graph]-compatible object. For unweighted paths +this reduces to breadth-first order. + +## Capabilities + +* **Fixed nodes, SPARQL patterns, or unbound** start / end +* **Fixed predicates, rdflib property paths, SPARQL patterns, or unbound** + path traversal +* **Shortest-path mode**: Keep only the shortest path(s) per end node +* **Weighted path lengths**: When the path is a SPARQL pattern that binds + ``?length``, each step carries a custom weight; shortest-path comparison + uses cumulative weighted length instead of hop count +* **Max-length limits**: Cap the cumulative weighted path length (for + unweighted paths this is equivalent to a hop-count limit) +* **Early termination**: Stop extending a path once it reaches a valid end +* **Automatic direction reversal**: Traversal starts from whichever side is + more constrained +* **Per-path cycle detection** like in SPARQL property path evaluation + +## Beyond SPARQL property paths + +SPARQL property paths are powerful for reachability queries, but they have +significant limitations when you need more than a boolean "is there a path?" answer: + +* **No intermediate-node access**: Property paths collapse the traversal + into a single ``(start, end)`` binding. 
You cannot inspect or filter + the nodes *along* the path. +* **No per-step metadata**: There is no way to capture edge predicates, + variable bindings, or weights for each hop. +* **No weighted and/or shortest path**: SPARQL has no built-in mechanism + for associating numeric costs with edges or to search for shortest path. +* **Limited termination control**: You cannot tell a property path to + stop at the first node that satisfies an arbitrary pattern, or to + continue past it. + +``find_paths`` fills these gaps by combining pathfinding with optional +expressiveness of SPARQL graph patterns at every layer (start selection, +per-hop expansion, end validation). Each discovered path is returned as +an ordered sequence of :class:`PathStep` objects carrying the node, +edge, bindings, and weighted length, information that is unavailable +from a property-path query. + +## How does this differ from other pathfinding tools? + +``find_paths`` relaxes several constraints sometimes found in +pathfinding functions in graph databases or other tools: + +* **Flexible endpoints**: Some pathfinding tools require + exactly one concrete start node and one concrete end node. + ``find_paths`` accepts a single node, an iterable of nodes, a + SPARQL WHERE-clause pattern, or ``None`` (fully unbound) for both + start and end independently. This makes it straightforward to + search from *all nodes matching a pattern* to *all nodes matching + another pattern* in a single call. + +* **Per-step metadata**: Some shortest-path results + return only the sequence of nodes (and sometimes edges). + ``find_paths`` returns a :class:`PathStep` per hop carrying the + node, edge predicate (when unbound), arbitrary SPARQL variable + bindings, and a per-step weighted length, which is usually + unavailable without post-processing. 
+ +* **Weighted shortest path with SPARQL-defined costs**: Most RDF + stores have no built-in mechanism for associating numeric costs + with edges and selecting the minimum-cost route. By binding a + ``?length`` variable in the hop pattern, ``find_paths`` uses + a Dijkstra-style approach to find the shortest path, + something that would otherwise typically require extracting a + subgraph and running a separate algorithm using another library. + +* **SPARQL graph patterns as hop definitions**: Rather than + restricting hops to a single predicate or property-path expression, + ``find_paths`` accepts an arbitrary SPARQL WHERE-clause body as the + hop definition. This allows multi-triple patterns per step (e.g. + reified edges, intermediate nodes with type constraints) without + needing to flatten the graph into a simpler structure first. + +## Data types + +Each discovered path is returned as a +[`PathResult`][rdflib.extras.pathfinding.PathResult] containing an ordered +list of [`PathStep`][rdflib.extras.pathfinding.PathStep] objects. + +[`PathStep.length`][rdflib.extras.pathfinding.PathStep] is always ``1`` +unless the path is a SPARQL string pattern that binds ``?length``, in which +case the numeric value of that variable is used. +[`PathResult.length`][rdflib.extras.pathfinding.PathResult] is the sum of +all step lengths (``0`` for zero-length paths where ``start == end``). 
+ +The contents of each step depend on the ``path`` argument type: + +| ``path`` type | ``PathStep.node`` | ``PathStep.length`` | ``PathStep.edge`` | ``PathStep.bindings`` | +| --- | --- | --- | --- | --- | +| ``None`` (unbound) | Node reached | ``1`` | Predicate traversed | ``None`` | +| ``URIRef`` or ``Path`` | Node reached | ``1`` | ``None`` | ``None`` | +| ``str`` (SPARQL pattern) | Node reached | ``?length`` value or ``1`` | ``None`` | Extra variable bindings dict (excluding ``?length``) | + +## Parameter quick-reference + +| Parameter | Accepts | Default | +| --- | --- | --- | +| ``start`` | ``Identifier``, ``Iterable[Identifier]``, ``str`` (SPARQL pattern with ``?start``), or ``None`` | ``None`` | +| ``end`` | ``Identifier``, ``Iterable[Identifier]``, ``str`` (SPARQL pattern with ``?end``), or ``None`` | ``None`` | +| ``path`` | ``URIRef``, ``Path``, ``str`` (SPARQL pattern with ``?start`` and ``?end``), or ``None`` | ``None`` | +| ``shortest`` | ``bool`` | ``True`` | +| ``terminate_on_first_match`` | ``bool`` | ``True`` | +| ``max_length`` | ``int``, ``float``, or ``None`` | ``None`` | +| ``initNs`` | ``dict`` of prefix-to-namespace mappings | ``None`` | + +## Examples + +Setting up a small graph for several of the examples below: + +```python +>>> from rdflib import Graph, Namespace +>>> from rdflib.extras.pathfinding import find_paths + +>>> EX = Namespace("http://example.org/") +>>> g = Graph() +>>> g.bind("ex", EX) +>>> _ = g.add((EX.Alice, EX.knows, EX.Bob)) +>>> _ = g.add((EX.Bob, EX.knows, EX.Carol)) +>>> _ = g.add((EX.Carol, EX.knows, EX.Dave)) +>>> _ = g.add((EX.Bob, EX.knows, EX.Dave)) + +``` + +### Simple predicate path + +Find all paths from Alice to Dave via ``:knows``: + +```python +>>> results = find_paths(g, start=EX.Alice, path=EX.knows, end=EX.Dave, shortest=False) +>>> len(results) +2 + +``` + +### Shortest path + +```python +>>> results = find_paths( +... g, start=EX.Alice, path=EX.knows, end=EX.Dave +... 
) +>>> len(results) +1 +>>> len(results[0].steps) +2 + +``` + +### Unbound path (captures edge predicates) + +When ``path=None``, every predicate is traversed and each step records the +edge used: + +```python +>>> results = find_paths(g, start=EX.Alice, path=None, end=EX.Bob) +>>> results[0].steps[0].edge == EX.knows +True + +``` + +### SPARQL pattern start + +Use a WHERE-clause body to select start nodes dynamically: + +```python +>>> results = find_paths( +... g, +... start="?start ex:knows ex:Bob", +... path=EX.knows, +... end=EX.Dave, +... shortest=False, +... ) +>>> len(results) +2 + +``` + +### SPARQL pattern end + +Use an ASK-style pattern to filter valid end nodes: + +```python +>>> _ = g.add((EX.Dave, EX.role, EX.Manager)) +>>> results = find_paths( +... g, +... start=EX.Alice, +... path=EX.knows, +... end="?end ex:role ex:Manager", +... ) +>>> all(r.end == EX.Dave for r in results) +True + +``` + +### Exploring reachable nodes with ``max_length`` (cumulative path length) + +```python +>>> results = find_paths( +... g, +... start=EX.Alice, +... path=EX.knows, +... end=None, +... terminate_on_first_match=False, +... max_length=2, +... ) +>>> sorted(set(str(r.end).rsplit("/", 1)[-1] for r in results)) +['Alice', 'Bob', 'Carol', 'Dave'] + +``` + +### Property Paths + +rdflib Path objects are supported. +Each application of the full property path counts as one step: + +```python +>>> _ = g.add((EX.Bob, EX.friendOf, EX.Frank)) +>>> _ = g.add((EX.Dave, EX.friendOf, EX.Eve)) +>>> results = find_paths( +... g, start=EX.Alice, path=EX.knows / EX.friendOf, end=None, +... shortest=False, terminate_on_first_match=False, +... ) +>>> len(results) >= 1 +True + +``` + +### Weighted shortest path with ``?length`` + +When the path is a SPARQL pattern that binds ``?length``, each step's +weight is taken from that variable. ``shortest=True`` then picks the +path with the lowest *cumulative weighted length*, even if it has more +hops. 
The ``?length`` variable is consumed by +[`PathStep.length`][rdflib.extras.pathfinding.PathStep] and does +**not** appear in ``PathStep.bindings``. + +In this example a logistics company models shipping routes between +warehouses. Each route carries a cost. The direct +route from the New York warehouse to London costs $950, but routing +through Rotterdam ($200 + $350 = $550) is cheaper despite the extra hop: + +```python +>>> from rdflib import Literal +>>> routes = Graph() +>>> routes.bind("ex", EX) +>>> # Direct route: NewYork -> London, cost $950 +>>> _ = routes.add((EX.route1, EX.origin, EX.NewYork)) +>>> _ = routes.add((EX.route1, EX.destination, EX.London)) +>>> _ = routes.add((EX.route1, EX.shippingCost, Literal(950.0))) +>>> # NewYork -> Rotterdam, cost $200 +>>> _ = routes.add((EX.route2, EX.origin, EX.NewYork)) +>>> _ = routes.add((EX.route2, EX.destination, EX.Rotterdam)) +>>> _ = routes.add((EX.route2, EX.shippingCost, Literal(200.0))) +>>> # Rotterdam -> London, cost $350 +>>> _ = routes.add((EX.route3, EX.origin, EX.Rotterdam)) +>>> _ = routes.add((EX.route3, EX.destination, EX.London)) +>>> _ = routes.add((EX.route3, EX.shippingCost, Literal(350.0))) +>>> results = find_paths( +... routes, +... start=EX.NewYork, +... path="?route ex:origin ?start ; ex:destination ?end ; ex:shippingCost ?length", +... end=EX.London, +... shortest=True, +... terminate_on_first_match=True, +... ) +>>> len(results) # only the cheapest route +1 +>>> results[0].length # $200 + $350 +550.0 +>>> len(results[0].steps) # 2 hops via Rotterdam +2 + +``` + +### Provenance: most recent causal activity from a department + +Given a ``prov:Activity``, find the most causally recent ``prov:Activity`` +that was started by someone from the Legal department. The path pattern +walks backward through the PROV chain. 
Each step matches an entity that +was generated by one activity and used by the next, while the end pattern +filters for activities whose associated agent belongs to Legal: + +```python +results = find_paths( + graph, + start=EX.MyActivity, + path=PROV.used / PROV.wasGeneratedBy, + # Stop at activities started by the legal department + end="?end prov:wasStartedBy/org:memberOf ex:LegalDepartment", + shortest=True, # Shortest path to that activity only + terminate_on_first_match=True, # Stop at the first match + initNs={ + "prov": "http://www.w3.org/ns/prov#", + "org": "http://www.w3.org/ns/org#", + }, +) +activities = {path.end for path in results} +``` + +### Supply-chain lineage: raw materials in a finished product + +From a ``ex:PhysicalObject`` (a subclass of ``prov:Entity``), find every +"original" physical object that went into it (e.g. all raw materials that were +combined through a chain of activities to produce a finished product). + +Because the inputs and outputs of ``prov:Activity`` instances can include +entities that are *not* physical objects (documents, data records, etc.), +the path pattern explicitly checks that intermediate nodes must be +``ex:PhysicalObject`` instances. This guarantees that only paths through +physical objects are traversed, preventing unrelated physical objects from +appearing in the results: + +```python +results = find_paths( + graph, + # Find all upstream physical objects that are "original", i.e. have no further provenance + start="?start a ex:PhysicalObject . FILTER NOT EXISTS {?start prov:wasGeneratedBy []}", + # Ensure only paths through physical objects are traversed. + path="?end prov:wasGeneratedBy/prov:used ?start . 
class TraversalDirection(str, Enum):
    """
    Direction in which the traversal walks the graph.

    Returned by :func:`_choose_direction`. Because the enum mixes in
    ``str``, each member also compares equal to its plain string value.
    """

    FORWARD = "forward"
    REVERSE = "reverse"


@dataclass(frozen=True)
class PathStep:
    """
    One step along a discovered path.

    Instances are immutable and hashable. ``bindings`` takes part in
    equality but is left out of the hash, because a ``dict`` is unhashable
    and would otherwise make ``hash(step)`` raise ``TypeError`` for every
    step produced by a SPARQL path pattern.

    Attributes:
        node: The node reached at this step. Always present.
        length: The weighted length of this step. Always present.
            Always ``1`` except when the path argument is a SPARQL string
            whose pattern binds a ``?length`` variable: in that case the
            numeric value of ``?length`` is used instead.
        edge: Populated only when the path argument is ``None`` (unbound).
            Contains the predicate (URIRef) traversed to reach *node*.
        bindings: Populated only when the path argument is a SPARQL
            string. Dict mapping each extra variable name (str, without
            '?') to its bound value (Identifier) for this step's pattern
            match. The ``?length`` variable, if present in the query, is
            consumed by the *length* field and **not** included here.
    """

    node: Identifier
    length: float
    edge: Identifier | None = None
    # hash=False keeps the generated __hash__ usable when a dict is stored
    # here; compare stays True, so equal steps still require equal bindings
    # and equal steps always share a hash.
    bindings: dict[str, Identifier] | None = field(default=None, hash=False)


@dataclass
class PathResult:
    """
    A single discovered path from *start* to *end*.

    Attributes:
        start: The starting node of the path.
        end: The ending node of the path.
        length: Total weighted length of the path (sum of step lengths).
            For a zero-length path (``start == end``), *length* is ``0``.
        steps: Ordered list of PathStep objects, one per edge traversed.
            ``steps[-1].node == end`` (always, when ``len(steps) > 0``).
            For a zero-length path (``start == end``), *steps* is ``[]``.
    """

    start: Identifier
    end: Identifier
    length: float = 0
    # default_factory gives every result its own list instead of a shared one.
    steps: list[PathStep] = field(default_factory=list)
def _build_namespace_map(
    graph: Graph,
    initNs: dict[str, str] | None,  # NOQA: N803 consistent capitalization of initNs
) -> dict[str, str]:
    """
    Combine the graph's prefix bindings with the caller's *initNs*.

    The graph's ``namespace_manager`` supplies the base bindings (each
    namespace converted to ``str``); entries from *initNs* are applied on
    top, so a caller-supplied prefix wins over a graph binding of the same
    name.

    Args:
        graph: Graph whose ``namespace_manager.namespaces()`` is read.
        initNs: Optional ``{prefix: namespace_uri}`` overrides. ``None`` or
            an empty mapping leaves the graph bindings untouched.

    Returns:
        A new dict usable as ``prepareQuery(initNs=...)``.
    """
    merged: dict[str, str] = {}
    for prefix, namespace in graph.namespace_manager.namespaces():
        merged[prefix] = str(namespace)
    if not initNs:
        return merged
    merged.update(initNs)
    return merged


def _is_sparql_pattern(value: Union[_NodeSpec, _PathSpec]) -> te.TypeIs[str]:
    """
    Tell whether *value* is a plain ``str`` holding a SPARQL pattern.

    ``Identifier`` instances are excluded explicitly, so only strings that
    are not RDF terms count as patterns.

    Args:
        value: The value to test.

    Returns:
        ``True`` if *value* is a ``str`` but not an ``Identifier``.
    """
    # Check the RDF-term case first so it can never be taken for a pattern.
    if isinstance(value, Identifier):
        return False
    return isinstance(value, str)
def _prepare_and_validate(
    query_string: str,
    ns_map: dict[str, str],
    required_vars: set[str],
    label: str,
) -> Query:
    """
    Compile *query_string* once and check it mentions *required_vars*.

    Args:
        query_string: Full SPARQL query string.
        ns_map: Namespace prefix-to-URI mapping handed to ``prepareQuery``.
        required_vars: Variable names (without ``?``) that must occur in
            the parsed query.
        label: Human-readable name of the pattern, used as the first word(s)
            of the error message.

    Returns:
        The prepared query object, ready to be evaluated repeatedly.

    Raises:
        ValueError: If one or more of *required_vars* does not occur in the
            parsed algebra.
    """
    query = prepareQuery(query_string, initNs=ns_map)

    # Names of every variable the parser recorded for this query.
    seen = set(map(str, query.algebra._vars))
    absent = sorted(required_vars.difference(seen))
    if not absent:
        return query

    wanted = ", ".join("?" + name for name in absent)
    found = sorted("?" + name for name in seen)
    raise ValueError(
        f"{label} must contain variable(s) {wanted}. "
        f"Parsed variables: {found}. "
        f"Pattern: {query_string!r}"
    )


def _specificity(value: _NodeSpec) -> int:
    """
    Score how tightly a start / end specification constrains the node set.

    :func:`_choose_direction` compares the scores of both sides so that the
    traversal can begin from the more constrained one.

    Scores (higher = more specific):

    - 3: single ``Identifier`` (exactly one node)
    - 2: iterable of ``Identifier`` (finite known set)
    - 1: SPARQL pattern ``str`` (set determined at query time)
    - 0: ``None`` (unbound: every node in the graph)

    Args:
        value: A start / end specification as accepted by
            :func:`find_paths`.

    Returns:
        Specificity score in the range ``[0, 3]``.
    """
    if value is None:
        return 0
    if isinstance(value, Identifier):
        return 3
    # Anything left is either a pattern string or an iterable of nodes. The
    # iterable is deliberately not consumed: it may be a generator.
    return 1 if _is_sparql_pattern(value) else 2
def _choose_direction(
    start: _NodeSpec,
    end: _NodeSpec,
) -> TraversalDirection:
    """
    Decide whether the traversal should run forward or in reverse.

    The heuristic compares the specificity of *start* and *end* (via
    :func:`_specificity`). When the end side is more constrained than
    the start side, the traversal is reversed so that it begins from the
    smaller frontier, reducing the number of paths explored.

    Specificity ranking (most to least): single ``Identifier`` >
    ``Iterable[Identifier]`` > ``str`` (SPARQL pattern) > ``None``
    (unbound).

    Args:
        start: The caller's start specification.
        end: The caller's end specification.

    Returns:
        :attr:`TraversalDirection.FORWARD` if start is at least as specific
        as end; :attr:`TraversalDirection.REVERSE` otherwise.
    """
    # Ties go to FORWARD, so the caller's own orientation is kept by default.
    if _specificity(end) > _specificity(start):
        return TraversalDirection.REVERSE

    return TraversalDirection.FORWARD


def _resolve_origins(
    origin_spec: _NodeSpec,
    graph: Graph,
    ns_map: dict[str, str],
    origin_var: str,
) -> set[Identifier]:
    """
    Materialize the origin specification into a concrete set of nodes.

    Dispatches on the type of *origin_spec*:

    - ``Identifier``: returns a singleton set.
    - SPARQL pattern ``str``: compiles and executes a ``SELECT DISTINCT``
      query over the variable named by *origin_var* against *graph* and
      collects the bound values.
    - ``None``: returns the union of all subjects and objects in *graph*
      (i.e. every node).
    - Any other iterable: materializes it into a ``set``.

    Args:
        origin_spec: The origin-side node specification.
        graph: The rdflib graph to query.
        ns_map: Namespace prefix-to-URI mapping for SPARQL compilation.
        origin_var: The SPARQL variable name (``"start"`` or ``"end"``) to
            extract from query result rows.

    Returns:
        The concrete set of origin nodes (may be empty).

    Raises:
        ValueError: If *origin_spec* is a SPARQL pattern that does not
            mention the variable named by *origin_var* (raised by
            :func:`_prepare_and_validate`).
    """
    if isinstance(origin_spec, Identifier):
        return {origin_spec}

    if _is_sparql_pattern(origin_spec):
        # Origin pattern -> SELECT (to enumerate starting nodes)
        label = ("End" if origin_var == "end" else "Start") + " pattern (origin)"
        # The pattern sits on its own line: a SPARQL comment runs to the end
        # of the line, so a pattern ending in "# ..." would otherwise comment
        # out the closing brace and make the query unparsable.
        q = f"SELECT DISTINCT ?{origin_var} WHERE {{\n{origin_spec}\n}}"
        prepared = _prepare_and_validate(q, ns_map, {origin_var}, label)

        # Evaluate the query to get the set of origin nodes
        results = graph.query(prepared)
        var = Variable(origin_var)
        return {
            cast("Identifier", row[var])
            for row in results
            if isinstance(row, ResultRow) and row[var] is not None
        }

    if origin_spec is None:
        # Unbound: all nodes in the graph
        return set(graph.subjects()) | set(graph.objects())

    # Iterable of Identifiers
    return set(origin_spec)
def _build_expand_fn(
    path_spec: _PathSpec,
    graph: Graph,
    ns_map: dict[str, str],
    forward: bool,
) -> _ExpandFn:
    """
    Build and return a one-hop expansion function for the traversal.

    The returned callable has the signature
    ``(node: Identifier) -> Iterable[tuple[Identifier, PathStep]]``
    and yields ``(neighbor, step)`` pairs reachable from *node* in
    a single step.

    The implementation dispatched depends on *path_spec*:

    - ``None`` (unbound): traverses every predicate in *graph*.
      Each step carries the predicate as ``PathStep.edge``.
    - ``URIRef`` or ``Path``: uses ``graph.objects`` / ``graph.subjects``
      with the given predicate or property-path object.
    - ``str`` (SPARQL pattern): compiles a ``SELECT * WHERE { ... }``
      query once and executes it with ``?start`` (forward) or ``?end``
      (reverse) bound to *node*. Each step carries extra variable
      bindings in ``PathStep.bindings``. If the pattern binds
      ``?length``, its numeric value is used as ``PathStep.length`` (and
      excluded from ``bindings``). Rows that leave the neighbor variable
      unbound are skipped.

    When *forward* is ``False`` the traversal direction is reversed
    (objects -> subjects).

    Args:
        path_spec: The path specification.
        graph: The rdflib graph to traverse.
        ns_map: Namespace prefix-to-URI mapping for SPARQL compilation.
        forward: ``True`` for forward traversal, ``False`` for reverse.

    Returns:
        A callable ``(Identifier) -> Iterable[(Identifier, PathStep)]``.

    Raises:
        ValueError: If a SPARQL *path_spec* lacks ``?start`` or ``?end``
            (raised while building). The returned SPARQL expander also
            raises ``ValueError`` while iterating when ``?length`` is
            negative or NaN, and ``TypeError`` when it is not numeric.
    """

    if path_spec is None:
        # Unbound path: traverse all predicates; step carries edge predicate
        if forward:

            def _expand_unbound_fwd(
                node: Identifier,
            ) -> Generator[tuple[Identifier, PathStep], None, None]:
                for pred, obj in graph.predicate_objects(cast("_SubjectType", node)):
                    yield obj, PathStep(node=obj, length=1, edge=pred)

            return _expand_unbound_fwd
        else:

            def _expand_unbound_rev(
                node: Identifier,
            ) -> Generator[tuple[Identifier, PathStep], None, None]:
                for subj, pred in graph.subject_predicates(cast("_ObjectType", node)):
                    yield subj, PathStep(node=subj, length=1, edge=pred)

            return _expand_unbound_rev

    # URIRef or Path: traverse that path; no extra metadata
    if not _is_sparql_pattern(path_spec) and isinstance(path_spec, (URIRef, Path)):
        if forward:

            def _expand_path_fwd(
                node: Identifier,
            ) -> Generator[tuple[Identifier, PathStep], None, None]:
                for obj in graph.objects(cast("_SubjectType", node), path_spec):
                    yield obj, PathStep(node=obj, length=1)

            return _expand_path_fwd
        else:

            def _expand_path_rev(
                node: Identifier,
            ) -> Generator[tuple[Identifier, PathStep], None, None]:
                for subj in graph.subjects(path_spec, cast("_ObjectType", node)):
                    yield subj, PathStep(node=subj, length=1)

            return _expand_path_rev

    # str: SPARQL pattern; step carries extra bindings with optional ?length
    bind_var = Variable("start") if forward else Variable("end")
    read_var = Variable("end") if forward else Variable("start")

    # Path pattern -> SELECT * (for finding neighbors). The pattern sits on
    # its own line: a SPARQL comment runs to the end of the line, so a
    # pattern ending in "# ..." would otherwise swallow the closing brace.
    q = f"SELECT * WHERE {{\n{path_spec}\n}}"
    _path_query = _prepare_and_validate(q, ns_map, {"start", "end"}, "Path pattern")

    def _expand_sparql(
        node: Identifier,
    ) -> Generator[tuple[Identifier, PathStep], None, None]:
        results = graph.query(
            _path_query,
            initBindings={bind_var: node},
        )
        for row in results:
            if not isinstance(row, ResultRow):
                continue
            raw_neighbor = row[read_var]
            if raw_neighbor is None:
                # The neighbor variable can stay unbound (for example when it
                # only occurs inside OPTIONAL); such a row names no node to
                # step to, so it is not a hop.
                continue
            neighbor = cast("Identifier", raw_neighbor)
            bindings = {
                str(v): row[v]
                for v in row.labels
                if str(v) not in ("start", "end") and row[v] is not None
            }
            # Extract ?length if bound
            if "length" in bindings:
                raw_length = bindings.pop("length")
                try:
                    step_length = float(raw_length)
                except (ValueError, TypeError) as exc:
                    raise TypeError(
                        f"?length must be numeric, got {raw_length.n3()} "
                    ) from exc
                if step_length != step_length:
                    # NaN is the only float unequal to itself. It slips past
                    # the "< 0" test below yet makes every heap comparison
                    # False, so the length ordering would silently break.
                    raise ValueError(
                        f"?length must not be NaN, got {raw_length.n3()}."
                    )
                if step_length < 0:
                    raise ValueError(
                        f"?length must be non-negative, got {raw_length.n3()}. "
                        + "Negative edge weights break the Dijkstra shortest-path guarantee."
                    )
            else:
                step_length = 1
            yield (
                neighbor,
                PathStep(
                    node=neighbor,
                    length=step_length,
                    bindings=cast(
                        "dict[str, Identifier] | None",
                        bindings if bindings else None,
                    ),
                ),
            )

    return _expand_sparql


def _build_end_check_fn(
    end_spec: _NodeSpec,
    graph: Graph,
    ns_map: dict[str, str],
    end_var: str,
) -> tuple[_EndCheckFn, frozenset[Identifier] | None]:
    """
    Build an end-node validation function and optionally a known-ends set.

    Returns a two-element tuple ``(check_fn, known_end_nodes)``:

    - ``check_fn``: a callable ``(Identifier) -> bool`` that returns
      ``True`` when the candidate node is a valid target.
    - ``known_end_nodes``: a ``frozenset`` of all valid end nodes when
      the target set is finite and known up-front (single ``Identifier``
      or iterable of ``Identifier``). ``None`` when the end set is
      open-ended (``None`` or SPARQL pattern).

    Dispatch by *end_spec* type:

    - ``None``: every node is a valid end (the check ignores its argument
      and always returns ``True``).
    - ``Identifier``: only that single node is valid.
    - SPARQL pattern ``str``: compiles an ASK query once and executes it
      with the variable named by *end_var* bound to the candidate.
    - Iterable of ``Identifier``: materializes into a ``frozenset``
      for O(1) membership testing.

    Args:
        end_spec: The target-side node specification.
        graph: The rdflib graph to query.
        ns_map: Namespace prefix-to-URI mapping for SPARQL compilation.
        end_var: The SPARQL variable name to bind when validating candidates.

    Returns:
        The validation function and (when determinable) the frozen set
        of known end nodes.

    Raises:
        ValueError: If *end_spec* is a SPARQL pattern that does not mention
            the variable named by *end_var* (raised by
            :func:`_prepare_and_validate`).
    """

    # Unbound: All nodes match, no specified end nodes
    if end_spec is None:
        return (lambda _node: True), None

    # Single Identifier: Match only that node
    if isinstance(end_spec, Identifier):
        target = end_spec
        return (lambda node: node == target), frozenset({target})

    # str (SPARQL pattern): Match if ASK is true when end node is pre-bound, no specified end nodes
    if _is_sparql_pattern(end_spec):
        var = Variable(end_var)

        # Target pattern -> ASK (to validate candidate end nodes). The
        # pattern sits on its own line so that a trailing "# ..." comment in
        # it cannot comment out the closing brace.
        label = ("Start" if end_var == "start" else "End") + " pattern (target)"
        q = f"ASK WHERE {{\n{end_spec}\n}}"
        _end_query = _prepare_and_validate(q, ns_map, {end_var}, label)

        def _check_ask(node: Identifier) -> bool:
            result = graph.query(_end_query, initBindings={var: node})
            return bool(result.askAnswer)

        return _check_ask, None

    # Iterable of Identifiers: Materialize into a frozenset
    target_set = frozenset(end_spec)
    return (lambda node: node in target_set), target_set
def _reverse_path(
    traversal_origin: Identifier,
    reverse_steps: list[PathStep],
) -> list[PathStep]:
    """
    Turn steps gathered by a reverse traversal into forward-order steps.

    A reverse traversal walks from the caller's end node towards the
    caller's start node, so each collected step names the node it moved
    *to* while walking backwards. Read forwards, the same hop arrives at
    the node the reverse walk had just left. This function therefore flips
    the order of the steps and gives each one the node of its successor in
    the flipped order; the final step arrives at *traversal_origin*.

    ``length``, ``edge`` and ``bindings`` are copied unchanged from the
    step that described the same hop.

    Args:
        traversal_origin: Node the reverse traversal started from, i.e. the
            path's end node in forward order.
        reverse_steps: Steps in the order the reverse traversal took them.

    Returns:
        New ``PathStep`` objects in forward order (start -> end); an empty
        list when *reverse_steps* is empty.
    """
    backwards = reverse_steps[::-1]
    final_index = len(backwards) - 1

    forward_steps: list[PathStep] = []
    for position, hop in enumerate(backwards):
        # The hop lands on whatever node follows it in forward order; only
        # the very last hop lands on the traversal origin itself.
        if position == final_index:
            destination = traversal_origin
        else:
            destination = backwards[position + 1].node
        forward_steps.append(
            PathStep(
                node=destination,
                length=hop.length,
                edge=hop.edge,
                bindings=hop.bindings,
            )
        )
    return forward_steps
def _unreverse_results(results: list[PathResult]) -> None:
    """
    Re-orient results produced by a reverse traversal, in place.

    During a reverse run each result's ``start`` holds the traversal origin
    (the caller's end) and ``end`` holds the traversal target (the caller's
    start). Both endpoints are swapped back and the step list is rebuilt in
    forward order with :func:`_reverse_path`.

    Args:
        results: The results to fix up; every element is mutated.
    """
    for found in results:
        traversal_origin = found.start
        found.start, found.end = found.end, traversal_origin
        # The traversal origin is where the forward-order path finishes.
        found.steps = _reverse_path(traversal_origin, found.steps)


class _PartialPath(NamedTuple):
    """Immutable snapshot of one unfinished path held in the traversal frontier."""

    # Node the path started from.
    origin: Identifier
    # Node the path currently ends at.
    current: Identifier
    # Nodes already on this path, used for per-path cycle detection.
    visited: frozenset[Identifier]
    # Steps taken so far, in traversal order.
    steps: tuple[PathStep, ...]
def _traverse(
    origin_nodes: set[Identifier],
    expand: _ExpandFn,
    is_valid_end: _EndCheckFn,
    shortest: bool,
    terminate_on_first_match: bool,
    max_length: float | None,
    known_end_nodes: frozenset[Identifier] | None = None,
) -> list[PathResult]:
    """
    Core traversal loop using a priority queue (Dijkstra-style).

    Paths are explored in order of increasing cumulative weighted length
    via a min-heap. For unweighted paths (every step has ``length == 1``)
    this degrades to breadth-first order.

    When *shortest* is ``True``, the heap ordering guarantees that the
    first time a path reaches an end node, it is via the shortest
    (minimum cumulative weighted length) route. Ties (multiple paths of
    the same minimum length to the same end node) are preserved: the
    traversal continues popping entries of equal length before moving on.

    When *shortest* is ``False``, all acyclic paths are collected without
    pruning. The heap ordering is unused in this mode but adds only
    negligible overhead.

    Path length is the **cumulative weighted length**: the sum of each
    step's ``length`` attribute. When the path is a SPARQL pattern that
    binds ``?length``, the step length equals that value. For all other
    paths, every step has ``length == 1``, so cumulative length
    equals the hop count.

    **Zero-length paths**: every origin node that is itself a valid end is
    reported as a path with no steps. The origin is still queued for
    expansion afterwards, regardless of *terminate_on_first_match*.

    **Max-length filtering**: when *max_length* is not ``None``, any
    newly expanded path whose cumulative weighted length exceeds
    *max_length* is discarded.

    **Cross-node cutoff**: when *shortest* is ``True`` **and**
    *known_end_nodes* is supplied (i.e. the caller knows the finite set
    of valid end nodes up-front), the shortest path length found so far
    is tracked for each end node via a ``settled`` dict. Once
    every known end node has been reached at least once, the maximum of
    those settled lengths becomes a global cutoff: any partial path whose
    cumulative length exceeds the cutoff is discarded, because it cannot
    produce a shortest path to any remaining end node.

    Args:
        origin_nodes: Concrete set of starting nodes.
        expand: One-hop expansion callable
            ``(Identifier) -> Iterable[(Identifier, PathStep)]`` built
            by :func:`_build_expand_fn`.
        is_valid_end: Callable ``(Identifier) -> bool`` that returns
            ``True`` when a candidate node is a valid target, built by
            :func:`_build_end_check_fn`.
        shortest: If ``True``, keep only the shortest path(s) per end
            node by cumulative weighted length.
        terminate_on_first_match: If ``True``, stop extending a path
            once it reaches a valid end node.
        max_length: Maximum cumulative weighted path length. ``None``
            means no limit.
        known_end_nodes: When not ``None``, the finite set of valid end
            nodes known up-front. Enables early termination when all
            targets have been reached.

    Returns:
        All discovered paths matching the constraints. When *shortest*
        is ``True``, only the minimum-length path(s) per end node are
        included.
    """

    results: list[PathResult] = []
    counter = 0  # tie-breaker for heap stability (FIFO among equal lengths)

    # Map nodes to corresponding shortest cumulative length found so far.
    # Because the heap pops in order of increasing length, the first
    # time an end node is reached its length is the shortest for that node.
    settled: dict[Identifier, float] = {}
    # If all end nodes are known and all end nodes are settled,
    # partial paths exceeding global_cutoff = max(settled.values()) are pruned
    # as they cannot possibly be a shortest path for any end node.
    global_cutoff: float | None = None

    # Zero-length paths: origin is also a valid end
    heap: list[tuple[float, int, _PartialPath]] = []
    for node in origin_nodes:
        if is_valid_end(node):
            results.append(PathResult(start=node, end=node, length=0, steps=[]))
            if shortest:
                settled[node] = 0.0
                # Update global cutoff if all known ends are settled
                if known_end_nodes is not None and len(settled) >= len(known_end_nodes):
                    global_cutoff = max(settled.values())
        # Every origin is queued, valid end or not, so traversal can leave it.
        # The unique counter in second position means two heap entries never
        # tie on (length, counter), so the _PartialPath is never compared.
        entry = _PartialPath(node, node, frozenset({node}), ())
        heapq.heappush(heap, (0.0, counter, entry))
        counter += 1

    # Priority-queue traversal
    while heap:
        cumulative_length, _, partial = heapq.heappop(heap)
        origin, current, visited, steps = partial

        # Global cutoff: discard if this partial path already exceeds
        # the longest shortest-path among all known end nodes.
        # (Re-checked at pop time because the cutoff may have been set after
        # this entry was pushed.)
        if global_cutoff is not None and cumulative_length > global_cutoff:
            continue

        # Max-length check
        if max_length is not None and cumulative_length > max_length:
            continue

        # Record result if current node is a valid end.
        # Checking here guarantees that the heap ordering is respected.
        # The first time a path ending at a given node is popped, it has
        # the shortest cumulative length.
        # "steps" is empty only for the seed entries, whose zero-length
        # results were already recorded above.
        if steps and is_valid_end(current):
            # Shortest pruning: skip if a strictly shorter path to
            # this end node was already settled.
            # The comparison is strict so that equal-length ties are kept.
            if shortest and current in settled and cumulative_length > settled[current]:
                continue

            results.append(
                PathResult(
                    start=origin,
                    end=current,
                    length=cumulative_length,
                    steps=list(steps),
                )
            )

            # Update settled state
            if shortest and current not in settled:
                settled[current] = cumulative_length
                # Update global cutoff once all known ends are settled
                if (
                    global_cutoff is None
                    and known_end_nodes is not None
                    and len(settled) >= len(known_end_nodes)
                ):
                    global_cutoff = max(settled.values())

            if terminate_on_first_match:
                continue  # do NOT expand further from this node

        # Expand neighbors
        for neighbor, new_step in expand(current):
            # Per-path cycle detection
            if neighbor in visited:
                continue

            new_cumulative_length = cumulative_length + new_step.length

            # Max-length check on the expanded path
            if max_length is not None and new_cumulative_length > max_length:
                continue

            # Global cutoff check on the expanded path
            if global_cutoff is not None and new_cumulative_length > global_cutoff:
                continue

            # New tuple / frozenset per branch: sibling paths never share
            # mutable state.
            new_steps = (*steps, new_step)
            new_visited = visited | {neighbor}

            # Push onto the heap for later processing
            entry = _PartialPath(origin, neighbor, new_visited, new_steps)
            heapq.heappush(heap, (new_cumulative_length, counter, entry))
            counter += 1

    return results
+ path: Fixed predicate, rdflib property path, SPARQL WHERE-clause + body (must contain ``?start`` and ``?end``), or ``None`` + (unbound). + + When *path* is a SPARQL string and the pattern binds a + ``?length`` variable, its numeric value is used as the + weighted length of each step (see :class:`PathStep`). The + ``?length`` variable is consumed and does **not** appear in + ``PathStep.bindings``. If ``?length`` is not bound, each + step has ``length == 1``. + end: Fixed node, iterable of fixed nodes, SPARQL WHERE-clause + body (must contain ``?end``), or ``None`` (unbound). + shortest: If ``True`` (the default), return only the shortest + path(s) per end node. "Shortest" is determined by + cumulative weighted length (``PathResult.length``), which + equals the hop count when no ``?length`` variable is bound. + If multiple paths of the same minimum length reach the same + end node, all are returned. Set to ``False`` to return all + discovered acyclic paths. + terminate_on_first_match: If ``True``, stop extending a path + once it reaches a valid end. + max_length: Maximum cumulative weighted path length. Only paths + whose total length (``PathResult.length``) does not exceed + this value are returned. For non-SPARQL paths (or SPARQL + paths that do not bind ``?length``), every step has + ``length == 1``, so *max_length* is equivalent to a + hop-count limit. When the path is a SPARQL pattern that + binds ``?length``, the limit applies to the sum of the + per-step weights. + initNs: Namespace prefix mapping merged with graph namespaces. + + Returns: + All discovered paths matching the query constraints. + + Raises: + ValueError: If all three of *start*, *path*, *end* are ``None``, + or if a SPARQL pattern is missing required variables, or if + *max_length* is negative. + """ + + if start is None and path is None and end is None: + raise ValueError( + "At least one of start, path, or end must be provided. " + "A fully unbound query (all three None) is not supported." 
+ ) + + if max_length is not None and max_length < 0: + raise ValueError(f"max_length must be non-negative, got {max_length}") + + ns_map = _build_namespace_map(graph, initNs) + + # Determine search direction. Must be chosen before preparing queries so we know which + # string pattern becomes a SELECT (origin) vs ASK (target). + direction = _choose_direction(start, end) + reversed_search = direction == TraversalDirection.REVERSE + + if reversed_search: + origin_spec = end # caller's end becomes traversal origin + target_spec = start # caller's start becomes traversal target + origin_var = "end" + target_var = "start" + else: + origin_spec = start + target_spec = end + origin_var = "start" + target_var = "end" + + # Resolve origin nodes from which to start finding paths + origin_nodes = _resolve_origins(origin_spec, graph, ns_map, origin_var) + + # Terminate early if no origins are found + if not origin_nodes: + return [] + + # Build expansion function + expand = _build_expand_fn(path, graph, ns_map, not reversed_search) + + # Build end-validation function + is_valid_end, known_end_nodes = _build_end_check_fn( + target_spec, graph, ns_map, target_var + ) + + # Execute traversal + results = _traverse( + origin_nodes=origin_nodes, + expand=expand, + is_valid_end=is_valid_end, + shortest=shortest, + terminate_on_first_match=terminate_on_first_match, + max_length=max_length, + known_end_nodes=known_end_nodes, + ) + + # If search was reversed, fix results accordingly + if reversed_search: + _unreverse_results(results) + + return results diff --git a/test/test_extras/test_pathfinding.py b/test/test_extras/test_pathfinding.py new file mode 100644 index 0000000000..854ee2b214 --- /dev/null +++ b/test/test_extras/test_pathfinding.py @@ -0,0 +1,2160 @@ +import pytest + +from rdflib import BNode, Graph, Literal, Namespace +from rdflib.extras.pathfinding import ( + PathResult, + TraversalDirection, + _build_end_check_fn, + _build_expand_fn, + _build_namespace_map, + 
_choose_direction, + _is_sparql_pattern, + _prepare_and_validate, + _resolve_origins, + find_paths, +) + +EX = Namespace("http://example.org/") +NS = {"ex": str(EX)} + + +@pytest.fixture() +def linear_graph() -> Graph: + """A -> B -> C -> D via ex:knows, plus a shortcut B -> D.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.Alice, EX.knows, EX.Bob)) + g.add((EX.Bob, EX.knows, EX.Carol)) + g.add((EX.Carol, EX.knows, EX.Dave)) + g.add((EX.Bob, EX.knows, EX.Dave)) + return g + + +@pytest.fixture() +def cyclic_graph() -> Graph: + """A -> B -> C -> A (cycle) via ex:knows.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + g.add((EX.B, EX.knows, EX.C)) + g.add((EX.C, EX.knows, EX.A)) + return g + + +@pytest.fixture() +def diamond_graph() -> Graph: + """Diamond: A -> B, A -> C, B -> D, C -> D via ex:knows.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + g.add((EX.A, EX.knows, EX.C)) + g.add((EX.B, EX.knows, EX.D)) + g.add((EX.C, EX.knows, EX.D)) + return g + + +@pytest.fixture() +def multi_pred_graph() -> Graph: + """Graph with multiple predicates between nodes.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.Alice, EX.knows, EX.Bob)) + g.add((EX.Bob, EX.worksWith, EX.Carol)) + g.add((EX.Carol, EX.knows, EX.Dave)) + return g + + +@pytest.fixture() +def manager_graph(linear_graph: Graph) -> Graph: + """linear_graph extended with role annotations.""" + linear_graph.add((EX.Dave, EX.role, EX.Manager)) + linear_graph.add((EX.Carol, EX.role, EX.Manager)) + linear_graph.add((EX.Alice, EX.department, EX.Engineering)) + linear_graph.add((EX.Bob, EX.department, EX.Engineering)) + return linear_graph + + +def _path_nodes(result: PathResult) -> list[str]: + """Extract node local names from a PathResult for easy assertion.""" + names = [str(result.start).rsplit("/", 1)[-1]] + names.extend(str(s.node).rsplit("/", 1)[-1] for s in result.steps) + return names + + +def _path_node_sets(results: list[PathResult]) -> set[tuple[str, ...]]: + 
"""Convert results to a set of node-name tuples for order-independent comparison.""" + return {tuple(_path_nodes(r)) for r in results} + + +class TestIsSparqlPattern: + """Unit tests for _is_sparql_pattern.""" + + @pytest.mark.parametrize( + "value, expected", + [ + pytest.param("?start ex:knows ?end", True, id="sparql_pattern"), + pytest.param("", True, id="empty_string"), + pytest.param( + TraversalDirection.FORWARD, True, id="str_enum_not_identifier" + ), + pytest.param(EX.knows, False, id="uriref"), + pytest.param(None, False, id="none"), + pytest.param(42, False, id="integer"), + pytest.param([EX.Alice], False, id="list"), + pytest.param(BNode(), False, id="bnode"), + pytest.param(Literal("hello"), False, id="literal"), + ], + ) + def test_is_sparql_pattern(self, value, expected): + assert _is_sparql_pattern(value) is expected + + +class TestPrepareAndValidate: + """Unit tests for _prepare_and_validate.""" + + def test_valid_select_query(self): + """A well-formed SELECT with the required variable succeeds.""" + q = "SELECT DISTINCT ?start WHERE { ?start ex:knows ex:Bob }" + prepared = _prepare_and_validate(q, NS, {"start"}, "test") + assert prepared is not None + + def test_valid_ask_query(self): + """A well-formed ASK with the required variable succeeds.""" + q = "ASK WHERE { ?end ex:role ex:Manager }" + prepared = _prepare_and_validate(q, NS, {"end"}, "test") + assert prepared is not None + + def test_missing_single_variable_raises(self): + """Missing a required variable raises ValueError.""" + q = "SELECT ?x WHERE { ?x ex:knows ex:Bob }" + with pytest.raises(ValueError, match="start"): + _prepare_and_validate(q, NS, {"start"}, "Start pattern") + + def test_missing_multiple_variables_raises(self): + """Missing multiple required variables lists them all.""" + q = "SELECT ?x WHERE { ?x ex:knows ?y }" + with pytest.raises(ValueError, match="end") as exc_info: + _prepare_and_validate(q, NS, {"start", "end"}, "Path pattern") + assert "start" in 
str(exc_info.value) + + def test_extra_variables_allowed(self): + """Extra variables beyond the required set are fine.""" + q = "SELECT * WHERE { ?start ex:knows ?end . ?start ex:worksAt ?company }" + prepared = _prepare_and_validate(q, NS, {"start", "end"}, "test") + assert prepared is not None + + def test_namespace_resolution(self): + """Prefixes from ns_map are resolved correctly.""" + ns = {"ex": "http://example.org/"} + q = "SELECT ?start WHERE { ?start ex:knows ex:Bob }" + prepared = _prepare_and_validate(q, ns, {"start"}, "test") + assert prepared is not None + + def test_error_message_includes_pattern(self): + """Error message includes the original query string.""" + q = "SELECT ?x WHERE { ?x ex:knows ex:Bob }" + with pytest.raises(ValueError, match=r"SELECT \?x WHERE"): + _prepare_and_validate(q, NS, {"start"}, "test") + + def test_error_message_includes_label(self): + """Error message includes the human-readable label.""" + q = "SELECT ?x WHERE { ?x ex:knows ex:Bob }" + with pytest.raises(ValueError, match="My custom label"): + _prepare_and_validate(q, NS, {"start"}, "My custom label") + + +class TestChooseDirection: + """Unit tests for _choose_direction.""" + + @pytest.mark.parametrize( + "start, end, expected", + [ + pytest.param( + EX.Alice, None, TraversalDirection.FORWARD, id="id_start-none_end" + ), + pytest.param( + None, EX.Alice, TraversalDirection.REVERSE, id="none_start-id_end" + ), + pytest.param( + EX.Alice, EX.Bob, TraversalDirection.FORWARD, id="id_start-id_end-tie" + ), + pytest.param( + None, None, TraversalDirection.FORWARD, id="none_start-none_end-tie" + ), + pytest.param( + "?start ex:knows ex:Bob", + EX.Dave, + TraversalDirection.REVERSE, + id="pattern_start-id_end", + ), + pytest.param( + EX.Alice, + "?end ex:role ex:Manager", + TraversalDirection.FORWARD, + id="id_start-pattern_end", + ), + pytest.param( + [EX.Alice, EX.Bob], + EX.Dave, + TraversalDirection.REVERSE, + id="iter_start-id_end", + ), + pytest.param( + EX.Alice, + 
[EX.Bob, EX.Carol], + TraversalDirection.FORWARD, + id="id_start-iter_end", + ), + pytest.param( + "?start ex:dept ex:Eng", + [EX.B, EX.C], + TraversalDirection.REVERSE, + id="pattern_start-iter_end", + ), + pytest.param( + [EX.A, EX.B], + "?end ex:role ex:Manager", + TraversalDirection.FORWARD, + id="iter_start-pattern_end", + ), + pytest.param( + None, + "?end ex:role ex:Manager", + TraversalDirection.REVERSE, + id="none_start-pattern_end", + ), + pytest.param( + "?start ex:knows ex:Bob", + None, + TraversalDirection.FORWARD, + id="pattern_start-none_end", + ), + pytest.param( + None, [EX.A], TraversalDirection.REVERSE, id="none_start-iter_end" + ), + pytest.param( + [EX.A], None, TraversalDirection.FORWARD, id="iter_start-none_end" + ), + ], + ) + def test_direction_selection(self, start, end, expected): + result = _choose_direction(start, end) + assert result == expected + assert isinstance(result, TraversalDirection) + + +class TestResolveOrigins: + """Unit tests for _resolve_origins.""" + + def test_single_identifier(self, linear_graph: Graph): + """Single Identifier returns a singleton set.""" + result = _resolve_origins(EX.Alice, linear_graph, {}, "start") + assert result == {EX.Alice} + + def test_iterable_of_identifiers(self, linear_graph: Graph): + """Iterable of Identifiers is materialized into a set.""" + result = _resolve_origins([EX.Alice, EX.Bob], linear_graph, {}, "start") + assert result == {EX.Alice, EX.Bob} + + def test_generator_consumed(self, linear_graph: Graph): + """Generator is consumed into a set.""" + + def gen(): + yield EX.Alice + yield EX.Bob + + result = _resolve_origins(gen(), linear_graph, {}, "start") + assert result == {EX.Alice, EX.Bob} + + def test_empty_iterable(self, linear_graph: Graph): + """Empty iterable returns empty set.""" + result = _resolve_origins([], linear_graph, {}, "start") + assert result == set() + + def test_none_returns_all_nodes(self): + """None returns all subjects and objects in the graph.""" + g = 
Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + g.add((EX.B, EX.knows, EX.C)) + result = _resolve_origins(None, g, {}, "start") + assert EX.A in result + assert EX.B in result + assert EX.C in result + # EX.knows is a predicate, not a subject/object node + assert EX.knows not in result + + def test_sparql_pattern(self, linear_graph: Graph): + """SPARQL pattern string compiles and executes a SELECT query.""" + ns_map = _build_namespace_map(linear_graph, NS) + pattern = "?start ex:knows ex:Bob" + result = _resolve_origins(pattern, linear_graph, ns_map, "start") + assert EX.Alice in result + + def test_sparql_pattern_no_matches(self): + """SPARQL pattern that matches nothing returns empty set.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + ns_map = _build_namespace_map(g, NS) + pattern = "?start ex:friendOf ex:B" + result = _resolve_origins(pattern, g, ns_map, "start") + assert result == set() + + def test_set_of_identifiers(self, linear_graph: Graph): + """Set of Identifiers is returned as-is (materialized).""" + result = _resolve_origins({EX.Alice, EX.Carol}, linear_graph, {}, "start") + assert result == {EX.Alice, EX.Carol} + + def test_sparql_pattern_with_end_var(self): + """SPARQL pattern with origin_var='end' (reverse direction) compiles correctly.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.Alice, EX.knows, EX.Bob)) + g.add((EX.Carol, EX.knows, EX.Bob)) + ns_map = _build_namespace_map(g, NS) + pattern = "?end ex:knows ex:Bob" + result = _resolve_origins(pattern, g, ns_map, "end") + assert EX.Alice in result + assert EX.Carol in result + + def test_sparql_pattern_missing_variable_returns_empty(self): + """SPARQL pattern not binding the origin variable returns empty set. + + When the user's pattern uses ?x instead of ?start, the compiled + SELECT DISTINCT ?start query returns rows where ?start is unbound, + which are filtered out, yielding an empty set. 
+ """ + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + ns_map = _build_namespace_map(g, NS) + result = _resolve_origins("?x ex:knows ex:B", g, ns_map, "start") + assert result == set() + + +class TestBuildExpandFn: + """Unit tests for _build_expand_fn.""" + + @pytest.fixture() + def simple_graph(self) -> Graph: + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + g.add((EX.A, EX.likes, EX.C)) + g.add((EX.B, EX.knows, EX.C)) + return g + + # --- Unbound path (path=None) --- + + def test_unbound_forward_yields_all_predicates(self, simple_graph: Graph): + """path=None, forward: yields all (object, step) with edge set.""" + expand = _build_expand_fn(None, simple_graph, {}, forward=True) + results = list(expand(EX.A)) + neighbors = {node for node, _ in results} + assert EX.B in neighbors + assert EX.C in neighbors + # Each step should have an edge + for _, step in results: + assert step.edge is not None + assert step.length == 1 + + def test_unbound_reverse_yields_subjects(self, simple_graph: Graph): + """path=None, reverse: yields subjects that point to the given node.""" + expand = _build_expand_fn(None, simple_graph, {}, forward=False) + results = list(expand(EX.C)) + neighbors = {node for node, _ in results} + assert EX.A in neighbors # A -likes-> C + assert EX.B in neighbors # B -knows-> C + for _, step in results: + assert step.edge is not None + + def test_unbound_no_neighbors(self, simple_graph: Graph): + """path=None on a node with no outgoing edges yields nothing.""" + expand = _build_expand_fn(None, simple_graph, {}, forward=True) + results = list(expand(EX.C)) + assert results == [] + + # --- URIRef path --- + + def test_uriref_forward(self, simple_graph: Graph): + """URIRef path, forward: yields only objects matching that predicate.""" + expand = _build_expand_fn(EX.knows, simple_graph, {}, forward=True) + results = list(expand(EX.A)) + neighbors = {node for node, _ in results} + assert neighbors == {EX.B} + for _, step in 
results: + assert step.edge is None # URIRef path doesn't set edge + assert step.length == 1 + + def test_uriref_reverse(self, simple_graph: Graph): + """URIRef path, reverse: yields subjects with that predicate to the node.""" + expand = _build_expand_fn(EX.knows, simple_graph, {}, forward=False) + results = list(expand(EX.B)) + neighbors = {node for node, _ in results} + assert neighbors == {EX.A} + + def test_uriref_no_match(self, simple_graph: Graph): + """URIRef path with no matching triples yields nothing.""" + expand = _build_expand_fn(EX.friendOf, simple_graph, {}, forward=True) + results = list(expand(EX.A)) + assert results == [] + + # --- Property path --- + + def test_property_path_forward(self): + """rdflib Path object, forward: applies the full path as one step.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + g.add((EX.B, EX.likes, EX.C)) + expand = _build_expand_fn(EX.knows / EX.likes, g, {}, forward=True) + results = list(expand(EX.A)) + neighbors = {node for node, _ in results} + assert EX.C in neighbors + + def test_property_path_reverse(self): + """rdflib Path object, reverse: applies the full path in reverse.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + g.add((EX.B, EX.likes, EX.C)) + expand = _build_expand_fn(EX.knows / EX.likes, g, {}, forward=False) + results = list(expand(EX.C)) + neighbors = {node for node, _ in results} + assert EX.A in neighbors + + # --- SPARQL pattern path --- + + def test_sparql_forward_basic(self, simple_graph: Graph): + """SPARQL pattern, forward: binds ?start and reads ?end.""" + ns_map = _build_namespace_map(simple_graph, NS) + expand = _build_expand_fn( + "?start ex:knows ?end", simple_graph, ns_map, forward=True + ) + results = list(expand(EX.A)) + neighbors = {node for node, _ in results} + assert neighbors == {EX.B} + + def test_sparql_reverse(self, simple_graph: Graph): + """SPARQL pattern, reverse: binds ?end and reads ?start.""" + ns_map = 
_build_namespace_map(simple_graph, NS) + expand = _build_expand_fn( + "?start ex:knows ?end", simple_graph, ns_map, forward=False + ) + results = list(expand(EX.B)) + neighbors = {node for node, _ in results} + assert neighbors == {EX.A} + + def test_sparql_extra_bindings(self): + """SPARQL pattern with extra variables populates step.bindings.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.worksAt, EX.Acme)) + g.add((EX.B, EX.worksAt, EX.Acme)) + g.add((EX.A, EX.knows, EX.B)) + ns_map = _build_namespace_map(g, NS) + pattern = "?start ex:worksAt ?company . ?end ex:worksAt ?company . ?start ex:knows ?end" + expand = _build_expand_fn(pattern, g, ns_map, forward=True) + results = list(expand(EX.A)) + assert len(results) >= 1 + _, step = results[0] + assert step.bindings is not None + assert "company" in step.bindings + assert step.bindings["company"] == EX.Acme + + def test_sparql_with_length_variable(self): + """SPARQL pattern binding ?length uses it as step weight.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.e1, EX.fromNode, EX.A)) + g.add((EX.e1, EX.toNode, EX.B)) + g.add((EX.e1, EX.weight, Literal(5.0))) + ns_map = _build_namespace_map(g, NS) + pattern = ( + "?edge ex:fromNode ?start . ?edge ex:toNode ?end . ?edge ex:weight ?length" + ) + expand = _build_expand_fn(pattern, g, ns_map, forward=True) + results = list(expand(EX.A)) + assert len(results) == 1 + _, step = results[0] + assert step.length == 5.0 + assert step.bindings is None or "length" not in step.bindings + + def test_sparql_with_length_and_extra_bindings(self): + """SPARQL pattern binding ?length excludes it from bindings but keeps others.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.e1, EX.fromNode, EX.A)) + g.add((EX.e1, EX.toNode, EX.B)) + g.add((EX.e1, EX.weight, Literal(5.0))) + g.add((EX.e1, EX.label, Literal("highway"))) + ns_map = _build_namespace_map(g, NS) + pattern = ( + "?edge ex:fromNode ?start . ?edge ex:toNode ?end . " + "?edge ex:weight ?length . 
?edge ex:label ?edgeLabel" + ) + expand = _build_expand_fn(pattern, g, ns_map, forward=True) + results = list(expand(EX.A)) + assert len(results) == 1 + _, step = results[0] + assert step.length == 5.0 + assert step.bindings is not None + assert "edgeLabel" in step.bindings + assert "length" not in step.bindings + + def test_sparql_missing_start_variable_raises(self): + """SPARQL pattern missing ?start raises ValueError.""" + g = Graph() + g.bind("ex", EX) + ns_map = _build_namespace_map(g, NS) + with pytest.raises(ValueError, match="start"): + _build_expand_fn("?x ex:knows ?end", g, ns_map, forward=True) + + def test_sparql_missing_end_variable_raises(self): + """SPARQL pattern missing ?end raises ValueError.""" + g = Graph() + g.bind("ex", EX) + ns_map = _build_namespace_map(g, NS) + with pytest.raises(ValueError, match="end"): + _build_expand_fn("?start ex:knows ?x", g, ns_map, forward=True) + + +class TestBuildEndCheckFn: + """Unit tests for _build_end_check_fn.""" + + @pytest.fixture() + def role_graph(self) -> Graph: + g = Graph() + g.bind("ex", EX) + g.add((EX.Alice, EX.role, EX.Manager)) + g.add((EX.Bob, EX.role, EX.Engineer)) + g.add((EX.Carol, EX.role, EX.Manager)) + return g + + # --- None (unbound) --- + + def test_none_matches_everything(self, role_graph: Graph): + """end_spec=None: every node is valid, known_end_nodes is None.""" + check_fn, known = _build_end_check_fn(None, role_graph, {}, "end") + assert known is None + assert check_fn(EX.Alice) is True + assert check_fn(EX.Bob) is True + assert check_fn(EX.Nonexistent) is True + + # --- Single Identifier --- + + def test_single_identifier_matches_only_that_node(self, role_graph: Graph): + """end_spec=Identifier: only that node matches.""" + check_fn, known = _build_end_check_fn(EX.Alice, role_graph, {}, "end") + assert known == frozenset({EX.Alice}) + assert check_fn(EX.Alice) is True + assert check_fn(EX.Bob) is False + + # --- Iterable of Identifiers --- + + def 
test_iterable_matches_members(self, role_graph: Graph): + """end_spec=iterable: members match, non-members don't.""" + check_fn, known = _build_end_check_fn( + [EX.Alice, EX.Carol], role_graph, {}, "end" + ) + assert known == frozenset({EX.Alice, EX.Carol}) + assert check_fn(EX.Alice) is True + assert check_fn(EX.Carol) is True + assert check_fn(EX.Bob) is False + + def test_empty_iterable_matches_nothing(self, role_graph: Graph): + """end_spec=[]: no node matches.""" + check_fn, known = _build_end_check_fn([], role_graph, {}, "end") + assert known == frozenset() + assert check_fn(EX.Alice) is False + + def test_set_of_identifiers(self, role_graph: Graph): + """end_spec=set: works like iterable.""" + check_fn, known = _build_end_check_fn({EX.Bob}, role_graph, {}, "end") + assert known == frozenset({EX.Bob}) + assert check_fn(EX.Bob) is True + assert check_fn(EX.Alice) is False + + # --- SPARQL pattern --- + + def test_sparql_pattern_matches(self, role_graph: Graph): + """end_spec=str: ASK query validates matching nodes.""" + ns_map = _build_namespace_map(role_graph, NS) + pattern = "?end ex:role ex:Manager" + check_fn, known = _build_end_check_fn(pattern, role_graph, ns_map, "end") + assert known is None # SPARQL pattern → unknown end set + assert check_fn(EX.Alice) is True # Alice is a Manager + assert check_fn(EX.Carol) is True # Carol is a Manager + assert check_fn(EX.Bob) is False # Bob is an Engineer + + def test_sparql_pattern_no_matches(self, role_graph: Graph): + """SPARQL pattern that matches no node returns False for all.""" + ns_map = _build_namespace_map(role_graph, NS) + pattern = "?end ex:role ex:Director" + check_fn, known = _build_end_check_fn(pattern, role_graph, ns_map, "end") + assert known is None + assert check_fn(EX.Alice) is False + assert check_fn(EX.Bob) is False + + # --- known_end_nodes correctness --- + + def test_known_end_nodes_none_for_unbound(self, role_graph: Graph): + """Unbound end → known_end_nodes is None.""" + _, known = 
_build_end_check_fn(None, role_graph, {}, "end") + assert known is None + + def test_known_end_nodes_frozenset_for_identifier(self, role_graph: Graph): + """Single Identifier → known_end_nodes is frozenset of that node.""" + _, known = _build_end_check_fn(EX.Alice, role_graph, {}, "end") + assert isinstance(known, frozenset) + assert known == frozenset({EX.Alice}) + + def test_known_end_nodes_frozenset_for_iterable(self, role_graph: Graph): + """Iterable → known_end_nodes is frozenset of those nodes.""" + _, known = _build_end_check_fn([EX.Alice, EX.Bob], role_graph, {}, "end") + assert isinstance(known, frozenset) + assert known == frozenset({EX.Alice, EX.Bob}) + + def test_known_end_nodes_none_for_sparql(self, role_graph: Graph): + """SPARQL pattern → known_end_nodes is None.""" + ns_map = _build_namespace_map(role_graph, NS) + pattern = "?end ex:role ex:Manager" + _, known = _build_end_check_fn(pattern, role_graph, ns_map, "end") + assert known is None + + def test_sparql_pattern_with_start_var(self): + """SPARQL pattern with end_var='start' (reverse direction) compiles correctly.""" + g = Graph() + g.bind("ex", EX) + g.add((EX.Alice, EX.dept, EX.Engineering)) + g.add((EX.Bob, EX.dept, EX.Sales)) + ns_map = _build_namespace_map(g, NS) + pattern = "?start ex:dept ex:Engineering" + check_fn, known = _build_end_check_fn(pattern, g, ns_map, "start") + assert known is None + assert check_fn(EX.Alice) is True + assert check_fn(EX.Bob) is False + + def test_sparql_pattern_missing_variable_raises(self): + """SPARQL pattern missing the required variable raises ValueError.""" + g = Graph() + g.bind("ex", EX) + ns_map = _build_namespace_map(g, NS) + with pytest.raises(ValueError, match="end"): + _build_end_check_fn("?x ex:role ex:Manager", g, ns_map, "end") + + +class TestBasicTraversal: + """Fixed start, fixed predicate, fixed end — single and multiple paths.""" + + def test_single_path(self, linear_graph: Graph): + results = find_paths(linear_graph, start=EX.Alice, 
path=EX.knows, end=EX.Carol) + assert len(results) == 1 + assert results[0].start == EX.Alice + assert results[0].end == EX.Carol + assert len(results[0].steps) == 2 + assert results[0].steps[-1].node == EX.Carol + + def test_multiple_paths(self, linear_graph: Graph): + results = find_paths( + linear_graph, + start=EX.Alice, + path=EX.knows, + end=EX.Dave, + shortest=False, + ) + paths = _path_node_sets(results) + assert ("Alice", "Bob", "Dave") in paths + assert ("Alice", "Bob", "Carol", "Dave") in paths + assert len(results) == 2 + + def test_no_path_exists(self, linear_graph: Graph): + results = find_paths(linear_graph, start=EX.Dave, path=EX.knows, end=EX.Alice) + assert results == [] + + def test_direct_neighbor(self, linear_graph: Graph): + results = find_paths(linear_graph, start=EX.Alice, path=EX.knows, end=EX.Bob) + assert len(results) == 1 + assert len(results[0].steps) == 1 + assert results[0].steps[0].node == EX.Bob + + +class TestShortestMode: + """Verify only minimum-length paths returned; multiple shortest of same length.""" + + def test_shortest_picks_shorter(self, linear_graph: Graph): + results = find_paths( + linear_graph, + start=EX.Alice, + path=EX.knows, + end=EX.Dave, + ) + assert len(results) == 1 + assert len(results[0].steps) == 2 # Alice -> Bob -> Dave + + def test_multiple_shortest_same_length(self, diamond_graph: Graph): + results = find_paths( + diamond_graph, + start=EX.A, + path=EX.knows, + end=EX.D, + ) + assert len(results) == 2 + for r in results: + assert len(r.steps) == 2 + + def test_shortest_zero_length(self, linear_graph: Graph): + """When start == end, zero-length path is the shortest.""" + results = find_paths( + linear_graph, + start=EX.Alice, + path=EX.knows, + end=EX.Alice, + ) + assert len(results) == 1 + assert results[0].steps == [] + + def test_shortest_per_end_node(self): + """shortest=True keeps the shortest path(s) per end node, not globally. 
+ + Graph: A -knows-> B -knows-> C -knows-> D + B -knows-> D (shortcut) + + With end=None and terminate_on_first_match=False: + - Shortest to B is length 1 (A->B) + - Shortest to C is length 2 (A->B->C) + - Shortest to D is length 2 (A->B->D), NOT length 3 (A->B->C->D) + """ + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + g.add((EX.B, EX.knows, EX.C)) + g.add((EX.C, EX.knows, EX.D)) + g.add((EX.B, EX.knows, EX.D)) + + results = find_paths( + g, + start=EX.A, + path=EX.knows, + end=None, + shortest=True, + terminate_on_first_match=False, + max_length=10, + ) + + # Group results by end node + by_end: dict = {} + for r in results: + by_end.setdefault(r.end, []).append(r) + + # A has zero-length path (shortest to self) + assert len(by_end[EX.A][0].steps) == 0 + + # B: shortest is length 1 + assert all(len(r.steps) == 1 for r in by_end[EX.B]) + + # C: shortest is length 2 + assert all(len(r.steps) == 2 for r in by_end[EX.C]) + + # D: shortest is length 2 (via B->D shortcut), NOT length 3 + assert all(len(r.steps) == 2 for r in by_end[EX.D]) + + def test_shortest_does_not_stop_early(self): + """BFS continues past first found path to find shortest to farther nodes. + + Graph: A -knows-> B -knows-> C + With shortest=True, end=None, we should find paths to B AND C, + not just stop after finding B. + """ + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + g.add((EX.B, EX.knows, EX.C)) + + results = find_paths( + g, + start=EX.A, + path=EX.knows, + end=None, + shortest=True, + terminate_on_first_match=False, + max_length=10, + ) + + ends = {r.end for r in results} + assert EX.A in ends # zero-length + assert EX.B in ends # length 1 + assert EX.C in ends # length 2 — BFS didn't stop after finding B + + def test_shortest_multiple_same_length_per_end(self): + """Multiple paths of same shortest length to same end are all kept. + + Diamond: A -> B -> D, A -> C -> D + Both paths to D are length 2; both should be returned. 
+ """ + g = Graph() + g.bind("ex", EX) + g.add((EX.A, EX.knows, EX.B)) + g.add((EX.A, EX.knows, EX.C)) + g.add((EX.B, EX.knows, EX.D)) + g.add((EX.C, EX.knows, EX.D)) + + results = find_paths( + g, + start=EX.A, + path=EX.knows, + end=EX.D, + shortest=True, + ) + + assert len(results) == 2 + paths = _path_node_sets(results) + assert ("A", "B", "D") in paths + assert ("A", "C", "D") in paths + + +class TestUnboundPath: + """Verify edge predicates captured in steps when path=None.""" + + def test_edge_captured(self, linear_graph: Graph): + results = find_paths(linear_graph, start=EX.Alice, path=None, end=EX.Bob) + assert len(results) == 1 + step = results[0].steps[0] + assert step.node == EX.Bob + assert step.edge == EX.knows + assert step.bindings is None + + def test_multi_predicate_edges(self, multi_pred_graph: Graph): + results = find_paths(multi_pred_graph, start=EX.Alice, path=None, end=EX.Dave) + assert len(results) >= 1 + # Path should be Alice -knows-> Bob -worksWith-> Carol -knows-> Dave + r = results[0] + edges = [s.edge for s in r.steps] + assert EX.knows in edges + assert EX.worksWith in edges + + +class TestQueryPath: + """Verify extra bindings captured; multiple bindings produce distinct paths.""" + + def test_extra_bindings_captured(self): + g = Graph() + g.bind("ex", EX) + g.add((EX.Alice, EX.worksAt, EX.Acme)) + g.add((EX.Bob, EX.worksAt, EX.Acme)) + g.add((EX.Alice, EX.knows, EX.Bob)) + + results = find_paths( + g, + start=EX.Alice, + path="?start ex:worksAt ?company . ?end ex:worksAt ?company . 
?start ex:knows ?end", + end=EX.Bob, + initNs=NS, + ) + assert len(results) >= 1 + step = results[0].steps[0] + assert step.bindings is not None + assert "company" in step.bindings + assert step.bindings["company"] == EX.Acme + + def test_no_extra_bindings_when_only_start_end(self): + g = Graph() + g.bind("ex", EX) + g.add((EX.Alice, EX.knows, EX.Bob)) + + results = find_paths( + g, + start=EX.Alice, + path="?start ex:knows ?end", + end=EX.Bob, + initNs=NS, + ) + assert len(results) == 1 + step = results[0].steps[0] + # No extra variables beyond ?start and ?end + assert step.bindings is None or step.bindings == {} + + +class TestPatternStart: + """Verify eager evaluation of start pattern to set of start nodes.""" + + def test_pattern_start_selects_correct_nodes(self, manager_graph: Graph): + results = find_paths( + manager_graph, + start="?start ex:department ex:Engineering", + path=EX.knows, + end=EX.Dave, + initNs=NS, + ) + starts = {r.start for r in results} + # Alice and Bob are in Engineering + assert EX.Alice in starts or EX.Bob in starts + # All results should end at Dave + assert all(r.end == EX.Dave for r in results) + + def test_pattern_start_no_matches(self): + g = Graph() + g.bind("ex", EX) + g.add((EX.Alice, EX.knows, EX.Bob)) + results = find_paths( + g, + start="?start ex:department ex:Engineering", + path=EX.knows, + end=EX.Bob, + initNs=NS, + ) + assert results == [] + + +class TestPatternEnd: + """Verify ASK-based filtering and terminate_on_first_match behavior.""" + + def test_pattern_end_filters_correctly(self, manager_graph: Graph): + results = find_paths( + manager_graph, + start=EX.Alice, + path=EX.knows, + end="?end ex:role ex:Manager", + shortest=False, + initNs=NS, + ) + assert len(results) >= 1 + for r in results: + assert r.end in (EX.Dave, EX.Carol) + + def test_terminate_on_first_match_true(self, manager_graph: Graph): + """With terminate_on_first_match=True, each path stops at first Manager.""" + results = find_paths( + manager_graph, 
+ start=EX.Alice, + path=EX.knows, + end="?end ex:role ex:Manager", + shortest=False, + terminate_on_first_match=True, + initNs=NS, + ) + # Should find Carol (via Bob) as the first Manager on that path + # Should NOT continue past Carol to find Dave on the same path + path_ends = {r.end for r in results} + assert EX.Carol in path_ends + + def test_terminate_on_first_match_false(self, manager_graph: Graph): + """With terminate_on_first_match=False, paths continue past Managers.""" + results = find_paths( + manager_graph, + start=EX.Alice, + path=EX.knows, + end="?end ex:role ex:Manager", + shortest=False, + terminate_on_first_match=False, + initNs=NS, + ) + path_ends = {r.end for r in results} + # Should find both Carol and Dave as Managers + assert EX.Carol in path_ends + assert EX.Dave in path_ends + + +class TestReverseDirection: + """Verify results are correct when BFS runs in reverse.""" + + def test_unbound_start_fixed_end(self, linear_graph: Graph): + """Unbound start + fixed end triggers reverse direction.""" + results = find_paths( + linear_graph, + start=None, + path=EX.knows, + end=EX.Dave, + ) + # Shortest paths to Dave: Bob->Dave and Carol->Dave (length 1) + # Plus zero-length Dave->Dave + assert len(results) >= 1 + for r in results: + assert r.end == EX.Dave + + def test_reverse_matches_forward(self, diamond_graph: Graph): + """Forward and reverse should find the same paths.""" + forward = find_paths( + diamond_graph, + start=EX.A, + path=EX.knows, + end=EX.D, + shortest=False, + ) + # Pattern start triggers forward; pattern end with fixed start also forward + # But None start + fixed end triggers reverse + reverse = find_paths( + diamond_graph, + start=None, + path=EX.knows, + end=EX.D, + shortest=False, + max_length=2, + terminate_on_first_match=False, + ) + # Both should find A->B->D and A->C->D among results + fwd_from_a = {tuple(_path_nodes(r)) for r in forward if r.start == EX.A} + rev_from_a = {tuple(_path_nodes(r)) for r in reverse if r.start 
== EX.A} + assert fwd_from_a == rev_from_a + + def test_pattern_start_fixed_end_reverses(self, manager_graph: Graph): + """str start + Identifier end triggers reverse.""" + results = find_paths( + manager_graph, + start="?start ex:department ex:Engineering", + path=EX.knows, + end=EX.Dave, + initNs=NS, + ) + assert len(results) >= 1 + assert all(r.end == EX.Dave for r in results) + + +class TestCycleDetection: + """Graph with cycles; verify no infinite loops; cyclic paths discarded.""" + + def test_no_infinite_loop(self, cyclic_graph: Graph): + """BFS on a cyclic graph terminates.""" + results = find_paths( + cyclic_graph, + start=EX.A, + path=EX.knows, + end=EX.C, + ) + assert len(results) >= 1 + # A -> B -> C + assert any(len(r.steps) == 2 for r in results) + + def test_cycle_not_revisited(self, cyclic_graph: Graph): + """No path should visit the same node twice.""" + results = find_paths( + cyclic_graph, + start=EX.A, + path=EX.knows, + end=None, + shortest=False, + terminate_on_first_match=False, + max_length=10, + ) + for r in results: + visited = [r.start] + [s.node for s in r.steps] + assert len(visited) == len( + set(visited) + ), f"Duplicate node in path: {visited}" + + +class TestZeroLengthPaths: + """Node is both start and end.""" + + def test_fixed_start_equals_fixed_end(self, linear_graph: Graph): + results = find_paths(linear_graph, start=EX.Alice, path=EX.knows, end=EX.Alice) + assert len(results) == 1 + assert results[0].start == EX.Alice + assert results[0].end == EX.Alice + assert results[0].steps == [] + + def test_zero_length_with_pattern_end(self, manager_graph: Graph): + """Alice is in Engineering; if end pattern matches Alice, zero-length path.""" + results = find_paths( + manager_graph, + start=EX.Alice, + path=EX.knows, + end="?end ex:department ex:Engineering", + initNs=NS, + ) + zero_len = [r for r in results if r.steps == []] + assert len(zero_len) == 1 + assert zero_len[0].start == EX.Alice + assert zero_len[0].end == EX.Alice + + 
# SPARQL step pattern for reified edges: ``?edge`` links ``?start`` to ``?end``
# and binds the per-step weight to ``?length``.
_REIFIED_EDGE_PATTERN = (
    "?edge ex:fromNode ?start . ?edge ex:toNode ?end . ?edge ex:weight ?length"
)


def _reified_graph(*edges) -> Graph:
    """Build a graph of reified edges from ``(edge, source, target, weight)`` tuples.

    Each tuple adds the triples ``(edge, ex:fromNode, source)``,
    ``(edge, ex:toNode, target)`` and ``(edge, ex:weight, Literal(weight))``,
    in that order.
    """
    g = Graph()
    g.bind("ex", EX)
    for edge, source, target, weight in edges:
        g.add((edge, EX.fromNode, source))
        g.add((edge, EX.toNode, target))
        g.add((edge, EX.weight, Literal(weight)))
    return g


def _group_by_end(results) -> dict:
    """Group path results by their end node, preserving result order."""
    grouped: dict = {}
    for result in results:
        grouped.setdefault(result.end, []).append(result)
    return grouped


class TestSelfLoops:
    """Triple (A, p, A) does not produce a path beyond zero-length."""

    def test_self_loop_no_extra_paths(self):
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.A))  # self-loop

        results = find_paths(g, start=EX.A, path=EX.knows, end=EX.A)
        # Only the zero-length path; the self-loop is a cycle
        assert len(results) == 1
        assert results[0].steps == []

    def test_self_loop_with_other_edges(self):
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.A))  # self-loop
        g.add((EX.A, EX.knows, EX.B))

        results = find_paths(g, start=EX.A, path=EX.knows, end=EX.B)
        assert len(results) == 1
        assert results[0].steps[0].node == EX.B


class TestMaxLength:
    """Verify max_length limits cumulative weighted path length."""

    def test_max_length_zero(self, linear_graph: Graph):
        """max_length=0 returns only zero-length paths."""
        results = find_paths(
            linear_graph,
            start=EX.Alice,
            path=EX.knows,
            end=EX.Alice,
            max_length=0,
        )
        assert len(results) == 1
        assert results[0].steps == []

    def test_max_length_zero_no_match(self, linear_graph: Graph):
        """max_length=0 with different start/end returns nothing."""
        results = find_paths(
            linear_graph,
            start=EX.Alice,
            path=EX.knows,
            end=EX.Dave,
            max_length=0,
        )
        assert results == []

    def test_max_length_limits_path_length(self, linear_graph: Graph):
        """max_length=1 should not find Alice->Bob->Dave (cumulative length 2)."""
        results = find_paths(
            linear_graph,
            start=EX.Alice,
            path=EX.knows,
            end=EX.Dave,
            max_length=1,
        )
        assert results == []

    def test_max_length_allows_exact_length(self, linear_graph: Graph):
        """max_length=2 should find Alice->Bob->Dave (cumulative length exactly 2)."""
        results = find_paths(
            linear_graph,
            start=EX.Alice,
            path=EX.knows,
            end=EX.Dave,
            max_length=2,
        )
        assert len(results) == 1
        assert len(results[0].steps) == 2

    def test_max_length_negative_raises(self, linear_graph: Graph):
        """A negative max_length is rejected with ValueError."""
        with pytest.raises(ValueError, match="non-negative"):
            find_paths(
                linear_graph,
                start=EX.Alice,
                path=EX.knows,
                end=EX.Dave,
                max_length=-1,
            )

    def test_max_length_with_weighted_paths(self):
        """max_length applies to cumulative weighted length, not hop count.

        Graph (reified edges):
            e1: A -> B, weight 3
            e2: B -> C, weight 4
            e3: A -> C, weight 10

        max_length=7 should find A->B->C (weight 3+4=7) but NOT A->C (weight 10).
        """
        g = _reified_graph(
            (EX.e1, EX.A, EX.B, 3.0),
            (EX.e2, EX.B, EX.C, 4.0),
            (EX.e3, EX.A, EX.C, 10.0),
        )

        results = find_paths(
            g,
            start=EX.A,
            path=_REIFIED_EDGE_PATTERN,
            end=EX.C,
            shortest=False,
            terminate_on_first_match=False,
            max_length=7,
            initNs=NS,
        )
        # Only the 2-hop path (weight 7) should be found; the 1-hop (weight 10) is excluded
        assert len(results) == 1
        assert results[0].length == 7.0
        assert len(results[0].steps) == 2

    def test_max_length_excludes_shorter_hop_heavier_path(self):
        """A 1-hop path with weight > max_length is excluded even though it has fewer hops.

        Graph (reified edges):
            e1: A -> B, weight 100

        max_length=50 should NOT find A->B (weight 100 > 50).
        """
        g = _reified_graph((EX.e1, EX.A, EX.B, 100.0))

        results = find_paths(
            g,
            start=EX.A,
            path=_REIFIED_EDGE_PATTERN,
            end=EX.B,
            max_length=50,
            initNs=NS,
        )
        assert results == []

    def test_max_length_float_value(self):
        """max_length accepts float values for weighted path limits.

        Graph (reified edges):
            e1: A -> B, weight 2.5
            e2: B -> C, weight 2.5

        max_length=5.0 should find A->B->C (weight 5.0, exactly at limit).
        max_length=4.9 should NOT find A->B->C (weight 5.0 > 4.9).
        """
        g = _reified_graph(
            (EX.e1, EX.A, EX.B, 2.5),
            (EX.e2, EX.B, EX.C, 2.5),
        )

        # Path weight exactly at the limit: should be found
        results = find_paths(
            g,
            start=EX.A,
            path=_REIFIED_EDGE_PATTERN,
            end=EX.C,
            shortest=False,
            terminate_on_first_match=False,
            max_length=5.0,
            initNs=NS,
        )
        assert len(results) == 1
        assert results[0].length == 5.0

        # Limit just below the path weight: should NOT be found
        results = find_paths(
            g,
            start=EX.A,
            path=_REIFIED_EDGE_PATTERN,
            end=EX.C,
            shortest=False,
            terminate_on_first_match=False,
            max_length=4.9,
            initNs=NS,
        )
        assert results == []

    def test_max_length_unweighted_equivalent_to_hop_count(self, linear_graph: Graph):
        """For unweighted paths, max_length is equivalent to hop count.

        Graph: Alice -> Bob -> Carol -> Dave, plus Bob -> Dave shortcut.

        max_length=3, shortest=False: should find all paths with <= 3 hops.
        """
        results = find_paths(
            linear_graph,
            start=EX.Alice,
            path=EX.knows,
            end=EX.Dave,
            shortest=False,
            max_length=3,
        )
        # Both Alice->Bob->Dave (2 hops) and Alice->Bob->Carol->Dave (3 hops)
        assert len(results) == 2
        lengths = sorted(r.length for r in results)
        assert lengths == [2, 3]


class TestEmptyGraph:
    """Behavior on an empty graph: no multi-hop paths, only zero-length ones."""

    def test_empty_graph_returns_empty(self):
        g = Graph()
        results = find_paths(g, start=EX.Alice, path=EX.knows, end=EX.Bob)
        assert results == []

    def test_empty_graph_zero_length(self):
        """A fixed start equal to the fixed end yields a zero-length path,
        even though the empty graph contains no triples."""
        g = Graph()
        results = find_paths(g, start=EX.Alice, path=EX.knows, end=EX.Alice)
        # Alice is a fixed Identifier, so zero-length path is still valid
        assert len(results) == 1
        assert results[0].steps == []


class TestUnboundStartEnd:
    """Produces all acyclic paths (test with small graph)."""

    def test_all_paths_small_graph(self):
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.B, EX.knows, EX.C))

        results = find_paths(
            g,
            start=None,
            path=EX.knows,
            end=None,
            terminate_on_first_match=False,
            max_length=5,
        )
        # Should include paths from all nodes
        starts = {r.start for r in results}
        assert EX.A in starts
        assert EX.B in starts

    def test_fully_unbound_raises(self):
        g = Graph()
        with pytest.raises(ValueError, match="(?i)at least one"):
            find_paths(g)


class TestPropertyPaths:
    """Sequence, alternative, inverse paths."""

    def test_sequence_path(self):
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.Alice, EX.knows, EX.Bob))
        g.add((EX.Bob, EX.friendOf, EX.Carol))

        results = find_paths(
            g, start=EX.Alice, path=EX.knows / EX.friendOf, end=EX.Carol
        )
        assert len(results) == 1
        assert results[0].steps[0].node == EX.Carol
        assert results[0].steps[0].edge is None  # property path, no edge

    def test_alternative_path(self):
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.Alice, EX.knows, EX.Bob))
        g.add((EX.Alice, EX.friendOf, EX.Carol))

        results = find_paths(
            g,
            start=EX.Alice,
            path=EX.knows | EX.friendOf,
            end=None,
            shortest=False,
        )
        ends = {r.end for r in results}
        assert EX.Alice in ends  # zero-length
        assert EX.Bob in ends
        assert EX.Carol in ends

    def test_inverse_path(self):
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.Alice, EX.knows, EX.Bob))

        results = find_paths(g, start=EX.Bob, path=~EX.knows, end=EX.Alice)
        assert len(results) == 1
        assert results[0].steps[0].node == EX.Alice


class TestInitNs:
    """Verify prefix resolution in pattern strings."""

    def test_initns_resolves_prefixes(self, linear_graph: Graph):
        results = find_paths(
            linear_graph,
            start="?start ex:knows ex:Bob",
            path=EX.knows,
            end=EX.Dave,
            initNs={"ex": "http://example.org/"},
        )
        assert len(results) >= 1

    def test_initns_overrides_graph_ns(self):
        """initNs takes precedence over graph namespace_manager."""
        g = Graph()
        other = Namespace("http://other.org/")
        g.bind("ex", other)  # graph binds ex to wrong namespace
        g.add((EX.Alice, EX.knows, EX.Bob))

        results = find_paths(
            g,
            start="?start ex:knows ex:Bob",
            path=EX.knows,
            end=EX.Bob,
            initNs={"ex": "http://example.org/"},  # override
        )
        assert len(results) == 1


class TestBlankNodes:
    """BNodes as start, end, intermediate."""

    def test_bnode_as_intermediate(self):
        g = Graph()
        g.bind("ex", EX)
        b = BNode()
        g.add((EX.Alice, EX.knows, b))
        g.add((b, EX.knows, EX.Bob))

        results = find_paths(g, start=EX.Alice, path=EX.knows, end=EX.Bob)
        assert len(results) == 1
        assert results[0].steps[0].node == b
        assert results[0].steps[1].node == EX.Bob

    def test_bnode_as_start(self):
        g = Graph()
        g.bind("ex", EX)
        b = BNode()
        g.add((b, EX.knows, EX.Alice))

        results = find_paths(g, start=b, path=EX.knows, end=EX.Alice)
        assert len(results) == 1
        assert results[0].start == b

    def test_bnode_as_end(self):
        g = Graph()
        g.bind("ex", EX)
        b = BNode()
        g.add((EX.Alice, EX.knows, b))

        results = find_paths(g, start=EX.Alice, path=EX.knows, end=b)
        assert len(results) == 1
        assert results[0].end == b


class TestErrorHandling:
    """Validate error conditions."""

    def test_start_pattern_missing_variable(self):
        g = Graph()
        with pytest.raises(ValueError, match="start"):
            find_paths(
                g,
                start="?x ex:knows ex:Bob",
                path=EX.knows,
                end=EX.Bob,
                initNs=NS,
            )

    def test_end_pattern_missing_variable(self):
        g = Graph()
        with pytest.raises(ValueError, match="end"):
            find_paths(
                g,
                start=EX.Alice,
                path=EX.knows,
                end="?x ex:role ex:Manager",
                initNs=NS,
            )

    def test_path_pattern_missing_start(self):
        g = Graph()
        with pytest.raises(ValueError, match="start"):
            find_paths(
                g,
                start=EX.Alice,
                path="?x ex:knows ?end",
                end=EX.Bob,
                initNs=NS,
            )

    def test_path_pattern_missing_end(self):
        g = Graph()
        with pytest.raises(ValueError, match="end"):
            find_paths(
                g,
                start=EX.Alice,
                path="?start ex:knows ?x",
                end=EX.Bob,
                initNs=NS,
            )


class TestIterableStartEnd:
    """start and end accept an iterable of Identifiers."""

    def test_list_start(self, linear_graph: Graph):
        """Pass a list of start nodes."""
        results = find_paths(
            linear_graph,
            start=[EX.Alice, EX.Bob],
            path=EX.knows,
            end=EX.Dave,
            shortest=False,
            terminate_on_first_match=False,
        )
        starts = {r.start for r in results}
        assert EX.Alice in starts
        assert EX.Bob in starts
        assert all(r.end == EX.Dave for r in results)

    def test_set_start(self, linear_graph: Graph):
        """Pass a set of start nodes."""
        results = find_paths(
            linear_graph,
            start={EX.Bob, EX.Carol},
            path=EX.knows,
            end=EX.Dave,
        )
        starts = {r.start for r in results}
        # Both Bob and Carol can reach Dave
        assert starts <= {EX.Bob, EX.Carol}
        assert len(results) >= 2

    def test_list_end(self, linear_graph: Graph):
        """Pass a list of end nodes."""
        results = find_paths(
            linear_graph,
            start=EX.Alice,
            path=EX.knows,
            end=[EX.Bob, EX.Carol],
            terminate_on_first_match=False,
        )
        ends = {r.end for r in results}
        assert EX.Bob in ends
        assert EX.Carol in ends

    def test_set_end(self, linear_graph: Graph):
        """Pass a set of end nodes."""
        results = find_paths(
            linear_graph,
            start=EX.Alice,
            path=EX.knows,
            end={EX.Bob, EX.Dave},
            terminate_on_first_match=False,
        )
        ends = {r.end for r in results}
        assert EX.Bob in ends
        assert EX.Dave in ends

    def test_iterable_start_direction(self):
        """Iterable start + fixed end should go forward (iterable < single Identifier)."""
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.C))
        g.add((EX.B, EX.knows, EX.C))

        results = find_paths(
            g,
            start=[EX.A, EX.B],
            path=EX.knows,
            end=EX.C,
        )
        assert len(results) == 2
        starts = {r.start for r in results}
        assert starts == {EX.A, EX.B}

    def test_iterable_end_reverses_over_pattern(self):
        """Iterable end is more specific than pattern start, so should reverse."""
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.A, EX.knows, EX.C))
        g.add((EX.A, EX.dept, EX.Eng))

        results = find_paths(
            g,
            start="?start ex:dept ex:Eng",
            path=EX.knows,
            end=[EX.B, EX.C],
            initNs=NS,
        )
        ends = {r.end for r in results}
        assert EX.B in ends
        assert EX.C in ends

    def test_generator_start(self, linear_graph: Graph):
        """Generators are consumed once and work as start."""

        def gen():
            yield EX.Alice
            yield EX.Bob

        results = find_paths(
            linear_graph,
            start=gen(),
            path=EX.knows,
            end=EX.Dave,
            shortest=False,
            terminate_on_first_match=False,
        )
        starts = {r.start for r in results}
        assert EX.Alice in starts
        assert EX.Bob in starts

    def test_empty_iterable_start(self, linear_graph: Graph):
        """Empty iterable start returns no results."""
        results = find_paths(
            linear_graph,
            start=[],
            path=EX.knows,
            end=EX.Dave,
        )
        assert results == []

    def test_empty_iterable_end(self, linear_graph: Graph):
        """Empty iterable end returns no results (no valid end nodes)."""
        results = find_paths(
            linear_graph,
            start=EX.Alice,
            path=EX.knows,
            end=[],
            shortest=False,
            terminate_on_first_match=False,
            max_length=3,
        )
        assert results == []


class TestShortestPruning:
    """Verify the pruning optimization when shortest=True and end nodes are known."""

    def test_pruning_single_end_node(self):
        """With a single known end node, paths longer than the shortest are pruned.

        Graph: A -> B -> C -> D -> E
               B -> E (shortcut, length 2 from A)

        Shortest path A->E is length 2 (A->B->E).
        The longer path A->B->C->D->E (length 4) must not appear in the
        results: only the shortest path to E is kept.
        """
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.B, EX.knows, EX.C))
        g.add((EX.C, EX.knows, EX.D))
        g.add((EX.D, EX.knows, EX.E))
        g.add((EX.B, EX.knows, EX.E))  # shortcut

        results = find_paths(
            g,
            start=EX.A,
            path=EX.knows,
            end=EX.E,
            shortest=True,
        )
        assert len(results) == 1
        assert len(results[0].steps) == 2  # A -> B -> E

    def test_pruning_multiple_end_nodes(self):
        """With multiple known end nodes, pruning activates once all are found.

        Graph: A -> B -> C -> D
               B -> D (shortcut)

        end=[C, D]:
        - Shortest to C is length 2 (A->B->C)
        - Shortest to D is length 2 (A->B->D)
        - max shortest = 2
        - Partial paths of length > 2 are pruned, so A->B->C->D (length 3)
          is never explored.
        """
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.B, EX.knows, EX.C))
        g.add((EX.C, EX.knows, EX.D))
        g.add((EX.B, EX.knows, EX.D))  # shortcut

        results = find_paths(
            g,
            start=EX.A,
            path=EX.knows,
            end=[EX.C, EX.D],
            shortest=True,
            terminate_on_first_match=False,
        )
        # Should find shortest to C (length 2) and shortest to D (length 2)
        by_end = _group_by_end(results)

        assert all(len(r.steps) == 2 for r in by_end[EX.C])
        assert all(len(r.steps) == 2 for r in by_end[EX.D])

    def test_pruning_preserves_ties(self):
        """Pruning must not discard paths of equal length to the same end.

        Diamond: A -> B -> D, A -> C -> D
        Both paths to D are length 2; both must be returned.
        """
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.A, EX.knows, EX.C))
        g.add((EX.B, EX.knows, EX.D))
        g.add((EX.C, EX.knows, EX.D))

        results = find_paths(
            g,
            start=EX.A,
            path=EX.knows,
            end=EX.D,
            shortest=True,
        )
        assert len(results) == 2
        paths = _path_node_sets(results)
        assert ("A", "B", "D") in paths
        assert ("A", "C", "D") in paths

    def test_pruning_with_set_end(self):
        """Pruning works when end is a set of Identifiers.

        Graph: A -> B -> C -> D -> E
               D -> F

        end={E, F}:
        - Shortest to E is length 4 (A->B->C->D->E)
        - Shortest to F is length 4 (A->B->C->D->F)
        - max shortest = 4
        - Partial paths of length > 4 are pruned.
        """
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.B, EX.knows, EX.C))
        g.add((EX.C, EX.knows, EX.D))
        g.add((EX.D, EX.knows, EX.E))
        g.add((EX.D, EX.knows, EX.F))

        results = find_paths(
            g,
            start=EX.A,
            path=EX.knows,
            end={EX.E, EX.F},
            shortest=True,
            terminate_on_first_match=False,
        )
        ends = {r.end for r in results}
        assert EX.E in ends
        assert EX.F in ends
        assert all(len(r.steps) == 4 for r in results)

    def test_pruning_not_active_for_pattern_end(self):
        """Pruning is NOT active when end is a SPARQL pattern (unknown set).

        This test verifies correctness — the optimization should be skipped
        for SPARQL pattern ends since the full set of end nodes is not known.
        """
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.B, EX.knows, EX.C))
        g.add((EX.B, EX.role, EX.Manager))
        g.add((EX.C, EX.role, EX.Manager))

        results = find_paths(
            g,
            start=EX.A,
            path=EX.knows,
            end="?end ex:role ex:Manager",
            shortest=True,
            terminate_on_first_match=False,
            initNs=NS,
        )
        ends = {r.end for r in results}
        # Both B and C are Managers; shortest to B is 1, shortest to C is 2
        assert EX.B in ends
        assert EX.C in ends

    def test_pruning_not_active_for_unbound_end(self):
        """Pruning is NOT active when end is None (unbound).

        All reachable nodes should still be found.
        """
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.B, EX.knows, EX.C))

        results = find_paths(
            g,
            start=EX.A,
            path=EX.knows,
            end=None,
            shortest=True,
            terminate_on_first_match=False,
            max_length=5,
        )
        ends = {r.end for r in results}
        assert EX.A in ends  # zero-length
        assert EX.B in ends
        assert EX.C in ends

    def test_pruning_with_asymmetric_shortest_lengths(self):
        """End nodes at different depths; pruning uses the max shortest length.

        Graph: A -> B -> C -> D
               C -> E

        end=[B, E]:
        - Shortest to B is length 1 (A->B)
        - Shortest to E is length 3 (A->B->C->E)
        - max shortest = 3
        - Partial paths of length > 3 are pruned.
        - But paths of length <= 3 are NOT pruned, so A->B->C->D (length 3)
          is still explored (though D is not a valid end, so no result).
        """
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.B, EX.knows, EX.C))
        g.add((EX.C, EX.knows, EX.D))
        g.add((EX.C, EX.knows, EX.E))

        results = find_paths(
            g,
            start=EX.A,
            path=EX.knows,
            end=[EX.B, EX.E],
            shortest=True,
            terminate_on_first_match=False,
        )
        by_end = _group_by_end(results)

        assert len(by_end[EX.B]) == 1
        assert len(by_end[EX.B][0].steps) == 1

        assert len(by_end[EX.E]) == 1
        assert len(by_end[EX.E][0].steps) == 3

    def test_pruning_with_zero_length_end(self):
        """When start is also a known end node, zero-length path is found.

        Graph: A -> B -> C
        end=[A, C]:
        - Shortest to A is 0 (zero-length)
        - Shortest to C is 2
        - max shortest = 2
        - Pruning activates after both are found.
        """
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.B, EX.knows, EX.C))

        results = find_paths(
            g,
            start=EX.A,
            path=EX.knows,
            end=[EX.A, EX.C],
            shortest=True,
            terminate_on_first_match=False,
        )
        by_end = _group_by_end(results)

        assert len(by_end[EX.A]) == 1
        assert by_end[EX.A][0].steps == []  # zero-length

        assert len(by_end[EX.C]) == 1
        assert len(by_end[EX.C][0].steps) == 2


class TestWeightedLength:
    """Verify PathStep.length, PathResult.length, and shortest-path logic
    when a SPARQL path pattern binds ?length."""

    def test_step_length_default_is_1_for_uriref(self):
        """Non-SPARQL paths always produce steps with length=1."""
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))

        results = find_paths(g, start=EX.A, path=EX.knows, end=EX.B)
        assert len(results) == 1
        assert results[0].steps[0].length == 1
        assert results[0].length == 1

    def test_step_length_default_is_1_for_unbound(self):
        """Unbound path (path=None) always produces steps with length=1."""
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))

        results = find_paths(g, start=EX.A, path=None, end=EX.B)
        assert len(results) == 1
        assert results[0].steps[0].length == 1
        assert results[0].steps[0].edge == EX.knows
        assert results[0].length == 1

    def test_step_length_default_is_1_for_sparql_without_length_var(self):
        """SPARQL path without ?length produces steps with length=1."""
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))

        results = find_paths(
            g,
            start=EX.A,
            path="?start ex:knows ?end",
            end=EX.B,
            initNs=NS,
        )
        assert len(results) == 1
        assert results[0].steps[0].length == 1
        assert results[0].length == 1

    def test_step_length_from_length_variable(self):
        """SPARQL path binding ?length uses that value as step length."""
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.connectsTo, EX.B))
        g.add((EX.A, EX.distance, Literal(5.0)))

        results = find_paths(
            g,
            start=EX.A,
            path="?start ex:connectsTo ?end . ?start ex:distance ?length",
            end=EX.B,
            initNs=NS,
        )
        assert len(results) == 1
        assert results[0].steps[0].length == 5.0
        assert results[0].length == 5.0
        # ?length should NOT appear in bindings
        step = results[0].steps[0]
        assert step.bindings is None or "length" not in step.bindings

    def test_cumulative_length_across_steps(self):
        """PathResult.length is the sum of step lengths."""
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.connectsTo, EX.B))
        g.add((EX.A, EX.distance, Literal(3.0)))
        g.add((EX.B, EX.connectsTo, EX.C))
        g.add((EX.B, EX.distance, Literal(7.0)))

        results = find_paths(
            g,
            start=EX.A,
            path="?start ex:connectsTo ?end . ?start ex:distance ?length",
            end=EX.C,
            shortest=False,
            initNs=NS,
        )
        assert len(results) == 1
        assert results[0].steps[0].length == 3.0
        assert results[0].steps[1].length == 7.0
        assert results[0].length == 10.0

    def test_shortest_uses_weighted_length(self):
        """shortest=True picks the path with lower cumulative weighted length,
        even if it has more hops.

        Graph (reified edges, so every edge carries its own weight):
            e1: A -> B, weight 10
            e2: A -> C, weight 1
            e3: C -> B, weight 2

        Two paths A->B:
            Direct: A->B, length=10 (1 hop)
            Via C:  A->C->B, length=1+2=3 (2 hops)

        shortest=True should pick the 2-hop path (length 3).
        """
        g = _reified_graph(
            (EX.e1, EX.A, EX.B, 10.0),
            (EX.e2, EX.A, EX.C, 1.0),
            (EX.e3, EX.C, EX.B, 2.0),
        )

        results = find_paths(
            g,
            start=EX.A,
            path=_REIFIED_EDGE_PATTERN,
            end=EX.B,
            shortest=True,
            terminate_on_first_match=False,
            initNs=NS,
        )
        # Should pick A->C->B (length 3) over A->B (length 10)
        assert len(results) == 1
        assert results[0].length == 3.0
        assert len(results[0].steps) == 2  # 2 hops

    def test_shortest_all_paths_with_weights(self):
        """shortest=False returns all paths; each has correct weighted length."""
        g = _reified_graph(
            (EX.e1, EX.A, EX.B, 10.0),
            (EX.e2, EX.A, EX.C, 1.0),
            (EX.e3, EX.C, EX.B, 2.0),
        )

        results = find_paths(
            g,
            start=EX.A,
            path=_REIFIED_EDGE_PATTERN,
            end=EX.B,
            shortest=False,
            terminate_on_first_match=False,
            initNs=NS,
        )
        lengths = sorted(r.length for r in results)
        assert 3.0 in lengths  # A->C->B
        assert 10.0 in lengths  # A->B

    def test_zero_length_path_has_length_zero(self):
        """Zero-length path (start==end) has length=0."""
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))

        results = find_paths(g, start=EX.A, path=EX.knows, end=EX.A)
        assert len(results) == 1
        assert results[0].length == 0
        assert results[0].steps == []

    def test_length_not_in_bindings(self):
        """?length is consumed by the length field and excluded from bindings."""
        g = _reified_graph((EX.e1, EX.A, EX.B, 5.0))
        g.add((EX.e1, EX.label, Literal("highway")))

        path_pattern = (
            "?edge ex:fromNode ?start . "
            "?edge ex:toNode ?end . "
            "?edge ex:weight ?length . "
            "?edge ex:label ?edgeLabel"
        )

        results = find_paths(
            g,
            start=EX.A,
            path=path_pattern,
            end=EX.B,
            initNs=NS,
        )
        assert len(results) == 1
        step = results[0].steps[0]
        assert step.length == 5.0
        assert step.bindings is not None
        assert "edgeLabel" in step.bindings
        assert "length" not in step.bindings

    def test_path_result_length_matches_step_sum(self):
        """PathResult.length always equals sum of step lengths."""
        g = Graph()
        g.bind("ex", EX)
        g.add((EX.A, EX.knows, EX.B))
        g.add((EX.B, EX.knows, EX.C))
        g.add((EX.C, EX.knows, EX.D))

        results = find_paths(
            g,
            start=EX.A,
            path=EX.knows,
            end=EX.D,
            shortest=False,
        )
        for r in results:
            assert r.length == sum(s.length for s in r.steps)

    def test_non_numeric_length_raises_type_error(self):
        """?length bound to a non-numeric value raises TypeError."""
        g = _reified_graph((EX.e1, EX.A, EX.B, "not-a-number"))

        with pytest.raises(TypeError, match=r"\?length must be numeric"):
            find_paths(
                g,
                start=EX.A,
                path=_REIFIED_EDGE_PATTERN,
                end=EX.B,
                initNs=NS,
            )

    def test_negative_length_raises_value_error(self):
        """?length bound to a negative value raises ValueError."""
        g = _reified_graph((EX.e1, EX.A, EX.B, -5.0))

        with pytest.raises(ValueError, match=r"\?length must be non-negative"):
            find_paths(
                g,
                start=EX.A,
                path=_REIFIED_EDGE_PATTERN,
                end=EX.B,
                initNs=NS,
            )