Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions dotmotif/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
from networkx.algorithms import isomorphism

from .parsers.v2 import ParserV2
from .validators import DisagreeingEdgesValidator, Validator
from .validators import (
DisagreeingEdgesValidator,
ImpossibleConstraintValidator,
Validator,
)

from .executors.NetworkXExecutor import NetworkXExecutor
from .executors.GrandIsoExecutor import GrandIsoExecutor
Expand Down Expand Up @@ -75,7 +79,8 @@ def __init__(self, input_motif: Optional[str] = None, **kwargs):
self.parser = kwargs.get("parser", DEFAULT_MOTIF_PARSER)
self.exclude_automorphisms = kwargs.get("exclude_automorphisms", False)
self.validators: List[Validator] = kwargs.get(
"validators", [DisagreeingEdgesValidator()]
"validators",
[DisagreeingEdgesValidator(), ImpossibleConstraintValidator()],
)
self._g = nx.MultiDiGraph()

Expand Down Expand Up @@ -117,6 +122,11 @@ def from_motif(self, cmd: str):

self._propagate_automorphic_constraints()

# Post-parse validation hooks (e.g., constraint collisions from automorphisms)
for val in self.validators:
if hasattr(val, "validate_motif"):
val.validate_motif(self)

return self

def from_nx(self, graph: nx.DiGraph) -> "Motif":
Expand Down
113 changes: 113 additions & 0 deletions dotmotif/tests/test_dm_flags.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
import pytest
import dotmotif
import networkx as nx
from dotmotif.executors.Neo4jExecutor import Neo4jExecutor
Expand Down Expand Up @@ -120,3 +121,115 @@ def test_constraints_propagate_multi_auto(self):
)
assert len(m.list_automorphisms()) == 2
assert len(m.list_node_constraints()) == 3

def test_conflicting_constraints_raise(self):
with pytest.raises(ValueError):
dotmotif.Motif(
"""
A -> B
B -> A
A.radius = 5
A.radius = 6
"""
)

def test_conflicting_constraints_on_automorphisms_raise(self):
with pytest.raises(ValueError):
dotmotif.Motif(
"""
A -> B
B -> A
A === B
A.radius = 5
B.radius = 6
"""
)

def test_conflicting_equality_and_inequality_from_automorphism(self):
with pytest.raises(ValueError):
dotmotif.Motif(
"""
A -> B
B -> A
A === B
A.type = 4
B.type != 4
"""
)

def test_gtlt_raise(self):
with pytest.raises(ValueError):
dotmotif.Motif(
"""
A -> B
B -> A
A.size > 5
A.size < 3
"""
)

def test_gtlt_meta_raise(self):
with pytest.raises(ValueError):
dotmotif.Motif(
"""
A -> B
B -> A
A.size > B.size
A.size < B.size
"""
)

def test_ge_le_same_value_ok(self):
# Non-strict bounds that meet at a point are allowed
dotmotif.Motif(
"""
A -> B
B -> A
A.size >= 5
A.size <= 5
"""
)

def test_gt_le_strict_conflict(self):
with pytest.raises(ValueError):
dotmotif.Motif(
"""
A -> B
B -> A
A.size > 5
A.size <= 5
"""
)

def test_gt_eq_conflict(self):
with pytest.raises(ValueError):
dotmotif.Motif(
"""
A -> B
B -> A
A.size > 5
A.size = 5
"""
)

def test_dynamic_ge_le_same_target_ok(self):
# Cross-node non-strict sandwich on same target is allowed
dotmotif.Motif(
"""
A -> B
B -> A
A.size >= B.size
A.size <= B.size
"""
)

def test_dynamic_gt_le_conflict(self):
with pytest.raises(ValueError):
dotmotif.Motif(
"""
A -> B
B -> A
A.size > B.size
A.size <= B.size
"""
)
196 changes: 164 additions & 32 deletions dotmotif/validators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,182 @@
#!/usr/bin/env python3
import abc

import networkx as nx
from typing import Optional


class Validator(abc.ABC):
@abc.abstractmethod
def validate(self) -> bool:
...


class NeuronConnectomeValidator(Validator):
class NeuronConnectomeValidatorError(Exception):
pass
"""Base class for motif validators."""

def __init__(self, g: nx.Graph) -> None:
self.g = g
def validate(self, *args, **kwargs) -> bool: # pragma: no cover - interface
raise NotImplementedError

def validate(self) -> bool:
"""
Return True if valid.

Raises NeuronConnectomeValidatorError if invalid.
"""
def validate_motif(self, motif) -> bool:
"""Optional motif-level validation hook."""
return True


class DisagreeingEdgesValidatorError(Exception):
class DisagreeingEdgesValidatorError(ValueError):
pass


class DisagreeingEdgesValidator(Validator):
def __init__(self) -> None:
"""Reject edges that disagree on existence for the same endpoints."""

def validate(
self,
g,
u,
v,
type: Optional[str] = None,
exists: bool = True,
) -> bool:
# If an edge already exists in the opposite existence state, reject it.
if g.has_edge(u, v):
for _, edge in g.get_edge_data(u, v).items():
if edge.get("exists", True) != exists:
raise DisagreeingEdgesValidatorError(
f"Edge existence conflict between {u}->{v}"
)
return True


class ImpossibleConstraintValidator(Validator):
class ConstraintCollisionError(ValueError):
pass

