-
Notifications
You must be signed in to change notification settings - Fork 87
Expand file tree
/
Copy pathutils.py
More file actions
351 lines (300 loc) · 11.4 KB
/
utils.py
File metadata and controls
351 lines (300 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# type: ignore
"""
This utility module contains functions that are used in the benchmarks.
Functions that make running and debugging benchmarks easier:
- class Skip is used to skip benchmarks based on environment variables.
- function run_benchmark_from_module is used to run benchmarks from a module.
- function run_benchmark is used to run the benchmarks.
Performant dataset generation functions so the benchmarks run fast even for large artificial datasets.
The object is to generate a dataset containing many cells. By copying the same cell values instead of
doing gaussian blur on the whole image, we can generate the same dataset in a fraction of the time.
- function labeled_particles is used to generate labeled blobs.
- function _generate_ball is used to generate a ball of given radius and dimension.
- function _generate_density is used to generate gaussian density of given radius and dimension.
- function cluster_blobs is used to generate a SpatialData object with blobs.
- function _structure_at_coordinates is used to update data with structure at given coordinates.
- function _get_slices_at is used to get slices at a given point.
- function _update_data_with_mask is used to update data with struct where struct is nonzero.
"""
import itertools
import os
from collections.abc import Sequence
from functools import lru_cache
from types import ModuleType
from typing import Callable, Literal, Optional, Union, overload
import anndata as ad
import numpy as np
import pandas as pd
from skimage import morphology
import spatialdata as sd
from spatialdata import SpatialData
from spatialdata.models import Image2DModel, TableModel
from spatialdata.transformations import Identity
def always_false(*_):
return False
class Skip:
def __init__(
self,
if_in_pr: Callable[..., bool] = always_false,
if_on_ci: Callable[..., bool] = always_false,
always: Callable[..., bool] = always_false,
):
self.func_pr = if_in_pr if "PR" in os.environ else always_false
self.func_ci = if_on_ci if "CI" in os.environ else always_false
self.func_always = always
def __contains__(self, item):
return self.func_pr(*item) or self.func_ci(*item) or self.func_always(*item)
def _generate_ball(radius: int, ndim: int) -> np.ndarray:
"""Generate a ball of given radius and dimension.
Parameters
----------
radius : int
Radius of the ball.
ndim : int
Dimension of the ball.
Returns
-------
ball : ndarray of uint8
Binary array of the hyper ball.
"""
if ndim == 2:
return morphology.disk(radius)
if ndim == 3:
return morphology.ball(radius)
shape = (2 * radius + 1,) * ndim
radius_sq = radius**2
coords = np.indices(shape) - radius
return (np.sum(coords**2, axis=0) <= radius_sq).astype(np.uint8)
def _generate_density(radius: int, ndim: int) -> np.ndarray:
"""Generate gaussian density of given radius and dimension."""
shape = (2 * radius + 1,) * ndim
coords = np.indices(shape) - radius
dist = np.sqrt(np.sum(coords**2 / ((radius / 4) ** 2), axis=0))
res = np.exp(-dist)
res[res < 0.02] = 0
return res
def _structure_at_coordinates(
shape: tuple[int],
coordinates: np.ndarray,
structure: np.ndarray,
*,
multipliers: Sequence = itertools.repeat(1),
dtype=None,
reduce_fn: Callable[[np.ndarray, np.ndarray, Optional[np.ndarray]], np.ndarray],
):
"""Update data with structure at given coordinates.
Parameters
----------
data : ndarray
Array to update.
coordinates : ndarray
Coordinates of the points. The structures will be added at these
points (center).
structure : ndarray
Array with encoded structure. For example, ball (boolean) or density
(0,1) float.
multipliers : ndarray
These values are multiplied by the values in the structure before
updating the array. Can be used to generate different labels, or to
vary the intensity of floating point gaussian densities.
reduce_fn : function
Function with which to update the array at a particular position. It
should take two arrays as input and an optional output array.
"""
radius = (structure.shape[0] - 1) // 2
data = np.zeros(shape, dtype=dtype)
for point, value in zip(coordinates, multipliers):
slice_im, slice_ball = _get_slices_at(shape, point, radius)
reduce_fn(data[slice_im], value * structure[slice_ball], out=data[slice_im])
return data
def _get_slices_at(shape, point, radius):
slice_im = []
slice_ball = []
for i, p in enumerate(point):
slice_im.append(slice(max(0, p - radius), min(shape[i], p + radius + 1)))
ball_start = max(0, radius - p)
ball_stop = slice_im[-1].stop - slice_im[-1].start + ball_start
slice_ball.append(slice(ball_start, ball_stop))
return tuple(slice_im), tuple(slice_ball)
def _update_data_with_mask(data, struct, out=None):
"""Update ``data`` with ``struct`` where ``struct`` is nonzero."""
# these branches are needed because np.where does not support
# an out= keyword argument
if out is None:
return np.where(struct, struct, data)
else: # noqa: RET505
nz = struct != 0
out[nz] = struct[nz]
return out
def _smallest_dtype(n: int) -> np.dtype:
"""Find the smallest dtype that can hold n values."""
for dtype in [np.uint8, np.uint16, np.uint32, np.uint64]:
if np.iinfo(dtype).max >= n:
return dtype
break
else:
raise ValueError(f"{n=} is too large for any dtype.")
@overload
def labeled_particles(
shape: Sequence[int],
dtype: Optional[np.dtype] = None,
n: int = 144,
seed: Optional[int] = None,
return_density: Literal[False] = False,
) -> np.ndarray: ...
@overload
def labeled_particles(
shape: Sequence[int],
dtype: Optional[np.dtype] = None,
n: int = 144,
seed: Optional[int] = None,
return_density: Literal[True] = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]: ...
@lru_cache
def labeled_particles(
shape: Sequence[int],
dtype: Optional[np.dtype] = None,
n: int = 144,
seed: Optional[int] = None,
return_density: bool = False,
) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray, np.ndarray]]:
"""Generate labeled blobs of given shape and dtype.
Parameters
----------
shape : Sequence[int]
Shape of the resulting array.
dtype : Optional[np.dtype]
Dtype of the resulting array.
n : int
Number of blobs to generate.
seed : Optional[int]
Seed for the random number generator.
return_density : bool
Whether to return the density array and center coordinates.
"""
if dtype is None:
dtype = _smallest_dtype(n)
rng = np.random.default_rng(seed)
ndim = len(shape)
points = rng.integers(shape, size=(n, ndim))
# create values from 1 to max of number of points
values = np.linspace(1, n, n, dtype=dtype)
rng.shuffle(values)
# values = rng.integers(
# np.iinfo(dtype).min + 1, np.iinfo(dtype).max, size=n, dtype=dtype
# )
sigma = int(max(shape) / (4.0 * n ** (1 / ndim)))
ball = _generate_ball(sigma, ndim)
labels = _structure_at_coordinates(
shape,
points,
ball,
multipliers=values,
reduce_fn=_update_data_with_mask,
dtype=dtype,
)
if return_density:
dens = _generate_density(sigma * 2, ndim)
densities = _structure_at_coordinates(shape, points, dens, reduce_fn=np.maximum, dtype=np.float32)
return labels, densities, points, values
else: # noqa: RET505
return labels
def run_benchmark_from_module(module: ModuleType, klass_name: str, method_name: str):
klass = getattr(module, klass_name)
if getattr(klass, "params", None):
skip_if = getattr(klass, "skip_params", {})
if isinstance(klass.params[0], Sequence):
params = itertools.product(*klass.params)
else:
params = ((i,) for i in klass.params)
for param in params:
if param in skip_if:
continue
obj = klass()
try:
obj.setup(*param)
except NotImplementedError:
continue
getattr(obj, method_name)(*param)
getattr(obj, "teardown", lambda: None)()
else:
obj = klass()
try:
obj.setup()
except NotImplementedError:
return
getattr(obj, method_name)()
getattr(obj, "teardown", lambda: None)()
def run_benchmark():
import argparse
import inspect
parser = argparse.ArgumentParser(description="Run benchmark")
parser.add_argument("benchmark", type=str, help="Name of the benchmark to run", default="")
args = parser.parse_args()
benchmark_selection = args.benchmark.split(".")
# get module of parent frame
call_module = inspect.getmodule(inspect.currentframe().f_back)
run_benchmark_from_module(call_module, *benchmark_selection)
# TODO: merge functionality of this cluster_blobs with the one in SpatialData https://github.com/scverse/spatialdata/issues/796
@lru_cache
def cluster_blobs(
length=512,
n_cells=None,
region_key="region_key",
instance_key="instance_key",
image_name="blobs_image",
labels_name="blobs_labels",
points_name="blobs_points",
n_transcripts_per_cell=None,
table_name="table",
coordinate_system="global",
):
"""Faster `spatialdata.datasets.make_blobs` using napari.datasets code."""
if n_cells is None:
n_cells = length
# cells
labels, density, points, values = labeled_particles((length, length), return_density=True, n=n_cells)
im_el = Image2DModel.parse(
data=density[None, ...],
dims="cyx",
transformations={coordinate_system: Identity()},
)
label_el = sd.models.Labels2DModel.parse(labels, dims="yx", transformations={coordinate_system: Identity()})
points_cells_el = sd.models.PointsModel.parse(points, transformations={coordinate_system: Identity()})
# generate dummy table
adata = ad.AnnData(X=np.ones((length, 10)))
adata.obs[region_key] = pd.Categorical([labels_name] * len(adata))
# adata.obs_names = values.astype(np.uint64)
adata.obs[instance_key] = adata.obs_names.values
adata.obs.index = adata.obs.index.astype(str)
adata.obs.index.name = instance_key
# del adata.uns[TableModel.ATTRS_KEY]
table = TableModel.parse(
adata,
region=labels_name,
region_key=region_key,
instance_key=instance_key,
)
sdata = SpatialData(
images={
image_name: im_el,
},
labels={
labels_name: label_el,
},
points={points_name: points_cells_el},
tables={table_name: table},
)
if n_transcripts_per_cell:
# transcript points
# generate 100 transcripts per cell
rng = np.random.default_rng(None)
points_transcripts = rng.integers(length, size=(n_cells * n_transcripts_per_cell, 2))
points_transcripts_el = sd.models.PointsModel.parse(
points_transcripts, transformations={coordinate_system: Identity()}
)
sdata["transcripts_" + points_name] = points_transcripts_el
# if shapes_name:
# sdata[shapes_name] = sd.to_circles(sdata[labels_name])
# add_regionprop_features(sdata, labels_layer=labels_name, table_layer=table_name)
return sdata