-
Notifications
You must be signed in to change notification settings - Fork 183
Expand file tree
/
Copy pathobjectfifo.py
More file actions
834 lines (724 loc) · 34.9 KB
/
objectfifo.py
File metadata and controls
834 lines (724 loc) · 34.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
# objectfifo.py -*- Python -*-
#
# This file is licensed under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# (c) Copyright 2024 Advanced Micro Devices, Inc.
from __future__ import annotations
import numpy as np
from typing import Sequence
from ... import ir # type: ignore
from ...dialects._aie_enum_gen import AIETileType, ObjectFifoPort # type: ignore
from ...dialects._aie_ops_gen import ObjectFifoCreateOp # type: ignore
from ...dialects.aie import object_fifo, object_fifo_link
from ...helpers.util import (
np_ndarray_type_to_memref_type,
np_ndarray_type_get_dtype,
np_ndarray_type_get_shape,
)
from ...util import single_elem_or_list_to_list
from ..resolvable import Resolvable, NotResolvedError
from .endpoint import ObjectFifoEndpoint
from ..device import Device, Tile, AnyMemTile
class ObjectFifo(Resolvable):
    """An ObjectFifo is a method of representing synchronized, explicit dataflow between
    IRON program components such as Workers and the Runtime.

    Internally, it is a circular buffer with a given depth and type of buffer. The
    users of an ObjectFifo are explicitly either a Producer or a Consumer, and each
    user has a Placeable endpoint.
    """

    # Used to generate unique ObjectFifo names when none is provided.
    __of_index = 0

    def __init__(
        self,
        obj_type: type[np.ndarray],
        depth: int | None = 2,
        name: str | None = None,
        dims_to_stream: list[Sequence[int]] | None = None,
        dims_from_stream_per_cons: list[Sequence[int]] | None = None,
        plio: bool = False,
        pad_dimensions: list[Sequence[int]] | None = None,
        disable_synchronization: bool = False,
        repeat_count: int | None = None,
        delegate_tile: PlacementTile | None = None,
        via_DMA: bool = False,
        init_values: list[np.ndarray] | None = None,
    ):
        """Construct an ObjectFifo.

        Args:
            obj_type (type[np.ndarray]): The type of each buffer in the ObjectFifo
            depth (int | None, optional): The default depth of the ObjectFifo endpoints. Defaults to 2.
            name (str | None, optional): The name of the ObjectFifo. If None is given, a unique name will be generated. Defaults to None.
            dims_to_stream (list[Sequence[int]] | None, optional): Data layout transformations applied when data is pushed onto the AXI stream, described as pairs of (size, stride) from highest to lowest dimension. Defaults to None.
            dims_from_stream_per_cons (list[Sequence[int]] | None, optional): List of data layout transformations applied by each consumer when data is read from the AXI stream, described as pairs of (size, stride) from highest to lowest dimension. Defaults to None.
            plio (bool, optional): Whether the ObjectFifo uses PLIO connections. Defaults to False.
            pad_dimensions (list[Sequence[int]] | None, optional): Forwarded unchanged as ``padDimensions`` to the underlying ``aie.objectfifo`` op when this ObjectFifo is resolved. Defaults to None.
            disable_synchronization (bool, optional): When True, disables lock-based synchronization on the ObjectFifo. Defaults to False.
            repeat_count (int | None, optional): If set, causes the MemTile DMA to replay the buffer descriptor this many times without a new DMA transfer from L3. Distinct from ``iter_count`` (BD-chain iteration count). Defaults to None.
            delegate_tile (PlacementTile | None, optional): Shared-memory delegate tile. When set, the ObjectFifo's underlying buffer pool is allocated on this tile's memory module instead of the default placement. Lowers to ``aie.objectfifo.allocate``. *Only valid when both producer and consumer have shared-memory access to the delegate tile* (e.g. self-loop fifos where prod == cons, or fifos between adjacent tiles spilling to a neighboring MemTile). The delegate is the storage location, not a producer- or consumer-side concept; the underlying op verifier rejects this if either endpoint cannot share memory with the delegate. Defaults to None.
            via_DMA (bool, optional): Forwarded as ``via_DMA`` to the underlying ``aie.objectfifo`` op when True; when False, ``None`` is passed instead. Defaults to False.
            init_values (list[np.ndarray] | None, optional): Per-buffer static initial values for the producer endpoint. One ndarray per producer-side buffer; the producer tile must be able to hold static data at design startup (e.g. a MemTile). Lowers to the ``initValues`` attribute on the underlying ``aie.objectfifo`` op. Defaults to None.

        Raises:
            ValueError: If ``depth`` is provided and is less than 1.
        """
        self._depth = depth
        if self._depth is not None and self._depth < 1:
            raise ValueError(
                f"Default ObjectFifo depth must be > 0, but got {self._depth}"
            )
        self._obj_type = obj_type
        self._dims_to_stream = dims_to_stream
        self._dims_from_stream_per_cons = dims_from_stream_per_cons
        self._plio = plio
        self._pad_dimensions = pad_dimensions

        if name is None:
            self.name = f"of{ObjectFifo.__get_index()}"
        else:
            self.name = name

        self._op: ObjectFifoCreateOp | None = None
        self._prod: ObjectFifoHandle | None = None
        self._cons: list[ObjectFifoHandle] = []
        self._resolving = False
        self._iter_count: int | None = None
        self._repeat_count: int | None = repeat_count
        self._disable_synchronization: bool = disable_synchronization
        # Delegate tile for shared-memory buffer placement (lowers to aie.objectfifo.allocate).
        # Must be resolved before resolve() runs — Program.resolve() picks this up via
        # ObjectFifo._delegate_tile when collecting tiles to assign MLIR ops to.
        self._delegate_tile: PlacementTile | None = delegate_tile
        self._via_DMA: bool = via_DMA
        self._init_values: list[np.ndarray] | None = init_values

    @classmethod
    def __get_index(cls) -> int:
        """Return the next unused index for auto-generated names and advance the counter."""
        idx = cls.__of_index
        cls.__of_index += 1
        return idx

    @property
    def depth(self) -> int | None:
        """The default depth of the ObjectFifo. This may be overridden by an ObjectFifoHandle upon construction."""
        return self._depth

    @property
    def dims_from_stream_per_cons(self) -> list[Sequence[int]] | None:
        """The default dimensions from stream per consumer value. This may be overridden by an ObjectFifoHandle of type consumer."""
        return self._dims_from_stream_per_cons

    @property
    def dims_to_stream(self) -> list[Sequence[int]] | None:
        """The dimensions to stream value. This will be shared by the ObjectFifoHandle of type producer."""
        return self._dims_to_stream

    @property
    def op(self) -> ObjectFifoCreateOp:
        """The underlying op created by resolve().

        Raises:
            NotResolvedError: If resolve() has not yet created the op.
        """
        if self._op is None:
            raise NotResolvedError()
        return self._op

    @property
    def shape(self) -> Sequence[int]:
        """The shape of each buffer belonging to the ObjectFifo"""
        return np_ndarray_type_get_shape(self._obj_type)

    @property
    def dtype(self) -> np.dtype:
        """The per-element data type of each element in each buffer belonging to the ObjectFifo"""
        return np_ndarray_type_get_dtype(self._obj_type)

    @property
    def obj_type(self) -> type[np.ndarray]:
        """The tensor type of each buffer belonging to the ObjectFifo"""
        return self._obj_type

    def set_iter_count(self, iter_count: int):
        """Set iteration count for DMA BD (Buffer Descriptor) chaining on MemTile for the ObjectFifo.

        Args:
            iter_count (int): Number of forward chain iterations.
                - Must be in range [1, 256]: Forward chain with specified number of iterations

        Raises:
            ValueError: If iter_count is outside the valid range [1, 256]
        """
        # `not iter_count` also rejects None and 0 before the range comparison.
        if not iter_count or iter_count < 1 or iter_count > 256:
            raise ValueError("Iter count must be in [1, 256] range.")
        self._iter_count = iter_count

    def __str__(self) -> str:
        prod_endpoint = None
        if self._prod:
            prod_endpoint = self._prod.endpoint
        return (
            f"{self.__class__.__name__}({self._obj_type}, "
            f"depth={self.depth}, name='{self.name}', "
            f"prod={prod_endpoint}, cons={[c.endpoint for c in self._cons]})"
        )

    def prod(self, depth: int | None = None) -> ObjectFifoHandle:
        """Returns an ObjectFifoHandle of type producer. Each ObjectFifo may have only one producer
        handle, so if one already exists, a new reference to this handle will be returned.

        Args:
            depth (int | None, optional): The depth of the buffers at the endpoint corresponding to the producer handle.
                Only applied when the producer handle is first created; on later calls it is
                validated but the existing handle keeps its depth. Defaults to None.

        Raises:
            ValueError: Arguments are validated
            ValueError: If depth was not specified on ObjectFifo construction, depth must be specified here.

        Returns:
            ObjectFifoHandle: The producer handle to this ObjectFifo.
        """
        if self._prod:
            # The handle already exists: the argument is only sanity-checked,
            # it never changes the depth of the existing handle.
            if depth is None:
                if self._depth is None:
                    raise ValueError("If depth is None, then depth must be specified.")
            elif depth < 1:
                raise ValueError(f"Depth must be > 0, but got {depth}")
        else:
            # ObjectFifoHandle validates depth (and falls back to the default depth).
            self._prod = ObjectFifoHandle(self, True, depth)
        return self._prod

    def cons(
        self,
        depth: int | None = None,
        dims_from_stream: list[Sequence[int]] | None = None,
    ) -> ObjectFifoHandle:
        """Returns an ObjectFifoHandle of type consumer. Each ObjectFifo may have multiple consumers, so this
        will return a new consumer handle every time it is called.

        Args:
            depth (int | None, optional): The depth of the buffers at the endpoint corresponding to this consumer handle. Defaults to None.
            dims_from_stream (list[Sequence[int]] | None, optional): Dimensions from stream for this consumer. Defaults to None.

        Raises:
            ValueError: Arguments are validated

        Returns:
            ObjectFifoHandle: A consumer handle to this ObjectFifo.
        """
        if depth is None:
            if self._depth is None:
                raise ValueError("If depth is None, then depth must be specified.")
            depth = self._depth
        if dims_from_stream is None:
            dims_from_stream = self._dims_from_stream_per_cons
        handle = ObjectFifoHandle(
            self, is_prod=False, depth=depth, dims_from_stream=dims_from_stream
        )
        self._cons.append(handle)
        return handle

    def tiles(self, cons_only: bool = False) -> list[Tile]:
        """The list of placement tiles corresponding to the endpoints of all handles of this ObjectFifo

        Args:
            cons_only (bool, optional): When True, the producer tile is omitted and only
                consumer tiles are returned. Defaults to False.

        Raises:
            ValueError: A producer handle must be constructed.
            ValueError: At least one consumer handle must be constructed.

        Returns:
            list[Tile]: A list of tiles of the endpoints of this ObjectFifo.
        """
        tiles = []
        if not cons_only:
            if self._prod is None:
                raise ValueError(
                    "Cannot return prod.tile.op because prod was not created."
                )
            tiles.append(self._prod.endpoint.tile)
        if not self._cons:
            raise ValueError("Cannot return cons tiles because cons were not created.")
        tiles.extend(cons.endpoint.tile for cons in self._cons)
        return tiles

    def can_used_shared_mem(self, device: Device, cons_only: bool = False) -> bool:
        """Check whether the endpoint tiles of this ObjectFifo can use shared memory.

        Returns True as soon as ``device.is_mem_accessible(t, tiles)`` is truthy for
        some endpoint tile ``t`` (where ``tiles`` is the full list from ``tiles()``),
        and False if it is falsy for every tile.

        NOTE(review): the previous docstring said "all endpoints have legal memory
        affinity"; the loop accepts on the first tile that passes — confirm this is
        the intended semantics.
        """
        tiles = self.tiles(cons_only=cons_only)
        for t in tiles:
            if device.is_mem_accessible(t, tiles):
                return True
        return False

    def _prod_tile_op(self) -> Tile:
        """Return the ``op`` of the producer endpoint's tile; raises ValueError without a producer."""
        if self._prod is None:
            raise ValueError(
                f"Cannot return prod.tile.op for ObjectFifo {self.name} because prod was not created."
            )
        return self._prod.endpoint.tile.op

    def _cons_tiles_ops(self) -> list[Tile]:
        """Return the ``op`` of every consumer endpoint's tile; raises ValueError without consumers."""
        if len(self._cons) < 1:
            raise ValueError(
                f"Cannot return cons.tile.op for ObjectFifo {self.name} because no consumers were created."
            )
        return [cons.endpoint.tile.op for cons in self._cons]

    def _get_depths(self) -> int | list[int]:
        """Return the handle depths: a single int if all handles agree, else [prod, *cons]."""
        if not self._prod:
            raise ValueError(
                "Cannot return depths since prod ObjectFifoHandle is not created."
            )
        if len(self._cons) == 0:
            raise ValueError(
                "Cannot return depths since no cons ObjectFifoHandles are created."
            )
        depths = [self._prod.depth] + [con.depth for con in self._cons]
        if len(set(depths)) == 1:
            return depths[0]
        return depths

    def _get_endpoint(self, is_prod: bool) -> list[ObjectFifoEndpoint]:
        """Return the producer endpoint (as a one-element list) or all consumer endpoints."""
        if is_prod:
            if self._prod:
                return [self._prod.endpoint]
            raise ValueError(f"Prod endpoint not set for {self}")
        if len(self._cons) < 1:
            raise ValueError(f"Cons endpoint not set for {self}")
        return [con.endpoint for con in self._cons]

    def resolve(
        self,
        loc: ir.Location | None = None,
        ip: ir.InsertionPoint | None = None,
    ) -> None:
        """Create the underlying object fifo op (once) and resolve any linked endpoints.

        The ``_resolving`` flag is set before any work is done so that re-entrant
        calls (e.g. via an ObjectFifoLink that resolves its handles) are no-ops.
        """
        if self._resolving:
            return
        self._resolving = True

        dims_from_stream_per_cons = [
            con.dims_from_stream if con.dims_from_stream else []
            for con in self._cons
        ]
        self._op = object_fifo(
            self.name,
            self._prod_tile_op(),
            self._cons_tiles_ops(),
            self._get_depths(),
            np_ndarray_type_to_memref_type(self._obj_type),
            dimensionsToStream=self._dims_to_stream,
            dimensionsFromStreamPerConsumer=dims_from_stream_per_cons,
            plio=self._plio,
            padDimensions=self._pad_dimensions,
            iter_count=self._iter_count,
            # False is passed on as None for these two flags.
            disable_synchronization=self._disable_synchronization or None,
            via_DMA=self._via_DMA or None,
            initValues=self._init_values,
        )
        if self._repeat_count is not None:
            self._op.set_repeat_count(self._repeat_count)

        # Shared-memory delegate: redirect the fifo's buffer pool to a tile
        # whose memory module is shared with both prod and cons. See the
        # delegate_tile docstring on ObjectFifo for the constraint.
        if self._delegate_tile is not None:
            self._op.allocate(self._delegate_tile.op)

        if isinstance(self._prod.endpoint, ObjectFifoLink):
            self._prod.endpoint.resolve()
        for con in self._cons:
            if isinstance(con.endpoint, ObjectFifoLink):
                con.endpoint.resolve()

    def _acquire(
        self,
        port: ObjectFifoPort,
        num_elem: int,
    ):
        """Acquire ``num_elem`` elements on ``port`` via the resolved op; requires num_elem >= 1."""
        if num_elem < 1:
            raise ValueError("Must consume at least one element")
        return self.op.acquire(port, num_elem)

    def _release(
        self,
        port: ObjectFifoPort,
        num_elem: int,
    ):
        """Release ``num_elem`` elements on ``port`` via the resolved op; requires num_elem >= 1."""
        if num_elem < 1:
            raise ValueError("Must produce at least one element")
        self.op.release(port, num_elem)
