-
Notifications
You must be signed in to change notification settings - Fork 183
Expand file tree
/
Copy pathworker.py
More file actions
249 lines (215 loc) · 10.3 KB
/
worker.py
File metadata and controls
249 lines (215 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# worker.py -*- Python -*-
#
# This file is licensed under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# (c) Copyright 2024 Advanced Micro Devices, Inc.
"""Worker and WorkerRuntimeBarrier: compute-core tasks and runtime synchronization primitives."""
import sys
from typing import Callable
from .. import ir # type: ignore
from ..dialects.aie import core, lock, use_lock
from ..dialects.aiex import set_lock_value, LockAction
from ..helpers.dialects.scf import _for as range_
from .device import Tile, AnyComputeTile
from ..dialects._aie_enum_gen import AIETileType # type: ignore
from .dataflow.objectfifo import ObjectFifoHandle, ObjectFifo
from .dataflow.endpoint import ObjectFifoEndpoint
from .buffer import Buffer
from .resolvable import Resolvable
class Worker(ObjectFifoEndpoint):
    """A task to be run on an AIE compute core.

    A Worker takes a ``core_fn`` callable and the arguments it needs (ObjectFIFO handles,
    Buffers, Kernels, etc.). Each Worker is placed on a single compute tile, either
    explicitly via ``tile`` or automatically by the ``--aie-place-tiles`` compiler pass.
    """

    def __init__(
        self,
        core_fn: Callable | None,
        fn_args: list | None = None,
        tile: Tile = AnyComputeTile,
        while_true: bool = True,
        stack_size: int | None = None,
        allocation_scheme: str | None = None,
        trace: int | None = None,
        trace_events: list | None = None,
        dynamic_objfifo_lowering: bool | None = None,
    ):
        """Construct a Worker.

        Args:
            core_fn (Callable | None): The task to run on a core. If None, a busy-loop
                (`while(true): pass`) core will be generated.
            fn_args (list | None, optional): Arguments passed to ``core_fn``; should include
                all context the core_fn needs to run. ``None`` (default) is treated as an
                empty list. A fresh list is created per Worker in that case, so Workers
                never share a default argument list.
            tile (Tile, optional): The compute tile for the Worker. Defaults to AnyComputeTile.
                The tile is copied; the caller's object is not modified.
            while_true (bool, optional): If true, will wrap the core_fn in a while(true) loop
                to ensure it runs until reconfiguration. Defaults to True.
            stack_size (int | None, optional): The stack size in bytes to be allocated for the
                worker. ``None`` (default) is forwarded unchanged to the core op; the
                toolchain default (documented as 1024 bytes) presumably applies then.
            allocation_scheme (str | None, optional): The memory allocation scheme to use for
                the Worker, either 'basic-sequential' or 'bank-aware'. If None, defaults to
                bank-aware. Will override any allocation scheme set on the tile.
            trace (int | None, optional): If >0, enable tracing for this worker.
            trace_events (list | None, optional): Custom list of trace events for this worker.
                Defaults to None.
            dynamic_objfifo_lowering (bool | None, optional): Per-core override for the
                ``aie-objectFifo-stateful-transform`` pass's lowering choice. ``True`` forces
                dynamic (loop-preserving) lowering for this core; ``False`` forces static
                LCM-based unrolling. ``None`` (default) leaves the choice to the compiler's
                global ``--dynamic-objFifos`` flag. Note: the per-core attribute is only
                honored when the global flag is ``false``; when global is ``true`` the
                attribute is ignored. Defaults to None.

        Raises:
            ValueError: If ``tile`` has a tile type other than a compute (core) tile, if a
                Buffer in ``fn_args`` is already placed on a different tile, or if an
                ObjectFifo (rather than an ObjectFifoHandle) is passed in ``fn_args``.
        """
        # Work on a copy so the tile mutations below only affect this Worker's tile.
        tile = tile.copy()
        if tile.tile_type is not None and tile.tile_type != AIETileType.CoreTile:
            raise ValueError(
                f"Worker requires a compute tile, but got tile_type={tile.tile_type}"
            )
        tile.tile_type = AIETileType.CoreTile
        self._tile = tile
        self._while_true = while_true
        self.stack_size = stack_size
        self.allocation_scheme = allocation_scheme
        self._dynamic_objfifo_lowering = dynamic_objfifo_lowering
        if allocation_scheme:
            # A Worker-level scheme takes precedence over whatever the tile carried.
            self._tile.allocation_scheme = allocation_scheme
        self.trace = trace
        self.trace_events = trace_events

        # If no core_fn is given, make a simple while(true) loop.
        if core_fn is None:

            def do_nothing_core_fun(*args) -> None:
                for _ in range_(sys.maxsize):
                    pass

            self.core_fn = do_nothing_core_fun
        else:
            self.core_fn = core_fn

        # Use a None sentinel instead of a mutable default: a literal `[]` default would
        # be one list object shared by every Worker constructed without fn_args.
        self.fn_args = [] if fn_args is None else fn_args
        self._fifos = []
        self._buffers = []
        self._barriers = []
        # CascadeFlow objects whose source is this Worker. Populated by
        # CascadeFlow(src, dst).__init__ and consumed by Program.resolve()
        # to emit aie.cascade_flow ops after worker placement.
        self._outgoing_cascades: list = []

        # Check arguments to the core. Some information is saved for resolution.
        for arg in self.fn_args:
            if isinstance(arg, ObjectFifoHandle):
                arg.endpoint = self
                self._fifos.append(arg)
            elif isinstance(arg, Buffer):
                self._buffers.append(arg)
                # Buffers are placed on the same tile as the Worker
                if arg._tile is not None and arg._tile is not self._tile:
                    raise ValueError(
                        f"Buffer '{arg._name}' is already placed on {arg._tile}; "
                        f"cannot reassign to {self._tile}"
                    )
                arg._tile = self._tile
            elif isinstance(arg, ObjectFifo):
                # This is an easy error to make, so we catch it early
                raise ValueError(
                    "Cannot give an ObjectFifo directly to a worker; "
                    "must give an ObjectFifoHandle obtained through "
                    "ObjectFifo.prod() or ObjectFifo.cons()"
                )
            elif isinstance(arg, WorkerRuntimeBarrier):
                self._barriers.append(arg)
            # Kernel/ExternalFunction instances are valid fn_args — they resolve to
            # func.call ops when invoked inside core_fn and carry link_with on their
            # func.func declaration. Other unrecognized args are assumed to be
            # metaprogramming values (Python scalars, etc.).

    @property
    def fifos(self) -> list[ObjectFifoHandle]:
        """Returns a list of ObjectFifoHandles given to the Worker via fn_args.

        Returns:
            list[ObjectFifoHandle]: ObjectFifoHandles used by the Worker (a shallow copy
                of the internal list).
        """
        return self._fifos.copy()

    @property
    def buffers(self) -> list[Buffer]:
        """Returns a list of Buffers given to the Worker via fn_args.

        Returns:
            list[Buffer]: Buffers used by the Worker (a shallow copy of the internal list).
        """
        return self._buffers.copy()

    def resolve(
        self,
        loc: ir.Location | None = None,
        ip: ir.InsertionPoint | None = None,
    ) -> None:
        """Emit the core operation for this Worker on its tile.

        One lock is created on the Worker's tile for every WorkerRuntimeBarrier found in
        ``fn_args`` and registered with that barrier; then the core body is defined,
        calling ``core_fn(*fn_args)`` inside an ``scf.for`` loop.

        Args:
            loc (ir.Location | None, optional): Not used by this implementation.
            ip (ir.InsertionPoint | None, optional): Not used by this implementation.

        Raises:
            ValueError: If the Worker has no tile.
        """
        if not self._tile:
            raise ValueError("Must place Worker before it can be resolved.")
        my_tile = self._tile.op

        # Create the necessary locks for the core operation to synchronize with the runtime sequence
        # and register them in the corresponding barriers.
        for barrier in self._barriers:
            barrier_lock = lock(my_tile)
            barrier._add_worker_lock(barrier_lock)

        @core(
            my_tile,
            stack_size=self.stack_size,
            dynamic_objfifo_lowering=self._dynamic_objfifo_lowering,
        )
        def core_body():
            # Always wrap in an scf.for so the lowered MLIR matches expectations
            # downstream (placed-API uses the same pattern with bound=1 for
            # single-shot workers). Using Python range(1) here would emit the
            # body inline with no scf.for wrapper, which the dataflow lowerer
            # treats differently and can cause runtime hangs.
            for _ in range_(sys.maxsize if self._while_true else 1):
                self.core_fn(*self.fn_args)
class WorkerRuntimeBarrier:
    """Synchronization point between individual workers and the runtime sequence.

    The barrier keeps one lock per registered worker in ``worker_locks``. Core-side
    operations (``wait_for_value`` / ``release_with_value``) act on the most recently
    registered lock, while ``_set_barrier_value`` writes to every registered lock.
    """

    def __init__(self, initial_value: int = 0):
        """Create a barrier with no worker locks registered yet.

        Args:
            initial_value (int, optional): The initial lock value. Defaults to 0.
                It is stored on the instance; nothing else in this class reads it.
        """
        self.worker_locks = []
        self.initial_value = initial_value

    def _latest_worker_lock(self):
        """Return the most recently registered lock, or raise if there is none."""
        # NOTE: this relies on the lock currently being placed being the last one added,
        # i.e. that the core-side operations are emitted right after their corresponding
        # Worker registered its lock. This is a fragile assumption; an alternative way
        # to associate a core with its lock should be considered.
        if not self.worker_locks:
            raise ValueError(
                "No workers have been registered for this barrier. Need to pass the barrier as an argument to the worker."
            )
        return self.worker_locks[-1]

    def wait_for_value(self, value: int):
        """Acquire the barrier lock with ``value``; call from inside a core function.

        Args:
            value (int): The value to wait for.

        Raises:
            ValueError: If no worker lock has been registered with this barrier.
        """
        current = self._latest_worker_lock()
        use_lock(current, LockAction.Acquire, value=value)

    def _add_worker_lock(self, lock):
        """Append ``lock`` to the locks tracked by this barrier."""
        self.worker_locks.append(lock)

    def _set_barrier_value(self, value: int):
        """Set every registered worker lock to ``value``."""
        for registered in self.worker_locks:
            set_lock_value(registered, value)

    def release_with_value(self, value: int):
        """Release the barrier lock with ``value``; call from inside a core function.

        Args:
            value (int): The value to decrement by in Release.

        Raises:
            ValueError: If no worker lock has been registered with this barrier.
        """
        current = self._latest_worker_lock()
        use_lock(current, LockAction.Release, value=value)
class _BarrierSetOp(Resolvable):
    """Deferred "set value" operation on a WorkerRuntimeBarrier.

    Internal helper; this should not be used directly.
    """

    def __init__(self, barrier: WorkerRuntimeBarrier, value: int):
        """Record which barrier to set and the value to set it to.

        Args:
            barrier (WorkerRuntimeBarrier): The barrier whose value will be set.
            value (int): The value to set.
        """
        self.barrier: WorkerRuntimeBarrier = barrier
        self.value: int = value

    def resolve(
        self,
        loc: ir.Location | None = None,
        ip: ir.InsertionPoint | None = None,
    ) -> None:
        """Apply the recorded value to the barrier.

        Args:
            loc (ir.Location | None, optional): Not used by this implementation.
            ip (ir.InsertionPoint | None, optional): Not used by this implementation.
        """
        target, new_value = self.barrier, self.value
        target._set_barrier_value(new_value)