1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -85,6 +85,7 @@ jobs:
           && conda create --yes -n test python==${{ matrix.python }} \
           && conda activate test \
           && conda install --yes --file packaging/conda_build_requirements.txt
+          if test ${{ matrix.python }} = "3.9"; then conda install libxcrypt; fi
 
       - name: Install
         run: |
2 changes: 1 addition & 1 deletion docs/docs/install.md
@@ -41,7 +41,7 @@ create an environment called "flacarray". First create the env with all
 dependencies and activate it (FIXME, add a requirements file for dev):
 
     conda create -n flacarray \
-        c_compiler numpy libflac cython meson-python pkgconfig
+        c-compiler numpy libflac cython meson-python pkgconfig
 
     conda activate flacarray
 
155 changes: 104 additions & 51 deletions src/flacarray/array.py
@@ -71,6 +71,7 @@ def __init__(
         shape=None,
         global_shape=None,
         compressed=None,
+        dtype=None,
         stream_starts=None,
         stream_nbytes=None,
         stream_offsets=None,
@@ -84,6 +85,7 @@
             self._shape = copy.deepcopy(other._shape)
             self._global_shape = copy.deepcopy(other._global_shape)
             self._compressed = copy.deepcopy(other._compressed)
+            self._dtype = np.dtype(other._dtype)
             self._stream_starts = copy.deepcopy(other._stream_starts)
             self._stream_nbytes = copy.deepcopy(other._stream_nbytes)
             self._stream_offsets = copy.deepcopy(other._stream_offsets)
@@ -97,6 +99,7 @@
             self._shape = shape
             self._global_shape = global_shape
             self._compressed = compressed
+            self._dtype = np.dtype(dtype)
             self._stream_starts = stream_starts
             self._stream_nbytes = stream_nbytes
             self._stream_offsets = stream_offsets
@@ -120,19 +123,23 @@ def _init_params(self):
         else:
             self._global_leading_shape = self._global_shape[:-1]
             self._global_nstreams = np.prod(self._global_leading_shape)
-        # For reference, record the type of the original data.
-        if self._stream_offsets is not None:
-            if self._stream_gains is not None:
-                # This is floating point data
-                if self._stream_gains.dtype == np.dtype(np.float64):
-                    self._typestr = "float64"
-                else:
-                    self._typestr = "float32"
-            else:
-                # This is int64 data
-                self._typestr = "int64"
-        else:
-            self._typestr = "int32"
+        # For reference, record the type string of the original data.
+        self._typestr = self._dtype_str(self._dtype)
+
+    @staticmethod
+    def _dtype_str(dt):
+        if dt == np.dtype(np.float64):
+            return "float64"
+        elif dt == np.dtype(np.float32):
+            return "float32"
+        elif dt == np.dtype(np.int64):
+            return "int64"
+        elif dt == np.dtype(np.int32):
+            return "int32"
+        else:
+            msg = f"Unsupported dtype '{dt}'"
+            raise RuntimeError(msg)
+        return None
 
     # Shapes of decompressed array
 
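For context, the dtype bookkeeping added above reduces to a simple lookup. A minimal standalone sketch (the `dtype_str` name is hypothetical; only the four dtypes handled by `_dtype_str` are supported):

    import numpy as np

    # Hypothetical standalone version of FlacArray._dtype_str above:
    # map a numpy dtype to its type string, raising on anything else.
    _SUPPORTED = {
        np.dtype(np.float64): "float64",
        np.dtype(np.float32): "float32",
        np.dtype(np.int64): "int64",
        np.dtype(np.int32): "int32",
    }

    def dtype_str(dt):
        try:
            return _SUPPORTED[np.dtype(dt)]
        except KeyError:
            raise RuntimeError(f"Unsupported dtype '{np.dtype(dt)}'")

    print(dtype_str(np.float32))  # -> "float32"
    print(dtype_str("int64"))     # -> "int64"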
@@ -233,63 +240,109 @@ def mpi_dist(self):
         """The range of the leading dimension assigned to each MPI process."""
         return self._mpi_dist
 
+    @property
+    def dtype(self):
+        """The dtype of the uncompressed array."""
+        return self._dtype
+
     def _keep_view(self, key):
         if len(key) != len(self._leading_shape):
             raise ValueError("view size does not match leading dimensions")
         view = np.zeros(self._leading_shape, dtype=bool)
         view[key] = True
         return view
 
-    def __getitem__(self, key):
+    def _slice_nelem(self, slc, dim):
+        start, stop, step = slc.indices(dim)
+        nslc = (stop - start) // step
+        if nslc < 0:
+            nslc = 0
+        return nslc
+
+    def __getitem__(self, raw_key):
         """Decompress a slice of data on the fly."""
         first = None
         last = None
-        keep = None
-        if isinstance(key, tuple):
-            # We are slicing on multiple dimensions
-            if len(key) == len(self._shape):
-                # Slicing on the sample dimension too
-                keep = self._keep_view(key[:-1])
-                samp_key = key[-1]
-                if isinstance(samp_key, slice):
+        ndim = len(self._shape)
+        output_shape = list()
+        sample_shape = (self._shape[-1],)
+        if isinstance(raw_key, tuple):
+            key = raw_key
+        else:
+            key = (raw_key,)
+        keep_slice = list()
+        for axis, axkey in enumerate(key):
+            if axis < ndim - 1:
+                # One of the leading dimensions
+                keep_slice.append(axkey)
+                if not isinstance(axkey, (int, np.integer)):
+                    # Some kind of slice, do not compress this dimension. Compute
+                    # the number of elements in the output shape.
+                    nslc = self._slice_nelem(axkey, self._shape[axis])
+                    output_shape.append(nslc)
+            else:
+                # This is the sample axis. Special handling to ensure that the
+                # selected samples are contiguous.
+                if isinstance(axkey, slice):
                     # A slice
-                    if samp_key.step is not None and samp_key.step != 1:
-                        raise ValueError("Only stride==1 supported on stream slices")
-                    first = samp_key.start
-                    last = samp_key.stop
-                elif isinstance(samp_key, (int, np.integer)):
+                    if (axkey.step is not None and axkey.step != 1):
+                        msg = "Only stride==1 supported on stream slices"
+                        raise ValueError(msg)
+                    if (
+                        axkey.start is not None
+                        and axkey.stop is not None
+                        and axkey.stop < axkey.start
+                    ):
+                        msg = "Only increasing slices supported on streams"
+                        raise ValueError(msg)
+                    first = axkey.start
+                    last = axkey.stop
+                    if first is None or first < 0:
+                        first = 0
+                    if first > self._shape[-1] - 1:
+                        first = self._shape[-1] - 1
+                    if last is None or last > self._shape[-1]:
+                        last = self._shape[-1]
+                    if last < 1:
+                        last = 1
+                    sample_shape = (last - first,)
+                elif isinstance(axkey, (int, np.integer)):
                     # Just a scalar
-                    first = samp_key
-                    last = samp_key + 1
+                    first = axkey
+                    last = axkey + 1
+                    sample_shape = ()
                 else:
                     raise ValueError(
                         "Only contiguous slices supported on the stream dimension"
                     )
-            else:
-                # Only slicing the leading dimensions
-                vw = list(key)
-                vw.extend(
-                    [slice(None) for x in range(len(self._leading_shape) - len(key))]
-                )
-                keep = self._keep_view(tuple(vw))
-        else:
-            # We are slicing / indexing only the leading dimension
-            vw = [slice(None) for x in range(len(self._leading_shape))]
-            vw[0] = key
-            keep = self._keep_view(tuple(vw))
 
-        arr, _ = array_decompress_slice(
-            self._compressed,
-            self._stream_size,
-            self._stream_starts,
-            self._stream_nbytes,
-            stream_offsets=self._stream_offsets,
-            stream_gains=self._stream_gains,
-            keep=keep,
-            first_stream_sample=first,
-            last_stream_sample=last,
-        )
-        return arr
+        keep_slice.extend(
+            [slice(None) for x in range(len(self._leading_shape) - len(key))]
+        )
+        output_shape.extend(
+            [x for x in self._leading_shape[len(key):]]
+        )
+        keep = self._keep_view(tuple(keep_slice))
+        output_shape = tuple(output_shape)
+        full_shape = output_shape + sample_shape
+        n_total = np.prod(full_shape)
+        if n_total == 0:
+            # At least one dimension was zero, return empty array
+            return np.zeros(full_shape, dtype=self._dtype)
+        else:
+            # We have some samples
+            arr, strm_indices = array_decompress_slice(
+                self._compressed,
+                self._stream_size,
+                self._stream_starts,
+                self._stream_nbytes,
+                stream_offsets=self._stream_offsets,
+                stream_gains=self._stream_gains,
+                keep=keep,
+                first_stream_sample=first,
+                last_stream_sample=last,
+            )
+            return arr.reshape(full_shape)
 
     def __delitem__(self, key):
         raise RuntimeError("Cannot delete individual streams")
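To make the new slicing semantics concrete, a hedged usage sketch (`from_array` is taken from the tests below; the top-level import path is an assumption):

    import numpy as np
    from flacarray import FlacArray  # import path is an assumption

    data = np.arange(4 * 3 * 100, dtype=np.int32).reshape((4, 3, 100))
    farr = FlacArray.from_array(data)

    # Leading dimensions follow numpy shape semantics: integers collapse
    # an axis, slices keep it.
    assert farr[1, 2].shape == (100,)
    assert farr[0:2, 1, 10:20].shape == (2, 10)

    # The stream (last) axis only accepts contiguous, increasing,
    # stride-1 slices; anything else raises ValueError.
    try:
        farr[0, 0, ::2]
    except ValueError:
        pass

    # An empty selection skips decompression entirely and returns an
    # empty array of the right shape.
    assert farr[0:0].size == 0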
60 changes: 55 additions & 5 deletions src/flacarray/tests/array.py
@@ -35,7 +35,9 @@ def test_helpers(self):
             .astype(np.int32)
         )
 
-        comp_i32, starts_i32, nbytes_i32, off_i32, gain_i32 = array_compress(data_i32, level=5)
+        comp_i32, starts_i32, nbytes_i32, off_i32, gain_i32 = array_compress(
+            data_i32, level=5
+        )
         self.assertTrue(off_i32 is None)
         self.assertTrue(gain_i32 is None)
 
@@ -59,7 +61,9 @@ def test_helpers(self):
             low=-(2**30), high=2**29, size=flatsize, dtype=np.int64
         ).reshape(data_shape)
 
-        comp_i64, starts_i64, nbytes_i64, off_i64, gain_i64 = array_compress(data_i64, level=5)
+        comp_i64, starts_i64, nbytes_i64, off_i64, gain_i64 = array_compress(
+            data_i64, level=5
+        )
         self.assertTrue(gain_i64 is None)
 
         check_i64 = array_decompress(
@@ -85,7 +89,9 @@ def test_helpers(self):
         ).reshape(data_shape)
 
         try:
-            comp_i64, starts_i64, nbytes_i64, off_i64, gain_i64 = array_compress(data_i64, level=5)
+            comp_i64, starts_i64, nbytes_i64, off_i64, gain_i64 = array_compress(
+                data_i64, level=5
+            )
             print("Failed to catch truncation of int64 data")
             self.assertTrue(False)
         except RuntimeError:
@@ -94,7 +100,9 @@
         # float32 data
 
         data_f32 = create_fake_data(data_shape, 1.0).astype(np.float32)
-        comp_f32, starts_f32, nbytes_f32, off_f32, gain_f32 = array_compress(data_f32, level=5)
+        comp_f32, starts_f32, nbytes_f32, off_f32, gain_f32 = array_compress(
+            data_f32, level=5
+        )
         check_f32 = array_decompress(
             comp_f32,
             data_shape[-1],
@@ -123,7 +131,9 @@ def test_helpers(self):
 
         data_f64 = create_fake_data(data_shape, 1.0)
 
-        comp_f64, starts_f64, nbytes_f64, off_f64, gain_f64 = array_compress(data_f64, level=5)
+        comp_f64, starts_f64, nbytes_f64, off_f64, gain_f64 = array_compress(
+            data_f64, level=5
+        )
         check_f64 = array_decompress(
             comp_f64,
             data_shape[-1],
@@ -163,3 +173,43 @@ def test_array_memory(self):
         self.assertTrue(
             np.allclose(check_slc_f64, data_f64[:, :, first:last], rtol=1e-5, atol=1e-5)
         )
+
+    def test_slicing_shape(self):
+        data_shape = (4, 3, 10, 100)
+        flatsize = np.prod(data_shape)
+        rng = np.random.default_rng()
+        data_i32 = (
+            rng.integers(low=-(2**27), high=2**30, size=flatsize, dtype=np.int32)
+            .reshape(data_shape)
+            .astype(np.int32)
+        )
+
+        farray = FlacArray.from_array(data_i32)
+
+        # Try some slices and verify expected result shape.
+        for dslc in [
+            (slice(0)),
+            (slice(1, 3)),
+            (slice(3, 1)),
+            (slice(3, 1, -1)),
+            (1, 2, 5, 50),
+            (1, 2, 5),
+            (2, slice(0, 1, 1), slice(0, 1, 1), slice(None)),
+            (1, slice(1, 3, 1), slice(6, 8, 1), 50),
+            (slice(1, 3, 1), 2, slice(6, 8, 1), slice(60, 80, 1)),
+            (2, 1, slice(2, 8, 2), slice(80, 120, 1)),
+            (2, 1, slice(2, 8, 2), slice(80, None)),
+            (2, 1, slice(2, 8, 2), slice(None, 10)),
+        ]:
+            # Slice of the original numpy array
+            check = data_i32[dslc]
+            # Slice of the FlacArray
+            fcheck = farray[dslc]
+
+            # Compare the shapes
+            if fcheck.shape != check.shape:
+                print(
+                    f"Array[{dslc}] shape: {fcheck.shape} != {check.shape}",
+                    flush=True,
+                )
+                raise RuntimeError("Failed slice shape check")