forked from Ekin-Kahraman/scanpy
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_utils.py
More file actions
274 lines (225 loc) · 9.22 KB
/
test_utils.py
File metadata and controls
274 lines (225 loc) · 9.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
from __future__ import annotations
import itertools
import string
from contextlib import suppress
from operator import mul, truediv
from types import ModuleType
from typing import TYPE_CHECKING
import numba
import numpy as np
import pytest
from anndata.tests.helpers import asarray
from scipy import sparse
from scanpy._compat import CSBase, DaskArray
from scanpy._utils import (
_numba_thread_limit,
axis_mul_or_truediv,
check_nonnegative_integers,
descend_classes_and_funcs,
)
from scanpy._utils.random import _LegacyRng, ith_k_tuple, random_k_tuples, random_str
from testing.scanpy._pytest.params import (
ARRAY_TYPES,
ARRAY_TYPES_DASK,
ARRAY_TYPES_SPARSE,
)
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
def test_descend_classes_and_funcs():
# create module hierarchy
a = ModuleType("a")
a.b = ModuleType("a.b")
# populate with classes
a.A = type("A", (), {})
a.A.__module__ = a.__name__
a.b.B = type("B", (), {})
a.b.B.__module__ = a.b.__name__
# create a loop to check if that gets caught
a.b.a = a
assert {a.A, a.b.B} == set(descend_classes_and_funcs(a, "a"))
def test_axis_mul_or_truediv_badop():
dividend = np.array([[0, 1.0, 1.0], [1.0, 0, 1.0]])
divisor = np.array([0.1, 0.2])
with pytest.raises(ValueError, match=r"not one of truediv or mul"):
axis_mul_or_truediv(dividend, divisor, op=np.add, axis=0)
def test_axis_mul_or_truediv_bad_out():
dividend = sparse.csr_matrix(np.array([[0, 1.0, 1.0], [1.0, 0, 1.0]])) # noqa: TID251
divisor = np.array([0.1, 0.2])
with pytest.raises(ValueError, match="`out` argument provided but not equal to X"):
axis_mul_or_truediv(dividend, divisor, op=truediv, out=dividend.copy(), axis=0)
@pytest.mark.parametrize("array_type", ARRAY_TYPES)
@pytest.mark.parametrize("op", [truediv, mul])
def test_scale_row(array_type, op):
dividend = array_type(asarray([[0, 1.0, 1.0], [1.0, 0, 1.0]]))
divisor = np.array([0.1, 0.2])
if op is mul:
divisor = 1 / divisor
expd = np.array([[0, 10.0, 10.0], [5.0, 0, 5.0]])
out = dividend if isinstance(dividend, CSBase | np.ndarray) else None
res = asarray(axis_mul_or_truediv(dividend, divisor, op=op, axis=0, out=out))
np.testing.assert_array_equal(res, expd)
@pytest.mark.parametrize("array_type", ARRAY_TYPES)
@pytest.mark.parametrize("op", [truediv, mul])
def test_scale_column(array_type, op):
dividend = array_type(asarray([[0, 1.0, 2.0], [3.0, 0, 4.0]]))
divisor = np.array([0.1, 0.2, 0.5])
if op is mul:
divisor = 1 / divisor
expd = np.array([[0, 5.0, 4.0], [30.0, 0, 8.0]])
out = dividend if isinstance(dividend, CSBase | np.ndarray) else None
res = asarray(axis_mul_or_truediv(dividend, divisor, op=op, axis=1, out=out))
np.testing.assert_array_equal(res, expd)
@pytest.mark.filterwarnings("ignore:divide by zero encountered:RuntimeWarning")
@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_divide_by_zero(array_type):
dividend = array_type(asarray([[0, 1.0, 2.0], [3.0, 0, 4.0]]))
divisor = np.array([0.1, 0.2, 0.0])
expd = np.array([[0, 5.0, 2.0], [30.0, 0, 4.0]])
res = asarray(
axis_mul_or_truediv(
dividend, divisor, op=truediv, axis=1, allow_divide_by_zero=False
)
)
np.testing.assert_array_equal(res, expd)
res = asarray(
axis_mul_or_truediv(
dividend, divisor, op=truediv, axis=1, allow_divide_by_zero=True
)
)
expd = np.array([[0, 5.0, np.inf], [30.0, 0, np.inf]])
np.testing.assert_array_equal(res, expd)
@pytest.mark.parametrize("array_type", ARRAY_TYPES_SPARSE)
def test_scale_out_with_dask_or_sparse_raises(array_type: Callable):
dividend = array_type(asarray([[0, 1.0, 2.0], [3.0, 0, 4.0]]))
divisor = np.array([0.1, 0.2, 0.5])
if isinstance(dividend, DaskArray):
with pytest.raises(
TypeError if "dask" in array_type.__name__ else ValueError,
match="`out`",
):
axis_mul_or_truediv(dividend, divisor, op=truediv, axis=1, out=dividend)
@pytest.mark.parametrize("array_type", ARRAY_TYPES_DASK)
@pytest.mark.parametrize("axis", [0, 1])
@pytest.mark.parametrize("op", [truediv, mul])
def test_scale_rechunk(array_type, axis, op):
import dask.array as da
dividend = array_type(
asarray([[0, 1.0, 2.0], [3.0, 0, 4.0], [3.0, 0, 4.0]])
).rechunk(((3,), (3,)))
divisor = da.from_array(np.array([0.1, 0.2, 0.5]), chunks=(1,))
if op is mul:
divisor = 1 / divisor
if axis == 1:
expd = np.array([[0, 5.0, 4.0], [30.0, 0, 8.0], [30.0, 0, 8.0]])
else:
expd = np.array([[0, 10.0, 20.0], [15.0, 0, 20.0], [6.0, 0, 8.0]])
out = dividend if isinstance(dividend, CSBase | np.ndarray) else None
with pytest.warns(UserWarning, match="Rechunking scaling_array*"):
res = asarray(axis_mul_or_truediv(dividend, divisor, op=op, axis=axis, out=out))
np.testing.assert_array_equal(res, expd)
@pytest.mark.parametrize("array_type", ARRAY_TYPES)
@pytest.mark.parametrize(
("array_value", "expected"),
[
pytest.param(
np.random.poisson(size=(100, 100)).astype(np.float64),
True,
id="poisson-float64",
),
pytest.param(
np.random.poisson(size=(100, 100)).astype(np.uint32),
True,
id="poisson-uint32",
),
pytest.param(np.random.normal(size=(100, 100)), False, id="normal"),
pytest.param(np.array([[0, 0, 0], [0, -1, 0], [0, 0, 0]]), False, id="middle"),
],
)
def test_check_nonnegative_integers(array_type, array_value, expected):
x = array_type(array_value)
received = check_nonnegative_integers(x)
if isinstance(x, DaskArray):
assert isinstance(received, DaskArray)
# compute
received = received.compute()
assert not isinstance(received, DaskArray)
if isinstance(received, np.bool):
# convert to python bool
received = received.item()
assert received is expected
@pytest.mark.parametrize("seed", [0, 1, 1256712675])
@pytest.mark.parametrize("pass_seed", [True, False], ids=["pass_seed", "set_seed"])
@pytest.mark.parametrize("func", ["choice"])
def test_legacy_numpy_gen(*, seed: int, pass_seed: bool, func: str):
np.random.seed(seed)
state_before = np.random.get_state(legacy=False)
arrs: dict[bool, np.ndarray] = {}
states_after: dict[bool, dict[str, Any]] = {}
for direct in [True, False]:
if not pass_seed:
np.random.seed(seed)
arrs[direct] = _mk_random(func, direct=direct, seed=seed if pass_seed else None)
states_after[direct] = np.random.get_state(legacy=False)
np.testing.assert_array_equal(arrs[True], arrs[False])
np.testing.assert_equal(
*states_after.values(), err_msg="both should affect global state the same"
)
# they should affect the global state
with pytest.raises(AssertionError):
np.testing.assert_equal(states_after[True], state_before)
def _mk_random(func: str, *, direct: bool, seed: int | None) -> np.ndarray:
if direct and seed is not None:
np.random.seed(seed)
gen = np.random if direct else _LegacyRng.wrap_global(seed)
match func:
case "choice":
arr = np.arange(1000)
return gen.choice(arr, size=(100, 100))
case _:
pytest.fail(f"Unknown {func=}")
def test_ith_k_tuple() -> None:
"""Test that the k-tuples appear in the expected order."""
np.testing.assert_equal(
ith_k_tuple(np.arange(2**3), n=2, k=3),
list(itertools.product(range(2), repeat=3)),
)
def test_random_k_tuples() -> None:
"""Test that random k-tuples are unique."""
tups = random_k_tuples(n=26, k=6, size=10_000)
assert tups.shape == (10_000, 6)
assert tups.dtype == np.int64
unique = np.unique(tups, axis=0)
assert len(unique) == len(tups)
def test_random_str_0d() -> None:
string = random_str(length=3, alphabet="01")
assert string.shape == ()
assert string.dtype == np.dtype("U3")
assert str(string) in {"000", "001", "010", "011", "100", "101", "110", "111"}
def test_random_str() -> None:
strings = random_str(size=26**2, length=2, alphabet=string.ascii_lowercase)
assert strings.shape == (26**2,)
assert strings.dtype == np.dtype("U2")
unique = np.unique(strings, axis=0)
assert len(unique) == len(strings)
@pytest.mark.parametrize("success", [True, False], ids=["success", "exception"])
def test_numba_thread_limit_restores_previous_value(
*, monkeypatch: pytest.MonkeyPatch, success: bool
) -> None:
was_set_to = []
monkeypatch.setattr(numba, "get_num_threads", lambda: 8)
monkeypatch.setattr(numba, "set_num_threads", was_set_to.append)
with suppress(RuntimeError), _numba_thread_limit(2):
if not success:
raise RuntimeError
assert was_set_to == [2, 8]
def test_numba_thread_limit_clamps_to_configured_maximum(
monkeypatch: pytest.MonkeyPatch,
) -> None:
was_set_to = []
monkeypatch.setattr(numba, "get_num_threads", lambda: 3)
monkeypatch.setattr(numba, "set_num_threads", was_set_to.append)
monkeypatch.setattr(numba.config, "NUMBA_NUM_THREADS", 4)
with _numba_thread_limit(99):
pass
assert was_set_to == [4, 3]