forked from NOAA-GFDL/NDSL
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuffer.py
More file actions
194 lines (161 loc) · 6.01 KB
/
buffer.py
File metadata and controls
194 lines (161 loc) · 6.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
from __future__ import annotations
import contextlib
from collections.abc import Callable, Generator, Iterable
from typing import Any
import numpy as np
import numpy.typing as npt
from ndsl.performance.timer import NullTimer, Timer
from ndsl.types import Allocator
from ndsl.utils import (
device_synchronize,
is_c_contiguous,
safe_assign_array,
safe_mpi_allocate,
)
# Key under which idle buffers are pooled: (allocator, shape, dtype).
BufferKey = tuple[Callable, Iterable[int], npt.DTypeLike]
# Module-wide pool of idle buffers, grouped by the key they were allocated
# under. Buffer.pop_from_cache takes buffers out of it and
# Buffer.push_to_cache puts them back.
BUFFER_CACHE: dict[BufferKey, list[Buffer]] = dict()
class Buffer:
    """A contiguous array that can be returned to a shared cache for re-use.

    Attributes:
        _key: the (allocator, shape, dtype) cache key this buffer was allocated
            under; ``push_to_cache`` uses it to return the buffer to its pool.
        array: the allocated ndarray.
    """

    array: np.ndarray

    def __init__(self, key: BufferKey, array: np.ndarray):
        """Wrap an allocated array together with its cache key.

        Args:
            key: a cache key made out of a tuple of allocator, shape and dtype
            array: ndarray holding the buffer's data
        """
        self._key = key
        self.array = array

    @classmethod
    def pop_from_cache(
        cls, allocator: Allocator, shape: Iterable[int], dtype: npt.DTypeLike
    ) -> Buffer:
        """Take a matching buffer from the cache, allocating a new one on a miss.

        Args:
            allocator: used to allocate memory when no cached buffer is free
            shape: shape of array; any iterable of ints (tuple, list, ...)
            dtype: type of array elements

        Returns:
            a buffer wrapping a C-contiguous array. The caller owns it until it
            is handed back with ``push_to_cache``.
        """
        # Normalize the request so equivalent calls share one cache entry:
        # a list shape would be unhashable (TypeError on lookup), a generator
        # would never match and would be consumed before allocation, and
        # e.g. ``np.float64`` and ``np.dtype("float64")`` hash differently.
        shape = tuple(shape)
        dtype = np.dtype(dtype)
        key = (allocator, shape, dtype)
        pool = BUFFER_CACHE.setdefault(key, [])
        if pool:
            return pool.pop()
        array = safe_mpi_allocate(allocator, shape, dtype=dtype)
        # Callers hand these arrays straight to communication routines, which
        # need contiguous memory.
        assert is_c_contiguous(array)
        return cls(key, array)

    @staticmethod
    def push_to_cache(buffer: Buffer) -> None:
        """Push the buffer back into the cache.

        Args:
            buffer: buffer to push back in cache, using its internal key
        """
        # setdefault: tolerate the cache having been cleared while this buffer
        # was checked out, instead of raising KeyError.
        BUFFER_CACHE.setdefault(buffer._key, []).append(buffer)

    def finalize_memory_transfer(self) -> None:
        """Finalize any memory transfer by calling ``device_synchronize()``.

        NOTE(review): presumably this blocks until outstanding device work
        touching the buffer has completed — confirm against device_synchronize.
        """
        device_synchronize()

    def assign_to(
        self,
        destination_array: np.ndarray,
        buffer_slice: Any = np.index_exp[:],
        buffer_reshape: Any | None = None,
    ) -> None:
        """Copy (part of) the internal array into destination_array.

        Args:
            destination_array: target ndarray
            buffer_slice: index expression selecting which part of the internal
                array to copy; defaults to the whole array
            buffer_reshape: if given, the selected data is reshaped to this
                shape (C order) before being assigned
        """
        source = self.array[buffer_slice]
        if buffer_reshape is not None:
            source = np.reshape(source, buffer_reshape, order="C")
        safe_assign_array(destination_array, source)

    def assign_from(
        self, source_array: np.ndarray, buffer_slice: Any = np.index_exp[:]
    ) -> None:
        """Copy source_array into (part of) the internal array.

        Args:
            source_array: source ndarray
            buffer_slice: index expression selecting which part of the internal
                array receives the data; defaults to the whole array
        """
        safe_assign_array(self.array[buffer_slice], source_array)
