Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions code_review_graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,42 @@ def resolve_bare_call_targets(self) -> int:

Returns the number of resolved edges.
"""
return self._resolve_bare_endpoints("CALLS", "target_qualified")

def resolve_bare_tested_by_sources(self) -> int:
"""Batch-resolve bare-name TESTED_BY sources using the global node table.

TESTED_BY edges are derived from test-to-production CALLS edges at
parse time, so their production endpoint (the source) inherits the
bare name of any call the parser couldn't resolve cross-file — the
common case, since tests almost always import the code they test.
Qualified-name lookups (tests_for, review context) miss those edges
entirely. Same in-place resolution and disambiguation strategy as
:meth:`resolve_bare_call_targets`.

Returns the number of resolved edges.
"""
return self._resolve_bare_endpoints("TESTED_BY", "source_qualified")

def _resolve_bare_endpoints(self, kind: str, endpoint: str) -> int:
"""Resolve bare-name edge endpoints of *kind* in column *endpoint*.

The opposite endpoint's file provides import evidence for
disambiguation: for CALLS the caller imports the callee's module,
for TESTED_BY the test file imports the module under test.
"""
if endpoint not in ("source_qualified", "target_qualified"):
raise ValueError(f"Invalid edge endpoint column: {endpoint!r}")
counterpart = (
"source_qualified" if endpoint == "target_qualified"
else "target_qualified"
)
conn = self._conn

bare_edges = conn.execute(
"SELECT id, source_qualified, target_qualified, file_path "
"FROM edges WHERE kind = 'CALLS' AND target_qualified NOT LIKE '%::%'"
f"SELECT id, source_qualified, target_qualified, file_path "
f"FROM edges WHERE kind = ? AND {endpoint} NOT LIKE '%::%'",
(kind,),
).fetchall()
if not bare_edges:
return 0
Expand All @@ -509,7 +540,7 @@ def resolve_bare_call_targets(self) -> int:
).fetchall():
node_lookup.setdefault(row["name"], []).append(row["qualified_name"])

# source_file -> set of imported files (for disambiguation)
# importing_file -> set of imported files (for disambiguation)
import_targets: dict[str, set[str]] = {}
for row in conn.execute(
"SELECT DISTINCT file_path, target_qualified FROM edges "
Expand All @@ -521,21 +552,21 @@ def resolve_bare_call_targets(self) -> int:

resolved = 0
for edge in bare_edges:
bare_name = edge["target_qualified"]
bare_name = edge[endpoint]
candidates = node_lookup.get(bare_name, [])
if not candidates:
continue

if len(candidates) == 1:
qualified = candidates[0]
else:
# Disambiguate via imports
src_qn = edge["source_qualified"]
src_file = (
src_qn.split("::", 1)[0] if "::" in src_qn
# Disambiguate via the counterpart endpoint's imports
ctx_qn = edge[counterpart]
ctx_file = (
ctx_qn.split("::", 1)[0] if "::" in ctx_qn
else edge["file_path"]
)
imported_files = import_targets.get(src_file, set())
imported_files = import_targets.get(ctx_file, set())
imported = [
c for c in candidates
if c.split("::", 1)[0] in imported_files
Expand All @@ -546,14 +577,17 @@ def resolve_bare_call_targets(self) -> int:
continue

conn.execute(
"UPDATE edges SET target_qualified = ? WHERE id = ?",
f"UPDATE edges SET {endpoint} = ? WHERE id = ?",
(qualified, edge["id"]),
)
resolved += 1

if resolved:
conn.commit()
logger.info("Resolved %d bare-name CALLS targets", resolved)
logger.info(
"Resolved %d bare-name %s %s", resolved, kind,
"sources" if endpoint == "source_qualified" else "targets",
)
return resolved

def get_all_files(self) -> list[str]:
Expand Down
34 changes: 29 additions & 5 deletions code_review_graph/postprocessing.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""Shared post-build processing pipeline.

After the core Tree-sitter parse (full_build or incremental_update), four
After the core Tree-sitter parse (full_build or incremental_update), five
post-processing steps must run to populate derived tables:

1. Compute node signatures
2. Rebuild FTS5 search index
3. Trace execution flows
4. Detect code communities
1. Resolve bare-name edge endpoints (cross-file CALLS targets and the
TESTED_BY sources that inherit from them)
2. Compute node signatures
3. Rebuild FTS5 search index
4. Trace execution flows
5. Detect code communities

This module extracts that pipeline so every entry point — MCP tool, CLI
commands, and watch mode — produces identical results.
Expand Down Expand Up @@ -39,6 +41,7 @@ def run_post_processing(store: GraphStore) -> dict[str, Any]:
result: dict[str, Any] = {}
warnings: list[str] = []

_resolve_bare_endpoints(store, result, warnings)
_compute_signatures(store, result, warnings)
_rebuild_fts_index(store, result, warnings)
_trace_flows(store, result, warnings)
Expand All @@ -52,6 +55,27 @@ def run_post_processing(store: GraphStore) -> dict[str, Any]:
# -- Individual steps (private) ------------------------------------------


def _resolve_bare_endpoints(
store: GraphStore,
result: dict[str, Any],
warnings: list[str],
) -> None:
"""Resolve bare-name CALLS targets and TESTED_BY sources.