class ObjectFifoHandle(Resolvable):
    """This class represents a handle to an ObjectFifo. A handle may be of type Producer or type Consumer."""

    def __init__(
        self,
        of: ObjectFifo,
        is_prod: bool,
        depth: int | None = None,
        dims_from_stream: list[Sequence[int]] | None = None,
    ):
        """Construct an ObjectFifoHandle

        Args:
            of (ObjectFifo): The ObjectFifo to construct the handle for.
            is_prod (bool): Whether the handle should be producer or consumer handle.
            depth (int | None, optional): The depth of the ObjectFifo at this endpoint. Defaults to None.
            dims_from_stream (list[Sequence[int]] | None, optional): A unique dimensions from stream. This is only valid for consumer handles. Defaults to None.

        Raises:
            ValueError: Arguments are validated.
        """
        if depth is None:
            if of.depth:
                depth = of.depth
            else:
                raise ValueError(
                    "Must specify either ObjectFifoHandle depth or ObjectFifo default depth; both are None."
                )
        if depth < 1:
            raise ValueError(f"Depth must be > 0 but is {depth}")

        self._port: ObjectFifoPort = (
            ObjectFifoPort.Produce if is_prod else ObjectFifoPort.Consume
        )
        if is_prod and dims_from_stream:
            raise ValueError("Can only specify dims_from_stream for cons handles")
        elif not is_prod and not dims_from_stream:
            # Consumers without their own (non-empty) dims inherit the fifo default.
            dims_from_stream = of.dims_from_stream_per_cons

        self._is_prod = is_prod
        self._object_fifo = of
        self._depth = depth
        self._endpoint = None
        self._dims_from_stream = dims_from_stream

    def acquire(
        self,
        num_elem: int,
    ) -> list:
        """Acquire access to some elements of the ObjectFifo using ObjectFifo synchronization to moderate access.

        Args:
            num_elem (int): Number of elements to acquire. If some elements are already acquired, it will only require the additional elements needed
                to acquire a total of num_elem.

        Raises:
            ValueError: Number of elements cannot exceed ObjectFifo depth.

        Returns:
            list: A indexable handle to the acquired elements.
        """
        # num_elem == depth is allowed; only num_elem > depth is rejected.
        if self._depth < num_elem:
            raise ValueError(
                f"Number of elements to acquire {num_elem} must be smaller than depth {self._depth}"
            )
        return self._object_fifo._acquire(self._port, num_elem)

    def release(
        self,
        num_elem: int,
    ) -> None:
        """Release access to some elements of the ObjectFifo. This allows the other endpoint of the ObjectFifo to acquire them.

        Args:
            num_elem (int): Number of elements to release.

        Raises:
            ValueError: Number of elements cannot exceed ObjectFifo depth.
        """
        # num_elem == depth is allowed; only num_elem > depth is rejected.
        if self._depth < num_elem:
            raise ValueError(
                f"Number of elements to release {num_elem} must be smaller than depth {self._depth}"
            )
        self._object_fifo._release(self._port, num_elem)

    @property
    def name(self) -> str:
        """The name of the ObjectFifo"""
        return self._object_fifo.name

    @property
    def op(self) -> ObjectFifoCreateOp:
        """The op of the ObjectFifo this handle belongs to."""
        return self._object_fifo.op

    @property
    def obj_type(self) -> type[np.ndarray]:
        """The per-buffer type of the ObjectFifo"""
        return self._object_fifo.obj_type

    @property
    def shape(self) -> Sequence[int]:
        """The per-buffer shape of the ObjectFifo"""
        return self._object_fifo.shape

    @property
    def dtype(self) -> np.dtype:
        """The per-element datatype of the ObjectFifo"""
        return self._object_fifo.dtype

    @property
    def handle_type(self) -> str:
        """A string referencing the type of this ObjectFifoHandle."""
        if self._is_prod:
            return "prod"
        return "cons"

    @property
    def depth(self) -> int:
        """The depth of this ObjectFifoHandle"""
        return self._depth

    @property
    def dims_from_stream(self) -> list[Sequence[int]] | None:
        """The dimensions from stream of a consumer ObjectFifoHandle"""
        if self._is_prod:
            raise ValueError("prod ObjectFifoHandles cannot have dims_from_stream")
        return self._dims_from_stream

    @property
    def endpoint(self) -> ObjectFifoEndpoint | None:
        """The endpoint of this ObjectFifoHandle"""
        return self._endpoint

    @endpoint.setter
    def endpoint(self, endpoint: ObjectFifoEndpoint) -> None:
        # An endpoint may be re-assigned to an equal value, but never replaced.
        if self._endpoint and self._endpoint != endpoint:
            raise ValueError(
                f"Endpoint already set for ObjectFifoHandle {self.name}.{self.handle_type}: "
                f"Set to {self._endpoint}, trying to set to {endpoint}"
            )
        self._endpoint = endpoint

    def __str__(self) -> str:
        my_str = f"ObjectFifoHandle({self.handle_type}, {self.depth}, "
        if not self._is_prod:
            my_str += f"{self.dims_from_stream}, "
        my_str += f"{self._object_fifo})"
        return my_str

    def all_of_endpoints(self) -> list[ObjectFifoEndpoint]:
        """All endpoints belonging to an ObjectFifo"""
        return self._object_fifo._get_endpoint(
            is_prod=True
        ) + self._object_fifo._get_endpoint(is_prod=False)

    def join(
        self,
        offsets: list[int],
        tile: Tile = AnyMemTile,
        depths: list[int] | None = None,
        obj_types: list[type[np.ndarray]] | None = None,
        names: list[str] | None = None,
        dims_to_stream: list[list[Sequence[int] | None]] | None = None,
        dims_from_stream: list[list[Sequence[int] | None]] | None = None,
        plio: bool = False,
        repeat_counts: list[int | None] | None = None,
    ) -> list[ObjectFifo]:
        """Construct multiple ObjectFifos which feed data into a ObjectFifoHandle.
        Note that this function is only valid for producer ObjectFifoHandles.

        Args:
            offsets (list[int]): Offsets into the current producer, each corresponding to a new consumer.
            tile (Tile, optional): The tile where the Join operation occurs. Defaults to AnyMemTile.
            depths (list[int] | None, optional): The depth of each new ObjectFifo. Defaults to None.
            obj_types (list[type[np.ndarray]], optional): The type of the buffers corresponding to each new ObjectFifo. Defaults to None.
            names (list[str] | None, optional): The name of each new ObjectFifo. If not given, unique names will be generated. Defaults to None.
            dims_to_stream (list[list[Sequence[int] | None]] | None, optional): The dimensionsToStream to assign to each new ObjectFifo. Defaults to None.
            dims_from_stream (list[list[Sequence[int] | None]] | None, optional): The dimensionsFromStream to assign to each new ObjectFifo consumer. None or an empty list means no per-consumer dimensions. Defaults to None.
            plio (bool, optional): Set plio on each new ObjectFifo. Defaults to False.
            repeat_counts (list[int | None] | None, optional): Per-sub-fifo MemTile DMA repeat count (see ObjectFifo.repeat_count). Defaults to None.

        Raises:
            ValueError: Arguments are validated

        Returns:
            list[ObjectFifo]: A list of newly constructed ObjectFifos whose consumers are used in this join() operation.
        """
        if not self._is_prod:
            raise ValueError(f"Cannot join() a {self.handle_type} ObjectFifoHandle")
        num_subfifos = len(offsets)

        if depths is None:
            depths = [self.depth] * num_subfifos
        elif len(depths) != num_subfifos:
            raise ValueError("Number of depths does not match number of offsets")

        if obj_types is None:
            obj_types = [self._object_fifo.obj_type] * num_subfifos
        elif len(obj_types) != num_subfifos:
            raise ValueError("Number of obj_types does not match number of offsets")

        if names is None:
            names = [self._object_fifo.name + f"_join{i}" for i in range(num_subfifos)]
        elif len(names) != num_subfifos:
            raise ValueError("Number of names does not match number of offsets")

        if dims_to_stream is None:
            # One distinct list per sub-fifo so they never alias each other.
            dims_to_stream = [[] for _ in range(num_subfifos)]
        elif len(dims_to_stream) != num_subfifos:
            raise ValueError(
                "Number of dims to stream does not match number of offsets"
            )

        if not dims_from_stream:
            # None and an empty list both mean "no per-consumer dimensions".
            dims_from_stream = [[] for _ in range(num_subfifos)]
        elif len(dims_from_stream) != num_subfifos:
            raise ValueError(
                "Number of dims_from_stream does not match number of offsets"
            )

        if repeat_counts is None:
            repeat_counts = [None] * num_subfifos
        elif len(repeat_counts) != num_subfifos:
            raise ValueError("Number of repeat_counts does not match number of offsets")

        # Create subfifos
        subfifos = [
            ObjectFifo(
                obj_types[i],
                name=names[i],
                depth=depths[i],
                dims_to_stream=dims_to_stream[i],
                plio=plio,
                repeat_count=repeat_counts[i],
            )
            for i in range(num_subfifos)
        ]

        # Create link and set it as endpoints. Each consumer must be paired with
        # the depth and dims at ITS OWN index, hence enumerate().
        subfifo_cons = [
            s.cons(depth=depths[i], dims_from_stream=dims_from_stream[i])
            for i, s in enumerate(subfifos)
        ]
        _ = ObjectFifoLink(subfifo_cons, self, tile, offsets, [])
        return subfifos

    def split(
        self,
        offsets: list[int],
        tile: Tile = AnyMemTile,
        depths: list[int] | None = None,
        obj_types: list[type[np.ndarray]] | None = None,
        names: list[str] | None = None,
        dims_to_stream: list[list[Sequence[int]]] | None = None,
        dims_from_stream: list[list[Sequence[int]]] | None = None,
        plio: bool = False,
        repeat_counts: list[int | None] | None = None,
    ) -> list[ObjectFifo]:
        """Split the data from an ObjectFifoConsumer handle by sending it to producers in N newly constructed ObjectFifos.
        Note this operation is only valid for ObjectFifoHandles of type consumer.

        Args:
            offsets (list[int]): The offset into the current consumer for each new ObjectFifo producer.
            tile (Tile, optional): The tile where the Split operation takes place. Defaults to AnyMemTile.
            depths (list[int] | None, optional): The depth of each new ObjectFifo. Defaults to None.
            obj_types (list[type[np.ndarray]], optional): The buffer type of each new ObjectFifo. Defaults to None.
            names (list[str] | None, optional): The name of each new ObjectFifo. If not given, a unique name will be generated. Defaults to None.
            dims_to_stream (list[list[Sequence[int]]] | None, optional): The dimensions to stream for each new ObjectFifo. Defaults to None.
            dims_from_stream (list[list[Sequence[int]]] | None, optional): The dimensions from stream for each new ObjectFifo. Defaults to None.
            plio (bool, optional): Set plio on each new ObjectFifo. Defaults to False.
            repeat_counts (list[int | None] | None, optional): Per-sub-fifo MemTile DMA repeat count (see ObjectFifo.repeat_count). Defaults to None.

        Raises:
            ValueError: Arguments are validated.

        Returns:
            list[ObjectFifo]: A list of newly constructed ObjectFifos whose producers are used in this split() operation.
        """
        if self._is_prod:
            raise ValueError(f"Cannot split() a {self.handle_type} ObjectFifoHandle")
        num_subfifos = len(offsets)

        if depths is None:
            depths = [self.depth] * num_subfifos
        elif len(depths) != num_subfifos:
            raise ValueError("Number of depths does not match number of offsets")

        if obj_types is None:
            obj_types = [self._object_fifo.obj_type] * num_subfifos
        elif len(obj_types) != num_subfifos:
            raise ValueError("Number of obj_types does not match number of offsets")

        if names is None:
            names = [self._object_fifo.name + f"_split{i}" for i in range(num_subfifos)]
        elif len(names) != num_subfifos:
            raise ValueError("Number of names does not match number of offsets")

        if dims_to_stream is None:
            # One distinct list per sub-fifo so they never alias each other.
            dims_to_stream = [[] for _ in range(num_subfifos)]
        elif len(dims_to_stream) != num_subfifos:
            raise ValueError(
                "Number of dims_to_stream arrays does not match number of offsets"
            )

        if dims_from_stream is None:
            dims_from_stream = [[] for _ in range(num_subfifos)]
        elif len(dims_from_stream) != num_subfifos:
            raise ValueError(
                "Number of dims_from_stream arrays does not match number of offsets"
            )

        if repeat_counts is None:
            repeat_counts = [None] * num_subfifos
        elif len(repeat_counts) != num_subfifos:
            raise ValueError("Number of repeat_counts does not match number of offsets")

        # Create subfifos
        subfifos = [
            ObjectFifo(
                obj_types[i],
                name=names[i],
                depth=depths[i],
                dims_to_stream=dims_to_stream[i],
                dims_from_stream_per_cons=dims_from_stream[i],
                plio=plio,
                repeat_count=repeat_counts[i],
            )
            for i in range(num_subfifos)
        ]

        # Create link and set it as endpoints
        subfifo_prods = [s.prod() for s in subfifos]
        _ = ObjectFifoLink(self, subfifo_prods, tile, [], offsets)
        return subfifos

    def forward(
        self,
        tile: Tile = AnyMemTile,
        obj_type: type[np.ndarray] | None = None,
        depth: int | None = None,
        name: str | None = None,
        dims_to_stream: list[Sequence[int]] | None = None,
        dims_from_stream: list[Sequence[int]] | None = None,
        plio: bool = False,
        repeat_count: int | None = None,
    ) -> ObjectFifo:
        """This is a special case of the split() operation where an ObjectFifoHandle of type consumer
        is forwarded to the producer of a newly-constructed ObjectFifo.

        Args:
            tile (Tile, optional): The tile for the Forward operation. Defaults to AnyMemTile.
            obj_type (type[np.ndarray] | None, optional): The object type of the new ObjectFifo. Defaults to None.
            depth (int | None, optional): The depth of the new ObjectFifo. Defaults to None.
            name (str | None, optional): The name of the new ObjectFifo. If None (or empty) is given, the name of this ObjectFifo with a ``_fwd`` suffix is used. Defaults to None.
            dims_to_stream (list[Sequence[int]] | None, optional): The dimensions to stream for the new ObjectFifo. Defaults to None.
            dims_from_stream (list[Sequence[int]] | None, optional): The dimensions from stream for the new ObjectFifo. Defaults to None.
            plio (bool, optional): Set plio on each new ObjectFifo. Defaults to False.
            repeat_count (int | None, optional): MemTile DMA repeat count for the new ObjectFifo (see ObjectFifo.repeat_count). Defaults to None.

        Raises:
            ValueError: Arguments are Validated

        Returns:
            ObjectFifo: A newly constructed ObjectFifo whose producer used in this forward() operation.
        """
        if self._is_prod:
            raise ValueError(f"Cannot forward a {self.handle_type} ObjectFifoHandle")

        # Wrap each provided scalar argument into the one-element list split()
        # expects. "Provided" means "is not None": an explicit empty dims list or
        # an invalid depth of 0 must reach split()/ObjectFifo as given rather
        # than being mistaken for "not specified".
        obj_types = [obj_type] if obj_type is not None else None
        depths = [depth] if depth is not None else None
        names = [name] if name else [self._object_fifo.name + "_fwd"]
        dims_to_stream_list = [dims_to_stream] if dims_to_stream is not None else None
        dims_from_stream_list = (
            [dims_from_stream] if dims_from_stream is not None else None
        )

        forward_fifo = self.split(
            [0],
            tile=tile,
            obj_types=obj_types,
            depths=depths,
            names=names,
            dims_to_stream=dims_to_stream_list,
            dims_from_stream=dims_from_stream_list,
            plio=plio,
            repeat_counts=[repeat_count] if repeat_count is not None else None,
        )
        return forward_fifo[0]

    def resolve(
        self,
        loc: ir.Location | None = None,
        ip: ir.InsertionPoint | None = None,
    ) -> None:
        """Resolve the ObjectFifo this handle belongs to."""
        self._object_fifo.resolve(loc=loc, ip=ip)