@contextlib.contextmanager
def array_buffer(
    allocator: Allocator, shape: Iterable[int], dtype: npt.DTypeLike
) -> Generator[Buffer, None, None]:
    """
    A context manager providing a contiguous buffer, which may be re-used between calls.

    Args:
        allocator: a function with the same signature as numpy.zeros which returns
            an ndarray
        shape: the shape of the desired array
        dtype: the dtype of the desired array

    Yields:
        buffer: a Buffer whose ``array`` was created according to the
            specification in the args. It is returned to the cache when the
            context exits (also when the body raises), so it may be handed out
            again by a later call and must not be used after the context ends.
    """
    buffer = Buffer.pop_from_cache(allocator, shape, dtype)
    try:
        yield buffer
    finally:
        # Return the buffer even if the body raised; otherwise every failed
        # use would silently drop an allocation from the cache.
        Buffer.push_to_cache(buffer)
@contextlib.contextmanager
def send_buffer(
    allocator: Callable,
    array: np.ndarray | None,
    timer: Timer | None = None,
) -> Generator[np.ndarray | None, None, None]:
    """A context manager ensuring that `array` is contiguous in a context where it is
    being sent as data, copying into a recycled buffer array if necessary.

    Args:
        allocator: used to allocate memory
        array: a possibly non-contiguous array for which to provide a buffer;
            None is passed through unchanged
        timer: object to accumulate timings for "pack"

    Yields:
        buffer_array: if array is non-contiguous, a contiguous buffer array containing
            the data from array. Otherwise, yields array.
    """
    if timer is None:
        timer = NullTimer()
    if array is None or is_c_contiguous(array):
        yield array
    else:
        # The ExitStack keeps the recycled buffer checked out across the yield,
        # which lets the timed section below be wrapped in try/finally without
        # also wrapping the yield. This guarantees the "pack" timer is stopped
        # even if acquiring the buffer or copying into it raises.
        with contextlib.ExitStack() as stack:
            timer.start("pack")
            try:
                sendbuf = stack.enter_context(
                    array_buffer(allocator, array.shape, array.dtype)
                )
                sendbuf.assign_from(array)
            finally:
                timer.stop("pack")
            yield sendbuf.array
@contextlib.contextmanager
def recv_buffer(
    allocator: Callable,
    array: np.ndarray | None,
    timer: Timer | None = None,
) -> Generator[np.ndarray | None, None, None]:
    """A context manager ensuring that array is contiguous in a context where it is
    being used to receive data, using a recycled buffer array and then copying the
    result into array if necessary.

    Args:
        allocator: used to allocate memory
        array: a possibly non-contiguous array for which to provide a buffer;
            None is passed through unchanged
        timer: object to accumulate timings for "unpack"

    Yields:
        buffer_array: if array is non-contiguous, a contiguous buffer array which is
            copied into array when the context exits normally. Otherwise, yields
            array.
    """
    if timer is None:
        timer = NullTimer()
    if array is None or is_c_contiguous(array):
        yield array
    else:
        # The ExitStack keeps the recycled buffer checked out across the yield
        # while letting buffer acquisition be timed inside try/finally, so the
        # "unpack" timer is always stopped even if acquisition raises.
        with contextlib.ExitStack() as stack:
            timer.start("unpack")
            try:
                recvbuf = stack.enter_context(
                    array_buffer(allocator, array.shape, array.dtype)
                )
            finally:
                timer.stop("unpack")
            yield recvbuf.array
            # Only reached when the body completed without raising; on error
            # the received data is not copied back into ``array``.
            with timer.clock("unpack"):
                recvbuf.assign_to(array)