Runs before the derived steps so flows and communities see
qualified edges wherever the name is unambiguous.
"""
try:
resolved = store.resolve_bare_call_targets()
resolved += store.resolve_bare_tested_by_sources()
result["bare_edges_resolved"] = resolved
except sqlite3.OperationalError as e:
logger.warning("Bare-endpoint resolution failed: %s", e)
warnings.append(
f"Bare-endpoint resolution failed: {type(e).__name__}: {e}"
)


def _compute_signatures(
store: GraphStore,
result: dict[str, Any],
Expand Down
20 changes: 20 additions & 0 deletions code_review_graph/tools/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ def _run_postprocess(
if postprocess == "none":
return warnings

# -- Resolve bare edge endpoints (fast; before derived steps) --
try:
resolved = store.resolve_bare_call_targets()
resolved += store.resolve_bare_tested_by_sources()
build_result["bare_edges_resolved"] = resolved
except sqlite3.OperationalError as e:
logger.warning("Bare-endpoint resolution failed: %s", e)
warnings.append(f"Bare-endpoint resolution failed: {type(e).__name__}: {e}")

# -- Signatures + FTS (fast, always run unless "none") --
try:
rows = store.get_nodes_without_signature()
Expand Down Expand Up @@ -463,6 +472,17 @@ def run_postprocess(
warnings: list[str] = []

try:
# -- Resolve bare edge endpoints first so derived steps see them --
try:
resolved = store.resolve_bare_call_targets()
resolved += store.resolve_bare_tested_by_sources()
result["bare_edges_resolved"] = resolved
except sqlite3.OperationalError as e:
logger.warning("Bare-endpoint resolution failed: %s", e)
warnings.append(
f"Bare-endpoint resolution failed: {type(e).__name__}: {e}"
)

try:
rows = store.get_nodes_without_signature()
for row in rows:
Expand Down
107 changes: 107 additions & 0 deletions tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,110 @@ def test_uncapped_small_frontier_unchanged(self):
assert len(indirect_default) == 1
assert len(indirect_capped) == 1
assert indirect_default[0]["name"] == indirect_capped[0]["name"]


class TestResolveBareEndpoints:
"""Bare-name resolution for CALLS targets and TESTED_BY sources."""

def setup_method(self):
self.tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
self.tmp.close() # close the OS handle so Windows can unlink in teardown
self.store = GraphStore(self.tmp.name)

def teardown_method(self):
self.store.close()
Path(self.tmp.name).unlink(missing_ok=True)

def _func(self, name, path, is_test=False):
self.store.upsert_node(NodeInfo(
kind="Test" if is_test else "Function",
name=name, file_path=path,
line_start=1, line_end=10, language="python",
is_test=is_test,
))

def _edge(self, kind, source, target, file_path):
self.store.upsert_edge(EdgeInfo(
kind=kind, source=source, target=target,
file_path=file_path, line=1,
))

def _edge_endpoints(self, kind):
rows = self.store._conn.execute(
"SELECT source_qualified, target_qualified FROM edges WHERE kind = ?",
(kind,),
).fetchall()
return [(r["source_qualified"], r["target_qualified"]) for r in rows]

def test_resolves_unique_tested_by_source(self):
self._func("parse", "src/app.py")
self._func("test_parse", "tests/test_app.py", is_test=True)
self._edge(
"TESTED_BY", "parse", "tests/test_app.py::test_parse",
"tests/test_app.py",
)
self.store.commit()

assert self.store.resolve_bare_tested_by_sources() == 1
assert self._edge_endpoints("TESTED_BY") == [
("src/app.py::parse", "tests/test_app.py::test_parse"),
]

def test_ambiguous_source_without_import_evidence_stays_bare(self):
self._func("parse", "src/app.py")
self._func("parse", "src/other.py")
self._func("test_parse", "tests/test_app.py", is_test=True)
self._edge(
"TESTED_BY", "parse", "tests/test_app.py::test_parse",
"tests/test_app.py",
)
self.store.commit()

assert self.store.resolve_bare_tested_by_sources() == 0
assert self._edge_endpoints("TESTED_BY") == [
("parse", "tests/test_app.py::test_parse"),
]

def test_ambiguous_source_disambiguated_by_test_file_imports(self):
self._func("parse", "src/app.py")
self._func("parse", "src/other.py")
self._func("test_parse", "tests/test_app.py", is_test=True)
self._edge(
"IMPORTS_FROM", "tests/test_app.py", "src/app.py",
"tests/test_app.py",
)
self._edge(
"TESTED_BY", "parse", "tests/test_app.py::test_parse",
"tests/test_app.py",
)
self.store.commit()

assert self.store.resolve_bare_tested_by_sources() == 1
assert (
"src/app.py::parse", "tests/test_app.py::test_parse",
) in self._edge_endpoints("TESTED_BY")

def test_unknown_source_name_is_left_alone(self):
self._func("test_ghost", "tests/test_app.py", is_test=True)
self._edge(
"TESTED_BY", "ghost", "tests/test_app.py::test_ghost",
"tests/test_app.py",
)
self.store.commit()

assert self.store.resolve_bare_tested_by_sources() == 0
assert self._edge_endpoints("TESTED_BY") == [
("ghost", "tests/test_app.py::test_ghost"),
]

def test_calls_targets_still_resolve(self):
"""The shared helper keeps the original CALLS behavior intact."""
self._func("helper", "src/util.py")
self._func("caller", "src/app.py")
self._edge("CALLS", "src/app.py::caller", "helper", "src/app.py")
self.store.commit()

assert self.store.resolve_bare_call_targets() == 1
assert self._edge_endpoints("CALLS") == [
("src/app.py::caller", "src/util.py::helper"),
]
67 changes: 67 additions & 0 deletions tests/test_postprocessing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for the shared post-processing pipeline."""