def validate(self, g: nx.Graph, u, v, type=None, exists: bool = True) -> bool:
"""
Return True if valid.

Raises DisagreeingEdgesValidatorError if invalid.
"""
edges = g.edges(u, data=True)
for u_, v_, attrs in edges:
if u_ == u and v_ == v and exists != attrs["exists"]:
raise DisagreeingEdgesValidatorError(
f"Trying to add <{u}-{v} exists={exists}> but "
f"<{u_}-{v_} exists={attrs['exists']}> already in motif."
def validate(self, *args, **kwargs) -> bool:
"""Edge-level hook (no-op) to satisfy Validator interface."""
return True

def validate_motif(self, motif) -> bool:
"""Ensure node and edge constraints remain consistent."""

eq_ops = {"=", "=="}
neq_ops = {"!=", "<>"}

def _flatten(vals):
flat = []
for v in vals:
if isinstance(v, str):
flat.append(v)
elif isinstance(v, (list, tuple, set)):
flat.extend(v)
else:
flat.append(v)
return flat

def _check_attr(ctx: str, attr: str, ops: dict):
eq_values = []
neq_values = []
lower_bounds = [] # (value, strict)
upper_bounds = [] # (value, strict)

def op_repr(val, strict: bool, lower: bool) -> str:
return (
(">" if strict else ">=") if lower else ("<" if strict else "<=")
) + str(val)

# contains / !contains conflicts
contains_vals = set(_flatten(ops.get("contains", [])))
not_contains_vals = set(_flatten(ops.get("!contains", [])))
if contains_vals & not_contains_vals:
raise self.ConstraintCollisionError(
f"Conflicting contains/!contains on {ctx}.{attr}: {contains_vals & not_contains_vals}"
)

# in / !in conflicts
in_vals = set(_flatten(ops.get("in", [])))
not_in_vals = set(_flatten(ops.get("!in", [])))
if in_vals & not_in_vals:
raise self.ConstraintCollisionError(
f"Conflicting in/!in on {ctx}.{attr}: {in_vals & not_in_vals}"
)

for op, values in ops.items():
if op in eq_ops:
eq_values.extend(values)
elif op in neq_ops:
neq_values.extend(values)
elif op in {">", ">=", "<", "<="}:
if op in {">", ">="}:
lower_bounds.extend([(v, op == ">") for v in values])
else:
upper_bounds.extend([(v, op == "<") for v in values])

# Multiple distinct equality values collide
if len(set(eq_values)) > 1:
raise self.ConstraintCollisionError(
f"Conflicting equality constraints on {ctx}.{attr}: {set(eq_values)}"
)

# Equality vs inequality of the same value collides
if eq_values and any(v in eq_values for v in neq_values):
raise self.ConstraintCollisionError(
f"Conflicting equality/inequality constraints on {ctx}.{attr}: eq={eq_values}, neq={neq_values}"
)

# Range consistency: combine lower/upper bounds and equality
if lower_bounds:
lower_val, lower_strict = max(lower_bounds, key=lambda t: t[0])
else:
lower_val, lower_strict = None, False

if upper_bounds:
upper_val, upper_strict = min(upper_bounds, key=lambda t: t[0])
else:
upper_val, upper_strict = None, False

if eq_values:
eq_val = eq_values[0]
# Equality must satisfy bounds
if lower_val is not None:
if eq_val < lower_val or (eq_val == lower_val and lower_strict):
raise self.ConstraintCollisionError(
f"Equality {ctx}.{attr}={eq_val} violates lower bound {op_repr(lower_val, lower_strict, lower=True)}"
)
if upper_val is not None:
if eq_val > upper_val or (eq_val == upper_val and upper_strict):
raise self.ConstraintCollisionError(
f"Equality {ctx}.{attr}={eq_val} violates upper bound {op_repr(upper_val, upper_strict, lower=False)}"
)
else:
# Pure range: ensure non-empty interval
if lower_val is not None and upper_val is not None:
if lower_val > upper_val or (
lower_val == upper_val and (lower_strict or upper_strict)
):
raise self.ConstraintCollisionError(
f"Impossible bounds on {ctx}.{attr}: {op_repr(lower_val, lower_strict, lower=True)} and {op_repr(upper_val, upper_strict, lower=False)}"
)

# Static node constraints
for node, constraints in motif.list_node_constraints().items():
for attr, ops in constraints.items():
_check_attr(node, attr, ops)

# Static edge constraints
for (u, v), constraints in motif.list_edge_constraints().items():
for attr, ops in constraints.items():
_check_attr(f"{u}->{v}", attr, ops)

# Dynamic node constraints (cross-node attribute comparisons)
for node, constraints in motif.list_dynamic_node_constraints().items():
for attr, ops in constraints.items():
lowers = [] # (other_node, other_attr, op)
uppers = []
for op, targets in ops.items():
for other_node, other_attr in targets:
if op in {">", ">="}:
lowers.append((other_node, other_attr, op))
elif op in {"<", "<="}:
uppers.append((other_node, other_attr, op))

for on, oa, lop in lowers:
for un, ua, uop in uppers:
if (on, oa) == (un, ua):
# Only safe case is non-strict sandwich >= / <= on the same target
if not (lop == ">=" and uop == "<="):
raise self.ConstraintCollisionError(
f"Impossible dynamic bounds on {node}.{attr} with {on}.{oa}: {lop} and {uop}"
)

return True