class ObjectFifoLink(ObjectFifoEndpoint, Resolvable):
    """This is an object used internally by split(), join() and forward() operations."""

    def __init__(
        self,
        srcs: list[ObjectFifoHandle] | ObjectFifoHandle,
        dsts: list[ObjectFifoHandle] | ObjectFifoHandle,
        tile: Tile = AnyMemTile,
        src_offsets: list[int] = [],
        dst_offsets: list[int] = [],
    ):
        """Construct an ObjectFifoLink. This is either a many-to-one, one-to-many, or one-to-one operation.

        Args:
            srcs (list[ObjectFifoHandle] | ObjectFifoHandle): A list of consumer ObjectFifoHandles to link.
            dsts (list[ObjectFifoHandle] | ObjectFifoHandle): A list of producer ObjectFifoHandles to link.
            tile (Tile, optional): The tile where the link occurs. Defaults to AnyMemTile.
            src_offsets (list[int], optional): If many sources, one offset per source is required to split the destination. Defaults to [].
            dst_offsets (list[int], optional): If many destinations, one offset per destination is required to split the source. Defaults to [].

        Raises:
            ValueError: Arguments are validated.
        """
        self._srcs = single_elem_or_list_to_list(srcs)
        self._dsts = single_elem_or_list_to_list(dsts)
        # Copy the offsets: the defaults are shared mutable lists, and storing
        # them directly would alias one list across every link built with defaults.
        self._src_offsets = list(src_offsets)
        self._dst_offsets = list(dst_offsets)
        self._resolving = False

        if len(self._srcs) < 1:
            raise ValueError("An ObjectFifoLink must have at least one source")
        if len(self._dsts) < 1:
            raise ValueError("An ObjectFifoLink must have at least one destination")
        if len(self._srcs) != 1 and len(self._dsts) != 1:
            raise ValueError(
                "An ObjectFifoLink may only have > 1 of either sources or destinations, but not both"
            )
        # Offsets are optional, but if given there must be exactly one per handle.
        if len(self._src_offsets) > 0 and len(self._src_offsets) != len(self._srcs):
            raise ValueError(
                "The number of source offsets does not match the number of sources"
            )
        if len(self._dst_offsets) > 0 and len(self._dst_offsets) != len(self._dsts):
            raise ValueError(
                "The number of destination offsets does not match the number of destinations"
            )

        self._op = None
        # Register this link as the endpoint of every handle it connects.
        for s in self._srcs:
            s.endpoint = self
        for d in self._dsts:
            d.endpoint = self

        # Work on a copy so the caller's tile (e.g. the shared AnyMemTile default)
        # is not modified when the tile type is filled in.
        tile = tile.copy()
        if tile.tile_type is None:
            tile.tile_type = AIETileType.MemTile
        ObjectFifoEndpoint.__init__(self, tile)

    def resolve(
        self,
        loc: ir.Location | None = None,
        ip: ir.InsertionPoint | None = None,
    ) -> None:
        """Resolve all linked handles, then create the link op (once)."""
        if self._resolving:
            return
        # This function may be re-entrant as resolving sources/destinations
        # may call resolve on the object fifo endpoints, e.g., this function.
        # We solve this by marking as _resolving BEFORE calling resolve on
        # sources or destinations.
        self._resolving = True

        for s in self._srcs:
            s.resolve()
        for d in self._dsts:
            d.resolve()

        src_ops = [s.op for s in self._srcs]
        dst_ops = [d.op for d in self._dsts]
        self._op = object_fifo_link(
            src_ops, dst_ops, self._src_offsets, self._dst_offsets
        )