import sqlite3
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
Expand Down Expand Up @@ -325,3 +326,69 @@ def run_watch():
callback.assert_not_called()
finally:
store.close()


class TestResolveBareEndpointsStep:
"""run_post_processing resolves bare edge endpoints before derived steps."""

def setup_method(self):
self.tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
self.tmp.close() # close the OS handle so Windows can unlink in teardown
self.store = GraphStore(self.tmp.name)

def teardown_method(self):
self.store.close()
Path(self.tmp.name).unlink(missing_ok=True)

def _seed_bare_edges(self):
for name, path, is_test in [
("parse", "src/app.py", False),
("helper", "src/util.py", False),
("test_parse", "tests/test_app.py", True),
]:
self.store.upsert_node(NodeInfo(
kind="Test" if is_test else "Function",
name=name, file_path=path,
line_start=1, line_end=10, language="python",
is_test=is_test,
))
self.store.upsert_edge(EdgeInfo(
kind="CALLS", source="src/app.py::parse", target="helper",
file_path="src/app.py", line=1,
))
self.store.upsert_edge(EdgeInfo(
kind="TESTED_BY", source="parse",
target="tests/test_app.py::test_parse",
file_path="tests/test_app.py", line=1,
))
self.store.commit()

def test_resolves_bare_endpoints_and_reports_count(self):
self._seed_bare_edges()

result = run_post_processing(self.store)

assert result["bare_edges_resolved"] == 2
rows = self.store._conn.execute(
"SELECT source_qualified, target_qualified, kind FROM edges "
"WHERE kind IN ('CALLS', 'TESTED_BY')"
).fetchall()
by_kind = {r["kind"]: (r["source_qualified"], r["target_qualified"]) for r in rows}
assert by_kind["CALLS"] == ("src/app.py::parse", "src/util.py::helper")
assert by_kind["TESTED_BY"] == (
"src/app.py::parse", "tests/test_app.py::test_parse",
)

def test_resolution_failure_is_isolated(self):
"""A resolution failure becomes a warning, not a crashed pipeline."""
self._seed_bare_edges()
with patch.object(
GraphStore, "resolve_bare_call_targets",
side_effect=sqlite3.OperationalError("boom"),
):
result = run_post_processing(self.store)

assert "bare_edges_resolved" not in result
assert any("Bare-endpoint resolution" in w for w in result["warnings"])
# Derived steps still ran
assert "communities_detected" in result
Loading