diff --git a/code_review_graph/communities.py b/code_review_graph/communities.py index c8a3de8b..e0199ea5 100644 --- a/code_review_graph/communities.py +++ b/code_review_graph/communities.py @@ -57,6 +57,11 @@ "add", "remove", "make", "build", "from", "to", "for", "with", "the", "and", "test", "main", "run", "do", "is", "has", "on", "of", "in", "at", "by", "my", "this", "that", "all", "none", + # BDD test-name grammar ("test_should_return_x_when_y") — frequent + # in test-heavy communities but carries no domain meaning. + "should", "when", "then", "given", "return", "returns", "raise", + "raises", "expect", "expected", "assert", "tests", "be", "it", "if", + "not", }) @@ -65,6 +70,11 @@ # --------------------------------------------------------------------------- +def _is_test_node(node: GraphNode) -> bool: + """True when *node* is a test (by kind or the parser's is_test flag).""" + return node.kind == "Test" or node.is_test + + def _generate_community_name(members: list[GraphNode]) -> str: """Generate a meaningful name for a community of nodes. @@ -77,6 +87,14 @@ def _generate_community_name(members: list[GraphNode]) -> str: if not members: return "empty" + # Mixed communities are named from their production members only: + # tests parrot the subject's vocabulary through BDD grammar and live + # under tests/ directories, which pollutes both the keyword and the + # file-prefix signals. + production = [m for m in members if not _is_test_node(m)] + if production and len(production) < len(members): + members = production + # 1. Find common file prefix file_paths = [m.file_path for m in members] prefix = _extract_file_prefix(file_paths) @@ -233,6 +251,87 @@ def _compute_cohesion( # --------------------------------------------------------------------------- +def _reassign_test_nodes( + clusters: list[list[int]], + idx_to_node: dict[int, GraphNode], + qn_to_idx: dict[str, int], + edges: list[GraphEdge], +) -> list[list[int]]: + """Move Test nodes into the community of the code they test. + + Shared fixtures and helpers give test files dense internal CALLS + edges, so Leiden tends to cluster tests with each other instead of + with their subjects, producing test-only blobs. Reviews want tests + next to the code they cover, so each Test node is reassigned to the + community holding the majority of its TESTED_BY partners (its own + community counts as a vote, so a test whose subjects mostly live + with it already stays put; ties also prefer the current cluster, + and otherwise resolve to the lowest cluster index so the outcome + never depends on edge order). + + The test endpoint of a TESTED_BY edge is identified by node kind, + not edge direction, so this works regardless of the direction the + parser emits. + """ + vertex_cluster: dict[int, int] = {} + for ci, cluster in enumerate(clusters): + for v in cluster: + vertex_cluster[v] = ci + + # TESTED_BY endpoints inherited from unresolved cross-file calls can + # be bare names rather than qualified ones; resolve those by node + # name when the name is unambiguous so their tests still vote. + name_to_idx: dict[str, int | None] = {} + for i, node in idx_to_node.items(): + name_to_idx[node.name] = None if node.name in name_to_idx else i + + def _resolve(qualified: str) -> int | None: + idx = qn_to_idx.get(qualified) + if idx is not None: + return idx + return name_to_idx.get(qualified) + + votes: dict[int, Counter[int]] = {} + for e in edges: + if e.kind != "TESTED_BY": + continue + a = _resolve(e.source_qualified) + b = _resolve(e.target_qualified) + if a is None or b is None: + continue + a_is_test = _is_test_node(idx_to_node[a]) + b_is_test = _is_test_node(idx_to_node[b]) + if a_is_test == b_is_test: + continue + test_v, subject_v = (a, b) if a_is_test else (b, a) + subject_cluster = vertex_cluster.get(subject_v) + if subject_cluster is None: + continue + votes.setdefault(test_v, Counter())[subject_cluster] += 1 + + if not votes: + return clusters + + moved = 0 + new_clusters: list[list[int]] = [[] for _ in clusters] + for ci, cluster in enumerate(clusters): + for v in cluster: + target = ci + if v in votes: + counter = votes[v] + best = max(counter.values()) + tied = [c for c, n in counter.items() if n == best] + target = ci if ci in tied else min(tied) + if target != ci: + moved += 1 + new_clusters[target].append(v) + if moved: + logger.info( + "Reassigned %d test nodes to their subject communities", moved + ) + return new_clusters + + def _detect_leiden( nodes: list[GraphNode], edges: list[GraphEdge], @@ -309,8 +408,12 @@ def _detect_leiden( len(partition), ) + clusters = _reassign_test_nodes( + [list(c) for c in partition], idx_to_node, qn_to_idx, edges + ) + pending: list[tuple[list[GraphNode], set[str]]] = [] - for cluster_ids in partition: + for cluster_ids in clusters: if len(cluster_ids) < min_size: continue members = [idx_to_node[i] for i in cluster_ids if i in idx_to_node] diff --git a/tests/test_communities.py b/tests/test_communities.py index 7d7b2dca..348e8e2b 100644 --- a/tests/test_communities.py +++ b/tests/test_communities.py @@ -10,7 +10,9 @@ _compute_cohesion, _compute_cohesion_batch, _detect_file_based, + _detect_leiden, _generate_community_name, + _reassign_test_nodes, detect_communities, get_architecture_overview, get_communities, @@ -590,3 +592,300 @@ def test_incremental_detect_redetects_affected(self): # Pass a file that IS part of existing communities result = incremental_detect_communities(self.store, ["auth.py"]) assert result > 0 + + +class TestNamingIgnoresTestVocabulary: + """Naming must not be hijacked by test members (BDD grammar, tests/ dirs).""" + + def _node( + self, nid: int, kind: str, name: str, fp: str, is_test: bool = False + ) -> GraphNode: + return GraphNode( + id=nid, kind=kind, name=name, qualified_name=f"{fp}::{name}", + file_path=fp, line_start=1, line_end=10, language="python", + parent_name=None, params=None, return_type=None, is_test=is_test, + file_hash="x", extra={}, + ) + + def test_mixed_community_named_from_production_members(self): + """Tests outnumbering production must not steal the prefix or keyword.""" + members = [ + self._node(1, "Function", "parse_invoice", "src/billing/invoice.py"), + self._node(2, "Function", "total_invoice", "src/billing/invoice.py"), + self._node( + 3, "Test", "test_should_parse_invoice_when_valid", + "tests/test_invoice.py", is_test=True, + ), + self._node( + 4, "Test", "test_should_raise_when_invoice_missing", + "tests/test_invoice.py", is_test=True, + ), + self._node( + 5, "Test", "test_should_return_zero_when_empty", + "tests/test_invoice.py", is_test=True, + ), + ] + name = _generate_community_name(members) + assert name.startswith("billing") + assert "invoice" in name + assert "should" not in name + assert "tests" not in name + + def test_pure_test_community_still_gets_a_name(self): + """All-test communities keep working; BDD grammar is just filtered.""" + members = [ + self._node( + 1, "Test", "test_should_parse_invoice", + "tests/test_invoice.py", is_test=True, + ), + self._node( + 2, "Test", "test_should_total_invoice", + "tests/test_invoice.py", is_test=True, + ), + ] + name = _generate_community_name(members) + assert name + assert "invoice" in name + assert "should" not in name + + def test_bdd_grammar_words_are_stop_words(self): + """'should'/'when'/'return' never become the community keyword.""" + members = [ + self._node( + i, "Test", f"test_should_return_widget_when_ready_{i}x", + "tests/test_w.py", is_test=True, + ) + for i in range(1, 4) + ] + name = _generate_community_name(members) + assert "should" not in name + assert "when" not in name + assert "widget" in name + + +class TestReassignTestNodes: + """_reassign_test_nodes moves Test vertices to their subjects' cluster.""" + + def _node( + self, nid: int, kind: str, name: str, fp: str, is_test: bool = False + ) -> GraphNode: + return GraphNode( + id=nid, kind=kind, name=name, qualified_name=f"{fp}::{name}", + file_path=fp, line_start=1, line_end=10, language="python", + parent_name=None, params=None, return_type=None, is_test=is_test, + file_hash="x", extra={}, + ) + + def _edge(self, eid: int, kind: str, src: str, tgt: str) -> GraphEdge: + return GraphEdge( + id=eid, kind=kind, source_qualified=src, target_qualified=tgt, + file_path="f.py", line=1, extra={}, + ) + + def _two_cluster_fixture(self): + """Production (a, b) in cluster 0, their tests (t1, t2) in cluster 1.""" + nodes = [ + self._node(0, "Function", "parse", "src/app.py"), + self._node(1, "Function", "render", "src/app.py"), + self._node(2, "Test", "test_parse", "tests/test_app.py", is_test=True), + self._node(3, "Test", "test_render", "tests/test_app.py", is_test=True), + ] + qn_to_idx = {n.qualified_name: i for i, n in enumerate(nodes)} + idx_to_node = dict(enumerate(nodes)) + return nodes, qn_to_idx, idx_to_node + + def test_tests_move_to_subject_cluster(self): + """Tests clustered apart from their subjects get pulled over.""" + nodes, qn_to_idx, idx_to_node = self._two_cluster_fixture() + clusters = [[0, 1], [2, 3]] + edges = [ + self._edge(1, "TESTED_BY", nodes[0].qualified_name, nodes[2].qualified_name), + self._edge(2, "TESTED_BY", nodes[1].qualified_name, nodes[3].qualified_name), + ] + result = _reassign_test_nodes(clusters, idx_to_node, qn_to_idx, edges) + assert sorted(result[0]) == [0, 1, 2, 3] + assert result[1] == [] + + def test_direction_agnostic(self): + """Same outcome when the parser emits test -> production edges.""" + nodes, qn_to_idx, idx_to_node = self._two_cluster_fixture() + clusters = [[0, 1], [2, 3]] + edges = [ + self._edge(1, "TESTED_BY", nodes[2].qualified_name, nodes[0].qualified_name), + self._edge(2, "TESTED_BY", nodes[3].qualified_name, nodes[1].qualified_name), + ] + result = _reassign_test_nodes(clusters, idx_to_node, qn_to_idx, edges) + assert sorted(result[0]) == [0, 1, 2, 3] + assert result[1] == [] + + def test_majority_vote_wins(self): + """A test covering subjects in several communities follows the majority.""" + nodes = [ + self._node(0, "Function", "parse", "src/app.py"), + self._node(1, "Function", "render", "src/app.py"), + self._node(2, "Function", "connect", "src/db.py"), + self._node(3, "Test", "test_everything", "tests/test_all.py", is_test=True), + ] + qn_to_idx = {n.qualified_name: i for i, n in enumerate(nodes)} + idx_to_node = dict(enumerate(nodes)) + clusters = [[0, 1], [2], [3]] + edges = [ + self._edge(1, "TESTED_BY", nodes[0].qualified_name, nodes[3].qualified_name), + self._edge(2, "TESTED_BY", nodes[1].qualified_name, nodes[3].qualified_name), + self._edge(3, "TESTED_BY", nodes[2].qualified_name, nodes[3].qualified_name), + ] + result = _reassign_test_nodes(clusters, idx_to_node, qn_to_idx, edges) + assert 3 in result[0] + assert result[2] == [] + + def test_own_cluster_votes_keep_test_in_place(self): + """A test whose subjects mostly live with it already stays put.""" + nodes = [ + self._node(0, "Function", "parse", "src/app.py"), + self._node(1, "Function", "render", "src/app.py"), + self._node(2, "Function", "connect", "src/db.py"), + self._node(3, "Test", "test_app", "tests/test_app.py", is_test=True), + ] + qn_to_idx = {n.qualified_name: i for i, n in enumerate(nodes)} + idx_to_node = dict(enumerate(nodes)) + clusters = [[0, 1, 3], [2]] + edges = [ + self._edge(1, "TESTED_BY", nodes[0].qualified_name, nodes[3].qualified_name), + self._edge(2, "TESTED_BY", nodes[1].qualified_name, nodes[3].qualified_name), + self._edge(3, "TESTED_BY", nodes[2].qualified_name, nodes[3].qualified_name), + ] + result = _reassign_test_nodes(clusters, idx_to_node, qn_to_idx, edges) + assert 3 in result[0] + assert result[1] == [2] + + def test_no_tested_by_edges_is_a_noop(self): + """CALLS-only graphs come back with the exact same clusters.""" + nodes, qn_to_idx, idx_to_node = self._two_cluster_fixture() + clusters = [[0, 1], [2, 3]] + edges = [ + self._edge(1, "CALLS", nodes[0].qualified_name, nodes[1].qualified_name), + ] + result = _reassign_test_nodes(clusters, idx_to_node, qn_to_idx, edges) + assert result == [[0, 1], [2, 3]] + + def test_edges_between_two_tests_are_ignored(self): + """A TESTED_BY edge with no production endpoint casts no vote.""" + nodes, qn_to_idx, idx_to_node = self._two_cluster_fixture() + clusters = [[0, 1], [2, 3]] + edges = [ + self._edge(1, "TESTED_BY", nodes[2].qualified_name, nodes[3].qualified_name), + ] + result = _reassign_test_nodes(clusters, idx_to_node, qn_to_idx, edges) + assert result == [[0, 1], [2, 3]] + + def test_bare_endpoint_resolves_by_unique_node_name(self): + """TESTED_BY sources inherited from unresolved calls can be bare names.""" + nodes, qn_to_idx, idx_to_node = self._two_cluster_fixture() + clusters = [[0, 1], [2, 3]] + edges = [ + # Source is the bare name "parse", not "src/app.py::parse". + self._edge(1, "TESTED_BY", "parse", nodes[2].qualified_name), + ] + result = _reassign_test_nodes(clusters, idx_to_node, qn_to_idx, edges) + assert 2 in result[0] + assert result[1] == [3] + + def test_ambiguous_bare_endpoint_casts_no_vote(self): + """A bare name shared by several nodes cannot be resolved safely.""" + nodes = [ + self._node(0, "Function", "parse", "src/app.py"), + self._node(1, "Function", "parse", "src/other.py"), + self._node(2, "Test", "test_parse", "tests/test_app.py", is_test=True), + ] + qn_to_idx = {n.qualified_name: i for i, n in enumerate(nodes)} + idx_to_node = dict(enumerate(nodes)) + clusters = [[0], [1], [2]] + edges = [ + self._edge(1, "TESTED_BY", "parse", nodes[2].qualified_name), + ] + result = _reassign_test_nodes(clusters, idx_to_node, qn_to_idx, edges) + assert result == [[0], [1], [2]] + + def test_tie_prefers_current_cluster(self): + """A test split evenly between home and elsewhere stays home.""" + nodes = [ + self._node(0, "Function", "parse", "src/app.py"), + self._node(1, "Function", "render", "src/ui.py"), + self._node(2, "Test", "test_both", "tests/test_all.py", is_test=True), + ] + qn_to_idx = {n.qualified_name: i for i, n in enumerate(nodes)} + idx_to_node = dict(enumerate(nodes)) + clusters = [[0], [1, 2]] + edges = [ + self._edge(1, "TESTED_BY", nodes[0].qualified_name, nodes[2].qualified_name), + self._edge(2, "TESTED_BY", nodes[1].qualified_name, nodes[2].qualified_name), + ] + result = _reassign_test_nodes(clusters, idx_to_node, qn_to_idx, edges) + assert result == [[0], [1, 2]] + + def test_tie_between_foreign_clusters_is_deterministic(self): + """Ties away from home resolve to the lowest cluster, whatever the edge order.""" + nodes = [ + self._node(0, "Function", "parse", "src/app.py"), + self._node(1, "Function", "render", "src/ui.py"), + self._node(2, "Test", "test_both", "tests/test_all.py", is_test=True), + ] + qn_to_idx = {n.qualified_name: i for i, n in enumerate(nodes)} + idx_to_node = dict(enumerate(nodes)) + clusters = [[0], [1], [2]] + edges = [ + self._edge(1, "TESTED_BY", nodes[0].qualified_name, nodes[2].qualified_name), + self._edge(2, "TESTED_BY", nodes[1].qualified_name, nodes[2].qualified_name), + ] + assert _reassign_test_nodes( + clusters, idx_to_node, qn_to_idx, edges + ) == [[0, 2], [1], []] + assert _reassign_test_nodes( + clusters, idx_to_node, qn_to_idx, list(reversed(edges)) + ) == [[0, 2], [1], []] + + @pytest.mark.skipif(not IGRAPH_AVAILABLE, reason="igraph not installed") + def test_leiden_communities_include_tests_with_their_subjects(self): + """End to end: Leiden output places tests with the code they cover.""" + prod = [ + self._node(0, "Function", "parse", "src/app.py"), + self._node(1, "Function", "render", "src/app.py"), + self._node(2, "Function", "layout", "src/app.py"), + ] + tests = [ + self._node(3, "Test", "test_parse", "tests/test_app.py", is_test=True), + self._node(4, "Test", "test_render", "tests/test_app.py", is_test=True), + self._node(5, "Test", "test_layout", "tests/test_app.py", is_test=True), + ] + nodes = prod + tests + eid = 0 + edges = [] + # Dense CALLS inside each group so Leiden separates them... + for group in (prod, tests): + for i in range(len(group)): + for j in range(i + 1, len(group)): + eid += 1 + edges.append(self._edge( + eid, "CALLS", + group[i].qualified_name, group[j].qualified_name, + )) + # ...linked only by weak TESTED_BY pairs. + for p, t in zip(prod, tests): + eid += 1 + edges.append(self._edge( + eid, "TESTED_BY", p.qualified_name, t.qualified_name, + )) + + communities = _detect_leiden(nodes, edges, min_size=2) + + assert communities + by_member = { + m: set(c["members"]) for c in communities for m in c["members"] + } + assert "tests/test_app.py::test_parse" in by_member.get( + "src/app.py::parse", set() + ) + for comm in communities: + members = set(comm["members"]) + if any(m.startswith("tests/") for m in members): + assert any(m.startswith("src/") for m in members)