Skip to content

Commit 0f37b44

Browse files
committed
feature: Add script to generate event graph
1 parent bd08931 commit 0f37b44

1 file changed

Lines changed: 312 additions & 0 deletions

File tree

scripts/get_event_graph.py

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Build Manager <-> Event graph by static AST scanning.
4+
5+
Usage:
6+
python tools/build_event_graph.py
7+
8+
If graphviz 'dot' is available, PNG will be rendered automatically when --png is provided.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import ast
14+
from dataclasses import dataclass, field
15+
from pathlib import Path
16+
from typing import Dict, List, Optional, Set, Tuple
17+
18+
19+
# ----------------------------
20+
# Helpers
21+
# ----------------------------
22+
23+
24+
def qname(node: ast.AST) -> Optional[str]:
25+
"""Best-effort qualified name for Name/Attribute chains: a.b.c"""
26+
if isinstance(node, ast.Name):
27+
return node.id
28+
if isinstance(node, ast.Attribute):
29+
base = qname(node.value)
30+
if not base:
31+
return node.attr
32+
return f"{base}.{node.attr}"
33+
return None
34+
35+
36+
def simple_name(node: ast.AST) -> Optional[str]:
37+
"""Return last segment of qname() (e.g., a.b.C -> C)."""
38+
n = qname(node)
39+
if not n:
40+
return None
41+
return n.split(".")[-1]
42+
43+
44+
def is_manager_event_handler_decorator(dec: ast.AST) -> bool:
45+
"""
46+
Match: @Manager.event_handler(...)
47+
"""
48+
if not isinstance(dec, ast.Call):
49+
return False
50+
fn = dec.func
51+
if not isinstance(fn, ast.Attribute):
52+
return False
53+
if fn.attr != "event_handler":
54+
return False
55+
base = qname(fn.value)
56+
return base == "Manager"
57+
58+
59+
def extract_event_from_event_handler(dec: ast.Call) -> Optional[str]:
60+
"""
61+
@Manager.event_handler(SomeEvent, ...)
62+
"""
63+
if not dec.args:
64+
return None
65+
return simple_name(dec.args[0])
66+
67+
68+
def is_self_event_bus_publish(call: ast.Call) -> bool:
69+
"""
70+
Match: self.event_bus.publish(...)
71+
"""
72+
fn = call.func
73+
if not isinstance(fn, ast.Attribute):
74+
return False
75+
if fn.attr != "publish":
76+
return False
77+
base = qname(fn.value)
78+
return base == "self.event_bus"
79+
80+
81+
def extract_event_from_publish_arg(
82+
arg0: ast.AST, local_ctor_map: Dict[str, str]
83+
) -> Optional[str]:
84+
"""
85+
publish(SomeEvent(...)) -> SomeEvent
86+
publish(evt) where evt = SomeEvent(...) earlier -> SomeEvent
87+
"""
88+
# Direct constructor call: SomeEvent(...)
89+
if isinstance(arg0, ast.Call):
90+
return simple_name(arg0.func)
91+
92+
# publish(var)
93+
if isinstance(arg0, ast.Name):
94+
return local_ctor_map.get(arg0.id)
95+
96+
return None
97+
98+
99+
# ----------------------------
100+
# Data Model
101+
# ----------------------------
102+
103+
104+
@dataclass
105+
class ManagerInfo:
106+
manager_name: str
107+
file_path: str
108+
handles: Set[str] = field(default_factory=set) # Events received
109+
publishes: Set[str] = field(default_factory=set) # Events published
110+
111+
112+
# ----------------------------
113+
# AST Visitor
114+
# ----------------------------
115+
116+
117+
class FunctionPublishScanner(ast.NodeVisitor):
118+
"""
119+
Scan a function body to infer what event types are published.
120+
121+
Heuristic:
122+
- Track simple assignments: x = SomeEvent(...)
123+
- Detect self.event_bus.publish(arg0) and map arg0 -> event type
124+
"""
125+
126+
def __init__(self) -> None:
127+
self.local_ctor_map: Dict[str, str] = {} # var -> EventClass
128+
self.published: Set[str] = set()
129+
130+
def visit_Assign(self, node: ast.Assign) -> None:
131+
# x = SomeEvent(...)
132+
if isinstance(node.value, ast.Call):
133+
evt = simple_name(node.value.func)
134+
if evt:
135+
for t in node.targets:
136+
if isinstance(t, ast.Name):
137+
self.local_ctor_map[t.id] = evt
138+
self.generic_visit(node)
139+
140+
def visit_AnnAssign(self, node: ast.AnnAssign) -> None:
141+
# x: Type = SomeEvent(...)
142+
if node.value and isinstance(node.value, ast.Call):
143+
evt = simple_name(node.value.func)
144+
if evt and isinstance(node.target, ast.Name):
145+
self.local_ctor_map[node.target.id] = evt
146+
self.generic_visit(node)
147+
148+
def visit_Call(self, node: ast.Call) -> None:
149+
# self.event_bus.publish(...)
150+
if is_self_event_bus_publish(node) and node.args:
151+
evt = extract_event_from_publish_arg(node.args[0], self.local_ctor_map)
152+
if evt:
153+
self.published.add(evt)
154+
self.generic_visit(node)
155+
156+
157+
class ModuleScanner(ast.NodeVisitor):
158+
def __init__(self, file_path: str) -> None:
159+
self.file_path = file_path
160+
self.managers: List[ManagerInfo] = []
161+
162+
def visit_ClassDef(self, node: ast.ClassDef) -> None:
163+
# Identify Manager subclasses (best-effort): class X(Manager) or class X(..., Manager, ...)
164+
is_manager_subclass = any(simple_name(b) == "Manager" for b in node.bases)
165+
if not is_manager_subclass:
166+
self.generic_visit(node)
167+
return
168+
169+
info = ManagerInfo(
170+
manager_name=node.name,
171+
file_path=self.file_path,
172+
)
173+
174+
# 1) event handlers from decorators
175+
for item in node.body:
176+
if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
177+
for dec in item.decorator_list:
178+
if is_manager_event_handler_decorator(dec):
179+
evt = extract_event_from_event_handler(dec) # type: ignore[arg-type]
180+
if evt:
181+
info.handles.add(evt)
182+
183+
# 2) published events from functions
184+
for item in node.body:
185+
if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
186+
fps = FunctionPublishScanner()
187+
fps.visit(item)
188+
info.publishes |= fps.published
189+
190+
self.managers.append(info)
191+
self.generic_visit(node)
192+
193+
194+
# ----------------------------
195+
# Graph building
196+
# ----------------------------
197+
198+
199+
def scan_python_file(path: Path) -> List[ManagerInfo]:
200+
try:
201+
src = path.read_text(encoding="utf-8")
202+
except UnicodeDecodeError:
203+
src = path.read_text(encoding="utf-8", errors="ignore")
204+
205+
try:
206+
tree = ast.parse(src, filename=str(path))
207+
except SyntaxError:
208+
return []
209+
210+
scanner = ModuleScanner(str(path))
211+
scanner.visit(tree)
212+
return scanner.managers
213+
214+
215+
def scan_root(root: Path) -> List[ManagerInfo]:
216+
managers: List[ManagerInfo] = []
217+
for p in root.rglob("*.py"):
218+
# skip __pycache__ etc
219+
if any(part.startswith(".") for part in p.parts):
220+
continue
221+
if "__pycache__" in p.parts:
222+
continue
223+
managers.extend(scan_python_file(p))
224+
return managers
225+
226+
227+
def build_edges(managers: List[ManagerInfo]) -> List[Tuple[str, str, str]]:
228+
"""
229+
Return edges: (producer_manager, consumer_manager, event_label)
230+
Based on shared event types: producer publishes E, consumer handles E.
231+
"""
232+
producers: Dict[str, Set[str]] = {}
233+
consumers: Dict[str, Set[str]] = {}
234+
235+
for m in managers:
236+
for e in m.publishes:
237+
producers.setdefault(e, set()).add(m.manager_name)
238+
for e in m.handles:
239+
consumers.setdefault(e, set()).add(m.manager_name)
240+
241+
edges: List[Tuple[str, str, str]] = []
242+
for e, ps in producers.items():
243+
cs = consumers.get(e, set())
244+
for p in ps:
245+
for c in cs:
246+
edges.append((p, c, e))
247+
return edges
248+
249+
250+
def to_dot(managers: List[ManagerInfo], edges: List[Tuple[str, str, str]]) -> str:
251+
"""
252+
DOT graph:
253+
- Managers are nodes
254+
- Edges labeled by Event
255+
- Also include per-manager tooltip with file path
256+
"""
257+
# Node ids must be stable and safe; use manager names directly (quoted)
258+
lines: List[str] = []
259+
lines.append("digraph EventGraph {")
260+
lines.append(" rankdir=LR;")
261+
lines.append(' node [shape=box, style="rounded,filled", fillcolor="#f7f7f7"];')
262+
lines.append(" edge [fontsize=10];")
263+
lines.append("")
264+
265+
# Nodes
266+
for m in sorted(managers, key=lambda x: x.manager_name):
267+
label = m.manager_name
268+
tooltip = m.file_path.replace("\\", "/")
269+
lines.append(f' "{m.manager_name}" [label="{label}", tooltip="{tooltip}"];')
270+
271+
lines.append("")
272+
273+
# Edges: group same (p,c) with multiple events into one label for readability
274+
grouped: Dict[Tuple[str, str], List[str]] = {}
275+
for p, c, e in edges:
276+
grouped.setdefault((p, c), []).append(e)
277+
278+
for (p, c), evts in sorted(grouped.items(), key=lambda x: (x[0][0], x[0][1])):
279+
evts_sorted = sorted(set(evts))
280+
# Keep label not too long; if too many, truncate.
281+
if len(evts_sorted) > 6:
282+
label = "\\n".join(evts_sorted[:6]) + f"\\n...(+{len(evts_sorted)-6})"
283+
else:
284+
label = "\\n".join(evts_sorted)
285+
lines.append(f' "{p}" -> "{c}" [label="{label}"];')
286+
287+
lines.append("}")
288+
return "\n".join(lines)
289+
290+
291+
# ----------------------------
292+
# CLI
293+
# ----------------------------
294+
295+
296+
def main():
297+
298+
root = Path("src/xtalk/serving/modules").resolve()
299+
out = Path("logs/event_graph.dot").resolve()
300+
301+
managers = scan_root(root)
302+
edges = build_edges(managers)
303+
dot = to_dot(managers, edges)
304+
out.write_text(dot, encoding="utf-8")
305+
306+
print(f"[ok] scanned managers: {len(managers)}")
307+
print(f"[ok] inferred edges: {len(edges)}")
308+
print(f"[ok] wrote dot: {out}")
309+
310+
311+
if __name__ == "__main__":
312+
main()

0 commit comments

Comments
 (0)