|
| 1 | +from typing import TYPE_CHECKING, Dict, List, Tuple, Any, Optional |
| 2 | +from collections import defaultdict |
| 3 | + |
| 4 | +from systemrdl.node import RegNode, RegfileNode, AddressableNode, Node |
| 5 | +from systemrdl.walker import RDLListener, RDLWalker |
| 6 | + |
| 7 | +if TYPE_CHECKING: |
| 8 | + from ..exporter import RegblockExporter |
| 9 | + |
| 10 | + |
| 11 | +class BroadcastLogic: |
| 12 | + """ |
| 13 | + Manages broadcast logic for registers and regfiles. |
| 14 | +
|
| 15 | + A broadcaster register/regfile does not have hardware storage. |
| 16 | + Instead, writes to the broadcaster address generate write strobes |
| 17 | + for all target registers. |
| 18 | + """ |
| 19 | + |
| 20 | + def __init__(self, exp: 'RegblockExporter') -> None: |
| 21 | + self.exp = exp |
| 22 | + |
| 23 | + # Maps broadcaster node to list of target nodes |
| 24 | + # List of tuples: (broadcaster_node, [target_nodes]) |
| 25 | + self.broadcast_map: List[Tuple[AddressableNode, List[RegNode]]] = [] |
| 26 | + |
| 27 | + # List of all broadcaster nodes (no hardware implementation) |
| 28 | + self.broadcasters: List[AddressableNode] = [] |
| 29 | + |
| 30 | + # List of all target nodes (receive broadcast writes) |
| 31 | + self.targets: List[RegNode] = [] |
| 32 | + |
| 33 | + # Scan the design to build the broadcast map |
| 34 | + self._scan_design() |
| 35 | + |
| 36 | + def _scan_design(self) -> None: |
| 37 | + """Scan the design to identify broadcasters and their targets""" |
| 38 | + listener = BroadcastScanner(self) |
| 39 | + RDLWalker(skip_not_present=False).walk(self.exp.ds.top_node, listener) |
| 40 | + |
| 41 | + def is_broadcaster(self, node: AddressableNode) -> bool: |
| 42 | + """Check if a node is a broadcaster""" |
| 43 | + return node in self.broadcasters |
| 44 | + |
| 45 | + def is_in_broadcast_scope(self, node: AddressableNode) -> bool: |
| 46 | + """ |
| 47 | + Check if a node is within a broadcast scope. |
| 48 | + This includes: |
| 49 | + 1. The node itself is a broadcaster |
| 50 | + 2. The node is a child of a broadcaster (e.g. reg in a broadcast regfile) |
| 51 | + """ |
| 52 | + curr: Optional[Node] = node |
| 53 | + while curr is not None: |
| 54 | + if isinstance(curr, AddressableNode) and self.is_broadcaster(curr): |
| 55 | + return True |
| 56 | + curr = curr.parent |
| 57 | + return False |
| 58 | + |
| 59 | + def is_target(self, node: RegNode) -> bool: |
| 60 | + """Check if a node is a broadcast target""" |
| 61 | + return node in self.targets |
| 62 | + |
| 63 | + def get_broadcasters_for_target(self, target: RegNode) -> List[AddressableNode]: |
| 64 | + """ |
| 65 | + Get all broadcaster nodes that write to this target. |
| 66 | + """ |
| 67 | + broadcasters: List[AddressableNode] = [] |
| 68 | + |
| 69 | + # Calculate the total number of instances this target represents |
| 70 | + # (including its own array dimensions and any parent array dimensions) |
| 71 | + expected_count = 1 |
| 72 | + curr: Optional[Node] = target |
| 73 | + while curr is not None: |
| 74 | + if isinstance(curr, AddressableNode) and curr.is_array: |
| 75 | + expected_count *= curr.n_elements |
| 76 | + curr = curr.parent |
| 77 | + |
| 78 | + for broadcaster_node, targets in self.broadcast_map: |
| 79 | + # 1. Exact match check |
| 80 | + if target in targets: |
| 81 | + broadcasters.append(broadcaster_node) |
| 82 | + continue |
| 83 | + |
| 84 | + # 2. Array iteration match check |
| 85 | + # If target is a canonical node inside an array loop, it won't be in the map. |
| 86 | + # But its unrolled instances (sharing the same underlying Component 'inst') might be. |
| 87 | + |
| 88 | + # Count how many targets share the same underlying component instance |
| 89 | + match_count = 0 |
| 90 | + for t in targets: |
| 91 | + if t.inst == target.inst: |
| 92 | + # If current_idx is None, this node represents the entire array (all elements) |
| 93 | + if t.current_idx is None: |
| 94 | + match_count += t.n_elements |
| 95 | + else: |
| 96 | + # Otherwise it represents a single element |
| 97 | + match_count += 1 |
| 98 | + |
| 99 | + if match_count == expected_count and expected_count > 0: |
| 100 | + broadcasters.append(broadcaster_node) |
| 101 | + |
| 102 | + return broadcasters |
| 103 | + |
| 104 | + |
| 105 | +class BroadcastScanner(RDLListener): |
| 106 | + """Listener that scans the design to build the broadcast mapping""" |
| 107 | + |
| 108 | + def __init__(self, broadcast_logic: BroadcastLogic) -> None: |
| 109 | + self.bl = broadcast_logic |
| 110 | + # Stack of active broadcaster scopes: List[Tuple[BroadcasterNode, List[TargetNode]]] |
| 111 | + self.scope_stack: List[Tuple[AddressableNode, List[RegfileNode]]] = [] |
| 112 | + |
| 113 | + def enter_Component(self, node: Node) -> None: |
| 114 | + |
| 115 | + # 1. Check if we are inside an active broadcaster scope (Structural Broadcast) |
| 116 | + if self.scope_stack and isinstance(node, RegNode): |
| 117 | + broadcaster_scope, target_scopes = self.scope_stack[-1] |
| 118 | + |
| 119 | + # Calculate relative path from the broadcaster scope |
| 120 | + b_path = broadcaster_scope.get_path() |
| 121 | + node_path = node.get_path() |
| 122 | + |
| 123 | + if node_path.startswith(b_path + '.'): |
| 124 | + rel_path = node_path[len(b_path)+1:] |
| 125 | + |
| 126 | + # Map to each target scope |
| 127 | + for target_scope in target_scopes: |
| 128 | + # Find corresponding node in target |
| 129 | + target_reg = target_scope.find_by_path(rel_path) |
| 130 | + |
| 131 | + if target_reg and isinstance(target_reg, RegNode): |
| 132 | + self._add_broadcast_map(node, target_reg) |
| 133 | + |
| 134 | + # 2. Check if this node defines a new broadcast (is a Broadcaster) |
| 135 | + if isinstance(node, (RegNode, RegfileNode)): |
| 136 | + broadcast_targets = node.get_property('broadcast') |
| 137 | + |
| 138 | + if broadcast_targets is not None: |
| 139 | + self.bl.broadcasters.append(node) |
| 140 | + |
| 141 | + if isinstance(node, RegfileNode): |
| 142 | + # Structural Broadcast: Push to stack to handle children |
| 143 | + # Filter targets to only RegfileNodes (validation ensures they are compatible) |
| 144 | + rf_targets = [t for t in broadcast_targets if isinstance(t, RegfileNode)] |
| 145 | + if rf_targets: |
| 146 | + self.scope_stack.append((node, rf_targets)) |
| 147 | + |
| 148 | + elif isinstance(node, RegNode): |
| 149 | + # Direct Register Broadcast: Map immediately |
| 150 | + for target in broadcast_targets: |
| 151 | + if isinstance(target, RegNode): |
| 152 | + self._add_broadcast_map(node, target) |
| 153 | + |
| 154 | + def exit_Component(self, node: Node) -> None: |
| 155 | + # Pop scope if we are exiting the current broadcaster scope |
| 156 | + if self.scope_stack and self.scope_stack[-1][0] == node: |
| 157 | + self.scope_stack.pop() |
| 158 | + |
| 159 | + def _add_broadcast_map(self, broadcaster: AddressableNode, target: RegNode) -> None: |
| 160 | + """Helper to add a broadcaster -> target mapping""" |
| 161 | + |
| 162 | + # Find existing entry for this broadcaster |
| 163 | + found = False |
| 164 | + for b_node, t_list in self.bl.broadcast_map: |
| 165 | + if b_node == broadcaster: |
| 166 | + if target not in t_list: |
| 167 | + t_list.append(target) |
| 168 | + found = True |
| 169 | + break |
| 170 | + |
| 171 | + if not found: |
| 172 | + self.bl.broadcast_map.append((broadcaster, [target])) |
| 173 | + |
| 174 | + # Also track in global targets list |
| 175 | + if target not in self.bl.targets: |
| 176 | + self.bl.targets.append(target) |
| 177 | + |
| 178 | + def _expand_array(self, node: AddressableNode) -> List[AddressableNode]: |
| 179 | + """ |
| 180 | + For broadcast targets, we expect explicit array indexing in the RDL. |
| 181 | + Just return the node as-is since it's already the specific element. |
| 182 | + """ |
| 183 | + # Simply return the node - RDL references should already be explicit |
| 184 | + return [node] |
0 commit comments