From f415ddb3a9e3e914b860f016cc863e6240134140 Mon Sep 17 00:00:00 2001 From: Stefan Hudici Date: Sat, 4 Jul 2026 15:31:05 +0300 Subject: [PATCH] feat: rank impact radius by weighted best-path score get_impact_radius treated every edge and depth equally: a depth-2 IMPORTS_FROM hop counted the same as a depth-1 CALLS hop, and when the result exceeded max_nodes, truncation kept arbitrary scan-order nodes. On a ~4.8k-node production graph a single-module change returned 500 flat nodes whose "key entities" were alphabetical shell scripts. Each reached node now gets a score: the best path from any seed, where every hop multiplies by an edge-kind weight (IMPACT_EDGE_WEIGHTS: CALLS 1.0 down to CONTAINS 0.3, unknown kinds 0.5) and a per-hop decay (0.6). Paths whose score falls below IMPACT_SCORE_FLOOR (0.05) stop expanding. impacted_nodes come back ordered best-first, truncation keeps the highest-signal nodes, and a new additive impact_scores map (plus impact_score on each node dict at the tool layer) exposes the ranking. SQLite cannot aggregate inside a recursive CTE, so the recursion may revisit a node once per distinct path score; the depth guard plus the score floor bound the expansion and the outer GROUP BY keeps MAX(score). The legacy NetworkX engine implements the same scoring (with better-path revisits, since a deep CALLS chain can outscore a shallow CONTAINS hop) and stays set- and score-aligned with the SQL engine. Weights follow the precedent of communities.EDGE_WEIGHTS but model review-risk propagation rather than clustering affinity, so the values intentionally differ. --- code_review_graph/constants.py | 28 +++++ code_review_graph/graph.py | 170 ++++++++++++++++++++++++------- code_review_graph/tools/query.py | 10 +- tests/test_graph.py | 147 ++++++++++++++++++++++++++ tests/test_tools.py | 80 +++++++++++++++ 5 files changed, 400 insertions(+), 35 deletions(-) diff --git a/code_review_graph/constants.py b/code_review_graph/constants.py index 2985a5e1..3510a9a2 100644 --- a/code_review_graph/constants.py +++ b/code_review_graph/constants.py @@ -21,3 +21,31 @@ # BFS engine: "sql" (SQLite recursive CTE) or "networkx" (Python-side BFS) BFS_ENGINE = os.environ.get("CRG_BFS_ENGINE", "sql") + +# --------------------------------------------------------------------------- +# Impact-radius scoring +# --------------------------------------------------------------------------- +# Change risk does not propagate equally across edge kinds: a direct call is +# a stronger coupling than an import, which is stronger than mere file +# membership. Each traversal hop multiplies the running score by the edge's +# weight and by IMPACT_DEPTH_DECAY; paths whose score falls below +# IMPACT_SCORE_FLOOR are not expanded further. Impacted nodes are ranked +# (and truncated) by their best-path score instead of arbitrary scan order. +# +# These weights model risk propagation for reviews; communities.EDGE_WEIGHTS +# models clustering affinity and intentionally differs (e.g. TESTED_BY is a +# weak clustering signal but a strong "this test is affected" signal). +IMPACT_EDGE_WEIGHTS: dict[str, float] = { + "CALLS": 1.0, + "INHERITS": 0.9, + "OVERRIDES": 0.9, + "IMPLEMENTS": 0.9, + "TESTED_BY": 0.7, + "REFERENCES": 0.6, + "DEPENDS_ON": 0.6, + "IMPORTS_FROM": 0.5, + "CONTAINS": 0.3, +} +IMPACT_DEFAULT_EDGE_WEIGHT = 0.5 +IMPACT_DEPTH_DECAY = float(os.environ.get("CRG_IMPACT_DEPTH_DECAY", "0.6")) +IMPACT_SCORE_FLOOR = float(os.environ.get("CRG_IMPACT_SCORE_FLOOR", "0.05")) diff --git a/code_review_graph/graph.py b/code_review_graph/graph.py index 4ed3c7fc..89fde8c0 100644 --- a/code_review_graph/graph.py +++ b/code_review_graph/graph.py @@ -19,7 +19,15 @@ import networkx as nx -from .constants import BFS_ENGINE, MAX_IMPACT_DEPTH, MAX_IMPACT_NODES +from .constants import ( + BFS_ENGINE, + IMPACT_DEFAULT_EDGE_WEIGHT, + IMPACT_DEPTH_DECAY, + IMPACT_EDGE_WEIGHTS, + IMPACT_SCORE_FLOOR, + MAX_IMPACT_DEPTH, + MAX_IMPACT_NODES, +) from .migrations import get_schema_version, run_migrations from .parser import EdgeInfo, NodeInfo @@ -623,9 +631,12 @@ def get_impact_radius( Returns dict with: - changed_nodes: nodes in changed files - - impacted_nodes: nodes reachable via edges + - impacted_nodes: nodes reachable via edges, ordered by + impact score (best-path score, highest first) - impacted_files: unique set of affected files - edges: connecting edges + - impact_scores: qualified_name -> best-path score for each + returned impacted node (edge-kind weight x depth decay per hop) """ if BFS_ENGINE == "networkx": return self._get_impact_radius_networkx( @@ -656,6 +667,7 @@ def get_impact_radius_sql( "edges": [], "truncated": False, "total_impacted": 0, + "impact_scores": {}, } # Seed qualified names @@ -673,6 +685,7 @@ def get_impact_radius_sql( "edges": [], "truncated": False, "total_impacted": 0, + "impact_scores": {}, } # Build recursive CTE — use a temp table for the seed set to @@ -692,42 +705,87 @@ def get_impact_radius_sql( batch, ) + # Edge-kind weights live in a temp table so the recursive CTE can + # join them per hop. + self._conn.execute( + "CREATE TEMP TABLE IF NOT EXISTS _impact_weights " + "(kind TEXT PRIMARY KEY, weight REAL NOT NULL)" + ) + self._conn.execute("DELETE FROM _impact_weights") + self._conn.executemany( + "INSERT INTO _impact_weights (kind, weight) VALUES (?, ?)", + list(IMPACT_EDGE_WEIGHTS.items()), + ) + + # Each hop multiplies the running score by the edge weight and the + # depth decay; paths below the score floor stop expanding. SQLite + # cannot aggregate inside the recursive term, so a node may be + # visited once per distinct (depth, score) path; the outer GROUP BY + # keeps its best score, and ORDER BY makes LIMIT keep the + # highest-signal nodes instead of arbitrary scan order. Seeds score + # 1.0 and every hop multiplies by < 1, so seeds always sort first + # and the LIMIT still admits up to max_nodes non-seed nodes. + # Edge endpoints without a node row ("ghost" qualified names from + # unresolved targets) stay in the recursion as traversal bridges but + # are excluded from the final selection so they cannot consume LIMIT + # slots that _batch_get_nodes would silently drop. cte_sql = """ - WITH RECURSIVE impacted(node_qn, depth) AS ( - SELECT qn, 0 FROM _impact_seeds + WITH RECURSIVE impacted(node_qn, depth, score) AS ( + SELECT qn, 0, 1.0 FROM _impact_seeds UNION - SELECT e.target_qualified, i.depth + 1 + SELECT e.target_qualified, i.depth + 1, + i.score * COALESCE(w.weight, ?) * ? FROM impacted i JOIN edges e ON e.source_qualified = i.node_qn + LEFT JOIN _impact_weights w ON w.kind = e.kind WHERE i.depth < ? + AND i.score * COALESCE(w.weight, ?) * ? > ? UNION - SELECT e.source_qualified, i.depth + 1 + SELECT e.source_qualified, i.depth + 1, + i.score * COALESCE(w.weight, ?) * ? FROM impacted i JOIN edges e ON e.target_qualified = i.node_qn + LEFT JOIN _impact_weights w ON w.kind = e.kind WHERE i.depth < ? + AND i.score * COALESCE(w.weight, ?) * ? > ? ) - SELECT DISTINCT node_qn, MIN(depth) AS min_depth + SELECT node_qn, MIN(depth) AS min_depth, MAX(score) AS impact_score FROM impacted + WHERE node_qn IN (SELECT qualified_name FROM nodes) GROUP BY node_qn + ORDER BY impact_score DESC, node_qn LIMIT ? """ + hop = ( + IMPACT_DEFAULT_EDGE_WEIGHT, IMPACT_DEPTH_DECAY, + max_depth, + IMPACT_DEFAULT_EDGE_WEIGHT, IMPACT_DEPTH_DECAY, + IMPACT_SCORE_FLOOR, + ) rows = self._conn.execute( - cte_sql, (max_depth, max_depth, max_nodes + len(seeds)), + cte_sql, hop + hop + (max_nodes + len(seeds),), ).fetchall() - # Split into seeds vs impacted + # Split into seeds vs impacted, keeping each node's best-path score. + score_by_qn: dict[str, float] = {} impacted_qns: set[str] = set() for r in rows: qn = r[0] + score_by_qn[qn] = r[2] if qn not in seeds: impacted_qns.add(qn) - # Batch-fetch nodes + # Batch-fetch nodes, then restore best-score-first order. changed_nodes = self._batch_get_nodes(seeds) impacted_nodes = self._batch_get_nodes(impacted_qns) + impacted_nodes.sort( + key=lambda n: (-score_by_qn.get(n.qualified_name, 0.0), n.qualified_name), + ) total_impacted = len(impacted_nodes) - truncated = total_impacted > max_nodes + # The LIMIT above already capped non-seed rows at max_nodes, so a + # saturated result means more nodes may exist beyond the cutoff. + truncated = total_impacted >= max_nodes if truncated: impacted_nodes = impacted_nodes[:max_nodes] @@ -745,6 +803,12 @@ def get_impact_radius_sql( "edges": relevant_edges, "truncated": truncated, "total_impacted": total_impacted, + "impact_scores": { + n.qualified_name: round( + score_by_qn.get(n.qualified_name, 0.0), 4, + ) + for n in impacted_nodes + }, } # -- NetworkX BFS version (legacy) ------------------------------------ @@ -764,37 +828,51 @@ def _get_impact_radius_networkx( for n in nodes: seeds.add(n.qualified_name) - visited: set[str] = set() - frontier = seeds.copy() + # Weighted BFS mirroring the SQL engine: each hop multiplies the + # running score by the edge weight and the depth decay, sub-floor + # paths stop expanding, and a node re-enters the frontier whenever a + # better-scoring path reaches it (a deep CALLS chain can outscore a + # shallow CONTAINS hop, so first-visit BFS is not enough). + best: dict[str, float] = dict.fromkeys(seeds, 1.0) + frontier: dict[str, float] = dict(best) depth = 0 - impacted: set[str] = set() while frontier and depth < max_depth: - visited.update(frontier) - next_frontier: set[str] = set() - for qn in frontier: - if qn in nxg: - for neighbor in nxg.neighbors(qn): - if neighbor not in visited: - next_frontier.add(neighbor) - impacted.add(neighbor) - if qn in nxg: - for pred in nxg.predecessors(qn): - if pred not in visited: - next_frontier.add(pred) - impacted.add(pred) - next_frontier -= visited - if len(visited) + len(next_frontier) > max_nodes: - break + next_frontier: dict[str, float] = {} + for qn, score in frontier.items(): + if qn not in nxg: + continue + out_edges = nxg.out_edges(qn, data=True) + in_edges = nxg.in_edges(qn, data=True) + neighbors = [ + (target, data) for _, target, data in out_edges + ] + [ + (source, data) for source, _, data in in_edges + ] + for other_qn, data in neighbors: + weight = IMPACT_EDGE_WEIGHTS.get( + data.get("kind", ""), IMPACT_DEFAULT_EDGE_WEIGHT, + ) + new_score = score * weight * IMPACT_DEPTH_DECAY + if new_score <= IMPACT_SCORE_FLOOR: + continue + if new_score > best.get(other_qn, 0.0): + best[other_qn] = new_score + next_frontier[other_qn] = new_score frontier = next_frontier depth += 1 changed_nodes = self._batch_get_nodes(seeds) - impacted_qns = impacted - seeds + impacted_qns = set(best) - seeds impacted_nodes = self._batch_get_nodes(impacted_qns) + impacted_nodes.sort( + key=lambda n: (-best.get(n.qualified_name, 0.0), n.qualified_name), + ) total_impacted = len(impacted_nodes) - truncated = total_impacted > max_nodes + # Same saturation semantics as the SQL engine: a full result means + # more nodes may exist beyond the cutoff. + truncated = total_impacted >= max_nodes if truncated: impacted_nodes = impacted_nodes[:max_nodes] @@ -812,6 +890,10 @@ def _get_impact_radius_networkx( "edges": relevant_edges, "truncated": truncated, "total_impacted": total_impacted, + "impact_scores": { + n.qualified_name: round(best.get(n.qualified_name, 0.0), 4) + for n in impacted_nodes + }, } def get_subgraph(self, qualified_names: list[str]) -> dict[str, Any]: @@ -1282,14 +1364,34 @@ def load_flow_adjacency(self) -> "FlowAdjacency": # --- Internal helpers --- def _build_networkx_graph(self) -> nx.DiGraph: - """Build (or return cached) in-memory NetworkX directed graph from all edges.""" + """Build (or return cached) in-memory NetworkX directed graph from all edges. + + DiGraph keeps a single edge per (source, target) pair. When parallel + edges of different kinds exist (e.g. a file both CONTAINS and CALLS a + function), keep the kind with the highest impact weight so weighted + traversals see the same best-path scores as the SQL engine, which + reads every edge row. Consumers that ignore ``kind`` (betweenness + centrality) only depend on topology and are unaffected. + """ with self._cache_lock: if self._nxg_cache is not None: return self._nxg_cache g: nx.DiGraph = nx.DiGraph() rows = self._conn.execute("SELECT * FROM edges").fetchall() for r in rows: - g.add_edge(r["source_qualified"], r["target_qualified"], kind=r["kind"]) + source = r["source_qualified"] + target = r["target_qualified"] + kind = r["kind"] + if g.has_edge(source, target): + existing = g[source][target].get("kind", "") + if ( + IMPACT_EDGE_WEIGHTS.get(kind, IMPACT_DEFAULT_EDGE_WEIGHT) + <= IMPACT_EDGE_WEIGHTS.get( + existing, IMPACT_DEFAULT_EDGE_WEIGHT, + ) + ): + continue + g.add_edge(source, target, kind=kind) self._nxg_cache = g return g diff --git a/code_review_graph/tools/query.py b/code_review_graph/tools/query.py index 3b442f8a..2a2c6d8e 100644 --- a/code_review_graph/tools/query.py +++ b/code_review_graph/tools/query.py @@ -80,8 +80,16 @@ def get_impact_radius( abs_files, max_depth=max_depth, max_nodes=max_results ) + impact_scores = result.get("impact_scores", {}) + changed_dicts = [node_to_dict(n) for n in result["changed_nodes"]] - impacted_dicts = [node_to_dict(n) for n in result["impacted_nodes"]] + impacted_dicts = [] + for n in result["impacted_nodes"]: + d = node_to_dict(n) + score = impact_scores.get(n.qualified_name) + if score is not None: + d["impact_score"] = score + impacted_dicts.append(d) edge_dicts = [edge_to_dict(e) for e in result["edges"]] truncated = result["truncated"] total_impacted = result["total_impacted"] diff --git a/tests/test_graph.py b/tests/test_graph.py index fb407810..3e55112c 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -5,6 +5,8 @@ import tempfile from pathlib import Path +import pytest + from code_review_graph.graph import GraphStore from code_review_graph.parser import EdgeInfo, NodeInfo @@ -414,3 +416,148 @@ def test_uncapped_small_frontier_unchanged(self): assert len(indirect_default) == 1 assert len(indirect_capped) == 1 assert indirect_default[0]["name"] == indirect_capped[0]["name"] + + +class TestWeightedImpactScoring: + """Weighted impact-radius scoring: edge weights, decay, floor, ordering.""" + + def setup_method(self): + self.tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False) + self.tmp.close() + self.store = GraphStore(self.tmp.name) + + def teardown_method(self): + self.store.close() + Path(self.tmp.name).unlink(missing_ok=True) + + def _add_func(self, name: str, path: str) -> str: + self.store.upsert_node(NodeInfo( + kind="Function", name=name, file_path=path, + line_start=1, line_end=10, language="python", + )) + return f"{path}::{name}" + + def _add_edge(self, kind: str, source: str, target: str) -> None: + self.store.upsert_edge(EdgeInfo( + kind=kind, source=source, target=target, + file_path="/a.py", line=1, + )) + + def _scores(self, result) -> dict: + return result["impact_scores"] + + def test_calls_scores_higher_than_imports(self): + a = self._add_func("func_a", "/a.py") + b = self._add_func("func_b", "/b.py") + c = self._add_func("func_c", "/c.py") + self._add_edge("CALLS", a, b) + self._add_edge("IMPORTS_FROM", a, c) + self.store.commit() + + result = self.store.get_impact_radius_sql(["/a.py"], max_depth=2) + scores = self._scores(result) + assert scores[b] == pytest.approx(0.6) # 1.0 (CALLS) * 0.6 decay + assert scores[c] == pytest.approx(0.3) # 0.5 (IMPORTS_FROM) * 0.6 + ordered = [n.qualified_name for n in result["impacted_nodes"]] + assert ordered == [b, c] + + def test_depth_decay_lowers_score_per_hop(self): + a = self._add_func("func_a", "/a.py") + b = self._add_func("func_b", "/b.py") + c = self._add_func("func_c", "/c.py") + self._add_edge("CALLS", a, b) + self._add_edge("CALLS", b, c) + self.store.commit() + + result = self.store.get_impact_radius_sql(["/a.py"], max_depth=2) + scores = self._scores(result) + assert scores[b] == pytest.approx(0.6) + assert scores[c] == pytest.approx(0.36) + + def test_truncation_keeps_highest_scoring_nodes(self): + a = self._add_func("func_a", "/a.py") + b = self._add_func("func_b", "/b.py") + c = self._add_func("func_c", "/c.py") + self._add_edge("CALLS", a, b) + self._add_edge("IMPORTS_FROM", a, c) + self.store.commit() + + result = self.store.get_impact_radius_sql( + ["/a.py"], max_depth=2, max_nodes=1, + ) + kept = [n.qualified_name for n in result["impacted_nodes"]] + assert kept == [b] # CALLS edge outranks IMPORTS_FROM + + def test_deeper_strong_path_beats_shallow_weak_path(self): + a = self._add_func("func_a", "/a.py") + m = self._add_func("func_m", "/m.py") + x = self._add_func("func_x", "/x.py") + self._add_edge("CONTAINS", a, x) # depth 1: 0.3 * 0.6 = 0.18 + self._add_edge("CALLS", a, m) + self._add_edge("CALLS", m, x) # depth 2: 0.6 * 0.6 = 0.36 + self.store.commit() + + sql = self.store.get_impact_radius_sql(["/a.py"], max_depth=2) + nx_r = self.store._get_impact_radius_networkx(["/a.py"], max_depth=2) + assert self._scores(sql)[x] == pytest.approx(0.36) + assert self._scores(nx_r)[x] == pytest.approx(0.36) + + def test_score_floor_stops_expansion(self): + # CALLS chain decays 0.6 per hop; hop 6 scores 0.047 <= floor 0.05. + qns = [self._add_func(f"f{i}", f"/f{i}.py") for i in range(8)] + for src, tgt in zip(qns, qns[1:]): + self._add_edge("CALLS", src, tgt) + self.store.commit() + + result = self.store.get_impact_radius_sql(["/f0.py"], max_depth=8) + scores = self._scores(result) + assert qns[5] in scores # 0.6**5 = 0.0778 > floor + assert qns[6] not in scores # 0.6**6 = 0.0467 <= floor + + def test_sql_and_networkx_scores_match(self): + a = self._add_func("func_a", "/a.py") + b = self._add_func("func_b", "/b.py") + c = self._add_func("func_c", "/c.py") + t = self._add_func("test_b", "/tests.py") + self._add_edge("CALLS", a, b) + self._add_edge("IMPORTS_FROM", b, c) + self._add_edge("TESTED_BY", b, t) + self.store.commit() + + sql = self.store.get_impact_radius_sql(["/a.py"], max_depth=3) + nx_r = self.store._get_impact_radius_networkx(["/a.py"], max_depth=3) + assert self._scores(sql) == self._scores(nx_r) + sql_order = [n.qualified_name for n in sql["impacted_nodes"]] + nx_order = [n.qualified_name for n in nx_r["impacted_nodes"]] + assert sql_order == nx_order + + def test_unknown_edge_kind_uses_default_weight(self): + a = self._add_func("func_a", "/a.py") + b = self._add_func("func_b", "/b.py") + self._add_edge("MYSTERY_KIND", a, b) + self.store.commit() + + result = self.store.get_impact_radius_sql(["/a.py"], max_depth=1) + assert self._scores(result)[b] == pytest.approx(0.3) # 0.5 * 0.6 + + def test_empty_changed_files_returns_empty_scores(self): + result = self.store.get_impact_radius_sql([], max_depth=2) + assert result["impact_scores"] == {} + + def test_parallel_edge_kinds_keep_engines_aligned(self): + # Two edges of different kinds between the same pair: DiGraph keeps + # one edge per pair, and before the strongest-kind collapse the last + # inserted kind won arbitrarily. CALLS first, CONTAINS second makes + # the old behavior score b at 0.18 instead of 0.6. + a = self._add_func("func_a", "/a.py") + b = self._add_func("func_b", "/b.py") + c = self._add_func("func_c", "/c.py") + self._add_edge("CALLS", a, b) + self._add_edge("CONTAINS", a, b) + self._add_edge("CALLS", b, c) + self.store.commit() + + sql = self.store.get_impact_radius_sql(["/a.py"], max_depth=2) + nx_r = self.store._get_impact_radius_networkx(["/a.py"], max_depth=2) + assert self._scores(nx_r)[b] == pytest.approx(0.6) + assert self._scores(sql) == self._scores(nx_r) diff --git a/tests/test_tools.py b/tests/test_tools.py index 578536d4..68da070f 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -1563,3 +1563,83 @@ def test_task_routing_refactor(self): task="refactor auth module", repo_root=str(self.root), ) assert "refactor" in result["next_tool_suggestions"] + + +def _seed_scored_impact_graph(root: Path) -> None: + """Seed a graph where one impact target is called and one is imported.""" + graph_dir = root / ".code-review-graph" + graph_dir.mkdir() + store = GraphStore(graph_dir / "graph.db") + app = "fixtures/sample_repo/src/app.py" + callee = "fixtures/sample_repo/src/callee.py" + imported = "fixtures/sample_repo/src/imported.py" + try: + store.upsert_node(NodeInfo( + kind="File", name=app, file_path=app, + line_start=1, line_end=6, language="python", + )) + store.upsert_node(NodeInfo( + kind="Function", name="handle", file_path=app, + line_start=1, line_end=3, language="python", + )) + store.upsert_node(NodeInfo( + kind="Function", name="callee", file_path=callee, + line_start=1, line_end=3, language="python", + )) + store.upsert_node(NodeInfo( + kind="Function", name="imported", file_path=imported, + line_start=1, line_end=3, language="python", + )) + store.upsert_edge(EdgeInfo( + kind="CALLS", source=f"{app}::handle", + target=f"{callee}::callee", file_path=app, line=2, + )) + store.upsert_edge(EdgeInfo( + kind="IMPORTS_FROM", source=f"{app}::handle", + target=f"{imported}::imported", file_path=app, line=1, + )) + store.commit() + finally: + store.close() + + +class TestImpactRadiusScoring: + """impact_score surfaces on impacted nodes, best-first.""" + + def _repo(self, tmp_path: Path) -> Path: + repo = tmp_path / "fixtures" / "sample_repo" + repo.mkdir(parents=True) + (repo / ".git").mkdir() + (repo / "src").mkdir() + (repo / "src" / "app.py").write_text( + "def handle():\n return 'ok'\n", + encoding="utf-8", + ) + _seed_scored_impact_graph(repo) + return repo + + def test_impacted_nodes_carry_scores_sorted_best_first(self, tmp_path): + repo = self._repo(tmp_path) + + result = get_impact_radius( + changed_files=["src/app.py"], + repo_root=str(repo), + ) + + impacted = result["impacted_nodes"] + assert impacted, "expected impacted nodes" + scores = [n["impact_score"] for n in impacted] + assert scores == sorted(scores, reverse=True) + by_name = {n["name"]: n["impact_score"] for n in impacted} + assert by_name["callee"] > by_name["imported"] + + def test_minimal_key_entities_lead_with_top_scored_node(self, tmp_path): + repo = self._repo(tmp_path) + + result = get_impact_radius( + changed_files=["src/app.py"], + repo_root=str(repo), + detail_level="minimal", + ) + + assert result["key_entities"][0] == "callee"