Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions sima/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,84 @@ def _todict(self, savedir=None):
}


class _MIPSequence(_WrapperSequence):
"""Sequence for applying a NaN Max along one of the dimensions.

Parameters
----------
base : Sequence
axis : axis to perform nanmean along

"""

def __init__(self, base, axis=0):
super(_MIPSequence, self).__init__(base)
self._axis = axis
self._shape = self._base.shape[:axis+1] + (1,) + \
self._base.shape[axis+2:]

def _get_frame(self, t):
frame = self._base._get_frame(t)
return np.nanmax(frame, axis=self._axis, keepdims=True)

def __iter__(self):
for frame in self._base:
yield np.nanmax(frame, axis=self._axis, keepdims=True)

@property
def shape(self):
return self._shape

def __len__(self):
return len(self._base)

def _todict(self, savedir=None):
return {
'__class__': self.__class__,
'base': self._base._todict(savedir),
'axis': self._axis
}


class _NaNMeanSequence(_WrapperSequence):
"""Sequence for applying a NaN Mean along one of the dimensions.

Parameters
----------
base : Sequence
axis : axis to perform nanmean along

"""

def __init__(self, base, axis=0):
super(_NaNMeanSequence, self).__init__(base)
self._axis = axis
self._shape = self._base.shape[:axis+1] + (1,) + \
self._base.shape[axis+2:]

def _get_frame(self, t):
frame = self._base._get_frame(t)
return np.nanmean(frame, axis=self._axis, keepdims=True)

def __iter__(self):
for frame in self._base:
yield np.nanmean(frame, axis=self._axis, keepdims=True)

@property
def shape(self):
return self._shape

def __len__(self):
return len(self._base)

def _todict(self, savedir=None):
return {
'__class__': self.__class__,
'base': self._base._todict(savedir),
'axis': self._axis
}


class _MaskedSequence(_WrapperSequence):
"""Sequence for masking invalid data with NaN's.

Expand Down Expand Up @@ -1148,6 +1226,55 @@ def _todict(self, savedir=None):
}


class _SplicedSequence(_WrapperSequence):
"""Sequence for splicing together nonconsecutive frames

Parameters
----------
base : Sequence

times : list of frame indices of the base sequence

"""
def __init__(self, base, times):
super(_SplicedSequence, self).__init__(base)
self._base_len = len(base)
self._times = times

def __iter__(self):
try:
for t in self._times:
# Not sure if np.copy is needed here (see _IndexedSequence)
yield np.copy(self._base._get_frame(t))
except NotImplementedError:
if self._indices[0].step < 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure where the self._indices comes from.
What about if np.any(np.diff(self._times) <= 0): ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I *think this was copied in error over from indexed sequence. I can clean up the logic and then resubmit.

raise NotImplementedError(
'Iterating backwards not supported by the base class')
idx = 0
for t, frame in enumerate(self._base):
try:
whether_yield = t == self._times[idx]
except IndexError:
raise StopIteration
if whether_yield:
# Not sure if np.copy is needed here (see _IndexedSequence)
yield np.copy(frame)
idx += 1

def _get_frame(self, t):
return self._base._get_frame(self._times[t])

def __len__(self):
return len(self._times)

def _todict(self, savedir=None):
return {
'__class__': self.__class__,
'base': self._base._todict(savedir),
'times': self._times
}


class _IndexedSequence(_WrapperSequence):

def __init__(self, base, indices):
Expand Down
60 changes: 59 additions & 1 deletion sima/tests/test_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ def test_3d_mask(self):
assert_(np.all(np.isnan(masked)[self.masked_mask]))
assert_(np.all(np.isfinite(masked)[~self.masked_mask]))


def test_static_and_single_frame_mask(self):
frame_mask = np.random.random((128, 256))
frame_mask = frame_mask > 0.5
Expand Down Expand Up @@ -313,5 +312,64 @@ def test_all_mask_types(self):
assert_(np.all(np.isfinite(masked)[~self.masked_mask]))


class TestSplicedSequence(object):

def setup(self):
self.tiff_seq = sima.Sequence.create('TIFF', example_tiff(), 2, 2)
self.spliced_sequence = sima.sequence._SplicedSequence(
self.tiff_seq, [1, 3, 4])

def test_init(self):
assert_equal(len(self.spliced_sequence), 3)
assert_equal(
self.tiff_seq._get_frame(1), self.spliced_sequence._get_frame(0))
assert_equal(
self.tiff_seq._get_frame(4), self.spliced_sequence._get_frame(2))

def test_to_dict(self):
dict_ = self.spliced_sequence._todict()
loaded_sequence = dict_.pop('__class__')._from_dict(dict_)
assert_equal(
np.array(loaded_sequence), np.array(self.spliced_sequence))


class TestMIPSequence(object):

def setup(self):
self.tiff_seq = sima.Sequence.create('TIFF', example_tiff(), 2, 2)
self.mip_sequence = sima.sequence._MIPSequence(self.tiff_seq)

def test_init(self):
assert_equal(len(self.mip_sequence), len(self.tiff_seq))
assert_equal(self.mip_sequence.shape[1], 1)
assert_equal(
np.nanmax(self.tiff_seq._get_frame(1), 0, keepdims=True),
self.mip_sequence._get_frame(1))

def test_to_dict(self):
dict_ = self.mip_sequence._todict()
loaded_sequence = dict_.pop('__class__')._from_dict(dict_)
assert_equal(np.array(loaded_sequence), np.array(self.mip_sequence))


class TestNaNMeanSequence(object):

def setup(self):
self.tiff_seq = sima.Sequence.create('TIFF', example_tiff(), 2, 2)
self.mean_sequence = sima.sequence._NaNMeanSequence(self.tiff_seq)

def test_init(self):
assert_equal(len(self.mean_sequence), len(self.tiff_seq))
assert_equal(self.mean_sequence.shape[1], 1)
assert_equal(
np.nanmean(self.tiff_seq._get_frame(1), 0, keepdims=True),
self.mean_sequence._get_frame(1))

def test_to_dict(self):
dict_ = self.mean_sequence._todict()
loaded_sequence = dict_.pop('__class__')._from_dict(dict_)
assert_equal(np.array(loaded_sequence), np.array(self.mean_sequence))


if __name__ == "__main__":
run_module_suite()