-
Notifications
You must be signed in to change notification settings - Fork 68
Expand file tree
/
Copy pathlib.py
More file actions
92 lines (77 loc) · 2.54 KB
/
lib.py
File metadata and controls
92 lines (77 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import operator
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from itertools import product, repeat
import numpy as np
import pandas as pd
import toolz as tlz
def stats(data):
    """Return summary statistics for an array of measurements.

    Parameters
    ----------
    data
        Array of samples. It is used through ``.size``, ``.min()``,
        ``.max()`` and ``.mean()`` and is fitted against
        ``np.arange(data.size)``, so a 1-D NumPy array is expected.

    Returns
    -------
    dict
        ``rounds`` (number of samples), ``min``, ``max``, ``mean``,
        ``median``, ``q1``, ``q3``, ``iqr`` (``q3 - q1``), ``stddev`` and
        ``trend`` (slope of a least-squares straight line fitted through the
        samples in order, i.e. the change per round).
    """
    q1, median, q3 = np.quantile(data, q=[0.25, 0.5, 0.75])
    # np.polyfit returns coefficients with the highest power first, so for
    # deg=1 the result is (slope, intercept). The slope is the trend.
    trend, _ = np.polyfit(np.arange(data.size), data, deg=1)
    return {
        "rounds": data.size,
        "max": data.max(),
        "trend": trend,
        "mean": data.mean(),
        "iqr": q3 - q1,
        "min": data.min(),
        "median": median,
        "q1": q1,
        "q3": q3,
        "stddev": np.std(data),
    }
def slices_from_chunks(shape: tuple[int, ...], chunks: tuple[int, ...]):
    """Lazily enumerate the slices of every chunk of a regularly chunked array.

    Slightly modified from ``dask.array.core.slices_from_chunks`` to be lazy.

    Parameters
    ----------
    shape
        Extent of the array along each axis.
    chunks
        A single chunk size per axis. A trailing, smaller chunk is added on
        an axis whose extent is not a multiple of its chunk size.

    Returns
    -------
    itertools.product
        Iterator of tuples holding one ``slice`` per axis, one tuple per
        chunk.
    """

    def _axis_slices(sizes):
        # Start offsets are the running sum of all sizes but the last,
        # beginning at 0.
        offsets = tlz.accumulate(operator.add, sizes[:-1], 0)
        return (
            slice(lo, lo + width)
            for lo, width in zip(offsets, sizes, strict=True)
        )

    # Materialised as tuples because each axis' sizes are read twice below:
    # once (minus the last element) for the offsets and once for the widths.
    sizes_per_axis = tuple(
        tuple(
            tlz.concatv(
                repeat(size, extent // size),
                (extent % size,) if extent % size > 0 else (),
            )
        )
        for extent, size in zip(shape, chunks, strict=True)
    )
    return product(*map(_axis_slices, sizes_per_axis))
def normalize_chunks(
*, shape: tuple[int, ...], chunks: tuple[int, ...]
) -> tuple[int, ...]:
assert len(shape) == len(chunks)
chunks = tuple(s if c == -1 else c for s, c in zip(shape, chunks, strict=True))
return chunks
def get_task_chunk_shape(
*, task_nchunks: int, shape: tuple[int, ...], chunks: tuple[int, ...]
) -> tuple[int, ...]:
left = task_nchunks
task_chunk_shape = []
for s, c in zip(shape, chunks, strict=True):
if c == s or left is None:
task_chunk_shape.append(c)
else:
q, _ = divmod(s, c)
if q > left:
task_chunk_shape.append(left * c)
else:
task_chunk_shape.append(q * c)
left /= q
print(f"{task_chunk_shape=!r}")
return task_chunk_shape
@dataclass
class Timer:
    """Collect wall-clock timings of code sections together with metadata."""

    # One dict per completed timed section: the keyword arguments given to
    # ``time`` plus a "runtime" entry in seconds.
    diagnostics: list = field(default_factory=list)

    @contextmanager
    def time(self, **kwargs):
        """Time the body of a ``with`` block and record the result.

        The keyword arguments are stored alongside the measurement under the
        key ``"runtime"``. Nothing is recorded if the body raises.
        """
        started = time.perf_counter()
        yield
        elapsed = time.perf_counter() - started
        self.diagnostics.append({**kwargs, "runtime": elapsed})

    def as_dict(self) -> dict:
        """Map each record's ``"op"`` value to its ``"runtime"``.

        Later records overwrite earlier ones sharing the same ``"op"``; a
        record without an ``"op"`` key raises ``KeyError``.
        """
        return {entry["op"]: entry["runtime"] for entry in self.diagnostics}

    def dataframe(self):
        """Return all records as a ``pandas.DataFrame``, one row per record."""
        return pd.DataFrame(self.diagnostics)