Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 104 additions & 1 deletion code_review_graph/communities.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
"add", "remove", "make", "build", "from", "to", "for", "with",
"the", "and", "test", "main", "run", "do", "is", "has", "on",
"of", "in", "at", "by", "my", "this", "that", "all", "none",
# BDD test-name grammar ("test_should_return_x_when_y") — frequent
# in test-heavy communities but carries no domain meaning.
"should", "when", "then", "given", "return", "returns", "raise",
"raises", "expect", "expected", "assert", "tests", "be", "it", "if",
"not",
})


Expand All @@ -65,6 +70,11 @@
# ---------------------------------------------------------------------------


def _is_test_node(node: GraphNode) -> bool:
"""True when *node* is a test (by kind or the parser's is_test flag)."""
return node.kind == "Test" or node.is_test


def _generate_community_name(members: list[GraphNode]) -> str:
"""Generate a meaningful name for a community of nodes.

Expand All @@ -77,6 +87,14 @@ def _generate_community_name(members: list[GraphNode]) -> str:
if not members:
return "empty"

# Mixed communities are named from their production members only:
# tests parrot the subject's vocabulary through BDD grammar and live
# under tests/ directories, which pollutes both the keyword and the
# file-prefix signals.
production = [m for m in members if not _is_test_node(m)]
if production and len(production) < len(members):
members = production

# 1. Find common file prefix
file_paths = [m.file_path for m in members]
prefix = _extract_file_prefix(file_paths)
Expand Down Expand Up @@ -233,6 +251,87 @@ def _compute_cohesion(
# ---------------------------------------------------------------------------


def _reassign_test_nodes(
clusters: list[list[int]],
idx_to_node: dict[int, GraphNode],
qn_to_idx: dict[str, int],
edges: list[GraphEdge],
) -> list[list[int]]:
"""Move Test nodes into the community of the code they test.

Shared fixtures and helpers give test files dense internal CALLS
edges, so Leiden tends to cluster tests with each other instead of
with their subjects, producing test-only blobs. Reviews want tests
next to the code they cover, so each Test node is reassigned to the
community holding the majority of its TESTED_BY partners (its own
community counts as a vote, so a test whose subjects mostly live
with it already stays put; ties also prefer the current cluster,
and otherwise resolve to the lowest cluster index so the outcome
never depends on edge order).

The test endpoint of a TESTED_BY edge is identified by node kind,
not edge direction, so this works regardless of the direction the
parser emits.
"""
vertex_cluster: dict[int, int] = {}
for ci, cluster in enumerate(clusters):
for v in cluster:
vertex_cluster[v] = ci

# TESTED_BY endpoints inherited from unresolved cross-file calls can
# be bare names rather than qualified ones; resolve those by node
# name when the name is unambiguous so their tests still vote.
name_to_idx: dict[str, int | None] = {}
for i, node in idx_to_node.items():
name_to_idx[node.name] = None if node.name in name_to_idx else i

def _resolve(qualified: str) -> int | None:
idx = qn_to_idx.get(qualified)
if idx is not None:
return idx
return name_to_idx.get(qualified)

votes: dict[int, Counter[int]] = {}
for e in edges:
if e.kind != "TESTED_BY":
continue
a = _resolve(e.source_qualified)
b = _resolve(e.target_qualified)
if a is None or b is None:
continue
a_is_test = _is_test_node(idx_to_node[a])
b_is_test = _is_test_node(idx_to_node[b])
if a_is_test == b_is_test:
continue
test_v, subject_v = (a, b) if a_is_test else (b, a)
subject_cluster = vertex_cluster.get(subject_v)
if subject_cluster is None:
continue
votes.setdefault(test_v, Counter())[subject_cluster] += 1

if not votes:
return clusters

moved = 0
new_clusters: list[list[int]] = [[] for _ in clusters]
for ci, cluster in enumerate(clusters):
for v in cluster:
target = ci
if v in votes:
counter = votes[v]
best = max(counter.values())
tied = [c for c, n in counter.items() if n == best]
target = ci if ci in tied else min(tied)
if target != ci:
moved += 1
new_clusters[target].append(v)
if moved:
logger.info(
"Reassigned %d test nodes to their subject communities", moved
)
return new_clusters


def _detect_leiden(
nodes: list[GraphNode],
edges: list[GraphEdge],
Expand Down Expand Up @@ -309,8 +408,12 @@ def _detect_leiden(
len(partition),
)

clusters = _reassign_test_nodes(
[list(c) for c in partition], idx_to_node, qn_to_idx, edges
)

pending: list[tuple[list[GraphNode], set[str]]] = []
for cluster_ids in partition:
for cluster_ids in clusters:
if len(cluster_ids) < min_size:
continue
members = [idx_to_node[i] for i in cluster_ids if i in idx_to_node]
Expand Down
Loading