diff --git a/.travis.yml b/.travis.yml index e857fccc..94e5048f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,6 +32,6 @@ install: script: - export SPARK_HOME=`pwd`/spark-1.6.0-bin-hadoop2.6 - export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH - - export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH + - export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH - py.test - py.test --engine=spark diff --git a/README.md b/README.md index ed818ee5..2d1bebfa 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,6 @@ [![Latest Version](https://img.shields.io/pypi/v/thunder-python.svg?style=flat-square)](https://pypi.python.org/pypi/thunder-python) [![Build Status](https://img.shields.io/travis/thunder-project/thunder/master.svg?style=flat-square)](https://travis-ci.org/thunder-project/thunder) [![Gitter](https://img.shields.io/gitter/room/thunder-project/thunder.svg?style=flat-square)](https://gitter.im/thunder-project/thunder) -[![Binder](https://img.shields.io/badge/launch-binder-red.svg?style=flat-square)](http://mybinder.org/repo/thunder-project/thunder-docs) - > scalable analysis of image and time series data in python @@ -85,6 +83,6 @@ Once you have a running cluster with a valid `SparkContext` — this is created Thunder is a community effort! The codebase so far is due to the excellent work of the following individuals: -> Andrew Osheroff, Ben Poole, Chris Stock, Davis Bennett, Jascha Swisher, Jason Wittenbach, Jeremy Freeman, Josh Rosen, Kunal Lillaney, Logan Grosenick, Matt Conlen, Michael Broxton, Noah Young, Ognen Duzlevski, Richard Hofer, Owen Kahn, Ted Fujimoto, Tom Sainsbury, Uri Laseron, W J Liddy +> Andrew Osheroff, Ben Poole, Chris Stock, Davis Bennett, Jascha Swisher, Jason Wittenbach, Jeremy Freeman, Josh Rosen, Kunal Lillaney, Logan Grosenick, Matt Conlen, Michael Broxton, Noah Young, Ognen Duzlevski, Richard Hofer, Owen Kahn, Ted Fujimoto, Tom Sainsbury, Uri Laseron If you run into a problem, have a feature request, or want to contribute, submit an issue or a pull request, or come talk to us in the [chatroom](https://gitter.im/thunder-project/thunder)!
diff --git a/requirements.txt b/requirements.txt index f2c25f15..910084f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,4 @@ numpy scipy scikit-image boto -bolt-python >= 0.7.0 +bolt-python >= 0.5.1 diff --git a/setup.py b/setup.py index b659ebf8..1a121a28 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup -version = '1.2.0' +version = '1.0.0' setup( name='thunder-python', diff --git a/test/test_base.py b/test/test_base.py index 850d6f4a..787d74c3 100644 --- a/test/test_base.py +++ b/test/test_base.py @@ -1,5 +1,5 @@ import pytest -from numpy import allclose, array, asarray, add, ndarray, generic +from numpy import allclose, array, asarray, add from thunder import series, images @@ -101,25 +101,4 @@ def test_map_with_keys(eng): data = images.fromlist([array([[1, 1], [1, 1]]), array([[2, 2], [2, 2]])], engine=eng) mapped = data.map(lambda kv: kv[0] + kv[1], with_keys=True) assert allclose(mapped.shape, [2, 2, 2]) - assert allclose(mapped.toarray(), [[[1, 1], [1, 1]], [[3, 3], [3, 3]]]) - - -def test_repartition(eng): - if eng is not None: - data = images.fromlist([array([1, 1]), array([2, 2]), array([3, 3]), array([4, 4]), - array([5, 5]), array([6, 6]), array([7, 7]), array([8, 8]), - array([9, 9]), array([10, 10]), array([11, 11]), array([12, 12])], - engine=eng, npartitions=10) - assert allclose(data.first(), array([1, 1])) - assert isinstance(data.first(), (ndarray, generic)) - data = data.repartition(3) - assert allclose(data.first(), array([1, 1])) - - data = series.fromlist([array([1, 1]), array([2, 2]), array([3, 3]), array([4, 4]), - array([5, 5]), array([6, 6]), array([7, 7]), array([8, 8]), - array([9, 9]), array([10, 10]), array([11, 11]), array([12, 12])], - engine=eng, npartitions=10) - assert allclose(data.first(), array([1, 1])) - data = data.repartition(3) - assert allclose(data.first(), array([1, 1])) - assert isinstance(data.first(), (ndarray, generic)) + assert allclose(mapped.toarray(), [[[1, 1], [1, 1]], [[3, 3], [3, 3]]]) \ No newline at end of file diff --git a/test/test_blocks.py b/test/test_blocks.py index 3504453b..4fa6cbe5 100644 --- a/test/test_blocks.py +++ b/test/test_blocks.py @@ -1,55 +1,33 @@ import pytest -from numpy import arange, array, allclose, ones, float64, asarray +from numpy import arange, array, allclose, ones from thunder.images.readers import fromlist pytestmark = pytest.mark.usefixtures("eng") - def test_conversion(eng): + if eng is None: + return a = arange(8).reshape((4, 2)) data = fromlist([a, a], engine=eng) - vals = data.toblocks((2, 2)).collect_blocks() + vals = data.toblocks((2, 2)).tordd().sortByKey().values().collect() truth = [array([a[0:2, 0:2], a[0:2, 0:2]]), array([a[2:4, 0:2], a[2:4, 0:2]])] assert allclose(vals, truth) def test_full(eng): + if eng is None: + return a = arange(8).reshape((4, 2)) data = fromlist([a, a], engine=eng) - vals = data.toblocks((4,2)).collect_blocks() + vals = data.toblocks((4, 2)).tordd().values().collect() truth = [a, a] assert allclose(vals, truth) -def test_blocksize(eng): - a = arange(100*100, dtype='int16').reshape((100, 100)) - data = fromlist(10*[a], engine=eng) - - blocks = data.toblocks((5, 5)) - assert blocks.blockshape == (10, 5, 5) - - blocks = data.toblocks('1') - assert blocks.blockshape == (10, 5, 100) - - -def test_padding(eng): - a = arange(30).reshape((5, 6)) - data = fromlist([a, a], engine=eng) - - blocks = data.toblocks((2, 3), padding=(1, 1)) - vals = blocks.collect_blocks() - shapes = list(map(lambda x: x.shape, vals)) - truth = [(2, 
3, 4), (2, 3, 4), (2, 4, 4), (2, 4, 4), (2, 2, 4), (2, 2, 4)] - assert allclose(array(shapes), array(truth)) - - truth = data.toarray() - assert allclose(data.toblocks((2, 3), padding=1).toarray(), truth) - assert allclose(data.toblocks((2, 3), padding=(0, 1)).toarray(), truth) - assert allclose(data.toblocks((2, 3), padding=(1, 1)).toarray(), truth) - - def test_count(eng): + if eng is None: + return a = arange(8).reshape((2, 4)) data = fromlist([a], engine=eng) assert data.toblocks((1, 1)).count() == 8 @@ -59,6 +37,8 @@ def test_count(eng): def test_conversion_series(eng): + if eng is None: + return a = arange(8).reshape((4, 2)) data = fromlist([a], engine=eng) vals = data.toblocks((1, 2)).toseries().toarray() @@ -66,6 +46,8 @@ def test_conversion_series(eng): def test_conversion_series_3d(eng): + if eng is None: + return a = arange(24).reshape((2, 3, 4)) data = fromlist([a], engine=eng) vals = data.toblocks((2, 3, 4)).toseries().toarray() @@ -73,6 +55,8 @@ def test_conversion_series_3d(eng): def test_roundtrip(eng): + if eng is None: + return a = arange(8).reshape((4, 2)) data = fromlist([a, a], engine=eng) vals = data.toblocks((2, 2)).toimages() @@ -80,6 +64,8 @@ def test_roundtrip(eng): def test_series_roundtrip_simple(eng): + if eng is None: + return a = arange(8).reshape((4, 2)) data = fromlist([a, a], engine=eng) vals = data.toseries().toimages() @@ -87,26 +73,19 @@ def test_series_roundtrip_simple(eng): def test_shape(eng): + if eng is None: + return data = fromlist([ones((30, 30)) for _ in range(0, 3)], engine=eng) blocks = data.toblocks((10, 10)) - values = blocks.collect_blocks() + values = [v for k, v in blocks.tordd().collect()] assert blocks.blockshape == (3, 10, 10) assert all([v.shape == (3, 10, 10) for v in values]) - -def test_map(eng): - a = arange(8).reshape((4, 2)) - data = fromlist([a, a], engine=eng) - map1 = data.toblocks((4, 2)).map(lambda x: 1.0 * x, dtype=float64).toimages() - map2 = data.toblocks((4, 2)).map(lambda x: 1.0 * x).toimages() - assert map1.dtype == float64 - assert map2.dtype == float64 - -def test_map_generic(eng): - a = arange(3*4).reshape((3, 4)) - data = fromlist([a, a], engine=eng) - b = asarray(data.toblocks((2, 2)).map_generic(lambda x: [0, 1])) - assert b.shape == (2, 2) - assert b.dtype == object - truth = [v == [0, 1] for v in b.flatten()] - assert all(truth) +def test_local_mode(eng): + a = arange(64).reshape((8, 8)) + data = fromlist([a, a]) + if data.mode == 'local': + blocks = data.toblocks((4, 4)) + assert allclose(blocks.values, data.values) + assert blocks.count() == 1 + assert blocks.blockshape == (2, 8, 8) diff --git a/test/test_images.py b/test/test_images.py index e702d3e1..eaea5f0b 100644 --- a/test/test_images.py +++ b/test/test_images.py @@ -1,5 +1,5 @@ import pytest -from numpy import arange, allclose, array, mean, apply_along_axis, float64 +from numpy import arange, allclose, array, mean, apply_along_axis from thunder.images.readers import fromlist, fromarray from thunder.images.images import Images @@ -12,8 +12,6 @@ def test_map(eng): data = fromlist([arange(6).reshape((2, 3))], engine=eng) assert allclose(data.map(lambda x: x + 1).toarray(), [[1, 2, 3], [4, 5, 6]]) - assert data.map(lambda x: 1.0*x, dtype=float64).dtype == float64 - assert data.map(lambda x: 1.0*x).dtype == float64 def test_map_singleton(eng): @@ -188,15 +186,16 @@ def test_map_as_series(eng): def f(x): return x - mean(x) result = apply_along_axis(f, 0, data.toarray()) - size = (2, 2) - assert allclose(data.map_as_series(f, chunk_size=size).toarray(), 
result) - assert allclose(data.map_as_series(f, chunk_size=size, value_size=5).toarray(), result) + assert allclose(data.map_as_series(f).toarray(), result) + assert allclose(data.map_as_series(f, value_size=5).toarray(), result) + assert allclose(data.map_as_series(f, block_size=(2, 2)).toarray(), result) # function does change size of series def f(x): return x[:-1] result = apply_along_axis(f, 0, data.toarray()) - assert allclose(data.map_as_series(f, chunk_size=size).toarray(), result) - assert allclose(data.map_as_series(f, chunk_size=size, value_size=4).toarray(), result) + assert allclose(data.map_as_series(f).toarray(), result) + assert allclose(data.map_as_series(f, value_size=4).toarray(), result) + assert allclose(data.map_as_series(f, block_size=(2, 2)).toarray(), result) \ No newline at end of file diff --git a/test/test_images_io.py b/test/test_images_io.py index d7c4ca47..815ef2c1 100644 --- a/test/test_images_io.py +++ b/test/test_images_io.py @@ -16,7 +16,7 @@ def test_from_list(eng): a = arange(8).reshape((2, 4)) data = fromlist([a], engine=eng) assert allclose(data.shape, (1,) + a.shape) - assert allclose(data.value_shape, a.shape) + assert allclose(data.dims, a.shape) assert allclose(data.toarray(), a) @@ -24,7 +24,7 @@ def test_from_array(eng): a = arange(8).reshape((1, 2, 4)) data = fromarray(a, engine=eng) assert allclose(data.shape, a.shape) - assert allclose(data.value_shape, a.shape[1:]) + assert allclose(data.dims, a.shape[1:]) assert allclose(data.toarray(), a) @@ -36,7 +36,7 @@ def test_from_array_bolt(eng): b = barray(a) data = fromarray(b) assert allclose(data.shape, a.shape) - assert allclose(data.value_shape, a.shape[1:]) + assert allclose(data.dims, a.shape[1:]) assert allclose(data.toarray(), a) @@ -44,7 +44,7 @@ def test_from_array_single(eng): a = arange(8).reshape((2, 4)) data = fromarray(a, engine=eng) assert allclose(data.shape, (1,) + a.shape) - assert allclose(data.value_shape, a.shape) + assert allclose(data.dims, a.shape) assert allclose(data.toarray(), a) @@ -114,16 +114,6 @@ def test_from_tif_multi_planes(eng): assert [x.sum() for x in data.toarray()] == [1140006, 1119161, 1098917] -def test_from_tif_multi_planes_discard_extra(eng): - path = os.path.join(resources, 'multilayer_tif', 'dotdotdot_lzw.tif') - data = fromtif(path, nplanes=2, engine=eng, discard_extra=True) - assert data.shape[0] == 1 - assert data.shape[1] == 2 - with pytest.raises(BaseException) as error_msg: - data = fromtif(path, nplanes=2, engine=eng, discard_extra=False) - assert 'nplanes' in str(error_msg.value) - - def test_from_tif_multi_planes_many(eng): path = os.path.join(resources, 'multilayer_tif', 'dotdotdot_lzw*.tif') data = fromtif(path, nplanes=3, engine=eng) diff --git a/test/test_series.py b/test/test_series.py index c197bc34..97914ef4 100644 --- a/test/test_series.py +++ b/test/test_series.py @@ -1,5 +1,5 @@ import pytest -from numpy import allclose, arange, array, asarray, dot, cov, corrcoef, float64 +from numpy import allclose, arange, array, asarray, dot, cov, corrcoef from thunder.series.readers import fromlist, fromarray from thunder.images.readers import fromlist as img_fromlist @@ -10,8 +10,6 @@ def test_map(eng): data = fromlist([array([1, 2]), array([3, 4])], engine=eng) assert allclose(data.map(lambda x: x + 1).toarray(), [[2, 3], [4, 5]]) - assert data.map(lambda x: 1.0*x, dtype=float64).dtype == float64 - assert data.map(lambda x: 1.0*x).dtype == float64 def test_map_singletons(eng): diff --git a/test/test_series_io.py b/test/test_series_io.py index 
f7191152..8926f9c6 100644 --- a/test/test_series_io.py +++ b/test/test_series_io.py @@ -115,7 +115,7 @@ def test_to_binary_roundtrip(tmpdir, eng): def test_to_binary_roundtrip_partitioned(tmpdir, eng): a = arange(8, dtype='int16').reshape((4, 2)) p = str(tmpdir) + '/data' - data = fromarray([a, a], npartitions=4, engine=eng) + data = fromarray([a, a], npartitions=2, engine=eng) data.tobinary(p) loaded = frombinary(p) assert allclose(data.toarray(), loaded.toarray()) @@ -137,4 +137,4 @@ def test_from_example(eng): data = fromexample('mouse', engine=eng) assert allclose(data.toarray().shape, (64, 64, 20)) data = fromexample('iris', engine=eng) - assert allclose(data.toarray().shape, (150, 4)) + assert allclose(data.toarray().shape, (150, 4)) \ No newline at end of file diff --git a/thunder/__init__.py b/thunder/__init__.py index 0d70f22a..6b95c7ff 100644 --- a/thunder/__init__.py +++ b/thunder/__init__.py @@ -12,4 +12,4 @@ def _setup(): _setup() -__version__ = '1.2.0' \ No newline at end of file +__version__ = '1.0.0' \ No newline at end of file diff --git a/thunder/base.py b/thunder/base.py index a6beb746..af6d3bfd 100644 --- a/thunder/base.py +++ b/thunder/base.py @@ -1,6 +1,6 @@ from numpy import array, asarray, ndarray, prod, ufunc, add, subtract, \ - multiply, divide, isscalar, newaxis, unravel_index, dtype -from bolt.utils import inshape, tupleize, slicify + multiply, divide, isscalar, newaxis, unravel_index, argsort +from bolt.utils import inshape, tupleize from bolt.base import BoltArray from bolt.spark.array import BoltArraySpark from bolt.spark.chunk import ChunkedArray @@ -76,7 +76,7 @@ def _constructor(self): @property def dtype(self): - return dtype(self._values.dtype) + return self._values.dtype @property def shape(self): @@ -175,7 +175,8 @@ def repartition(self, npartitions): Number of partitions after repartitions. """ if self.mode == 'spark': - return self._constructor(self.values.repartition(npartitions)).__finalize__(self) + self.values._rdd = self.values._rdd.repartition(npartitions) + return self else: notsupported(self.mode) @@ -194,11 +195,11 @@ class Data(Base): _attributes = Base._attributes + ['labels'] def __getitem__(self, item): - # handle values -- convert ints to slices so no dimensions are dropped + # handle values if isinstance(item, int): - item = tuple([slicify(item, self.shape[0])]) + item = slice(item, item+1, None) if isinstance(item, tuple): - item = tuple([slicify(i, n) if isinstance(i, int) else i for i, n in zip(item, self.shape[:len(item)])]) + item = tuple([slice(i, i+1, None) if isinstance(i, int) else i for i in item]) if isinstance(item, (list, ndarray)): item = (item,) new = self._values.__getitem__(item) @@ -230,10 +231,6 @@ def baseaxes(self): def baseshape(self): return self.shape[:len(self.baseaxes)] - @property - def value_shape(self): - return self.shape[len(self.baseaxes):] - @property def labels(self): return self._labels @@ -406,7 +403,7 @@ def filter(self, func): return self._constructor(filtered, labels=newlabels).__finalize__(self, noprop=('labels',)) - def map(self, func, value_shape=None, dtype=None, with_keys=False): + def _map(self, func, axis=(0,), value_shape=None, dtype=None, with_keys=False): """ Apply an array -> array function across an axis. 
@@ -433,8 +430,6 @@ def map(self, func, value_shape=None, dtype=None, with_keys=False): with_keys : bool, optional, default=False Include keys as an argument to the function """ - axis = self.baseaxes - if self.mode == 'local': axes = sorted(tupleize(axis)) key_shape = [self.shape[axis] for axis in axes] @@ -542,7 +537,7 @@ def func(record): return k1, op(x, y) rdd = self.tordd().zip(other.tordd()).map(func) - barray = BoltArraySpark(rdd, shape=self.shape, dtype=self.dtype, split=self.values.split) + barray = BoltArraySpark(rdd, shape=self.shape, dtype=self.dtype) return self._constructor(barray).__finalize__(self) def plus(self, other): diff --git a/thunder/blocks/__init__.py b/thunder/blocks/__init__.py index ced07427..1931e766 100644 --- a/thunder/blocks/__init__.py +++ b/thunder/blocks/__init__.py @@ -1,2 +1,2 @@ """Modules related to subdividing Images into smaller contiguous blocks -""" +""" \ No newline at end of file diff --git a/thunder/blocks/blocks.py b/thunder/blocks/blocks.py index 0c9f65aa..8e4abd08 100644 --- a/thunder/blocks/blocks.py +++ b/thunder/blocks/blocks.py @@ -1,5 +1,3 @@ -from numpy import prod, rollaxis - from ..base import Base import logging @@ -10,7 +8,7 @@ class Blocks(Base): Subclasses of Blocks will be returned by an images.toBlocks() call. """ - _metadata = Base._metadata + ['blockshape', 'padding'] + _metadata = Base._metadata + ['blockshape'] def __init__(self, values): super(Blocks, self).__init__(values) @@ -21,11 +19,11 @@ def _constructor(self): @property def blockshape(self): - return tuple(self.values.plan) + if self.mode == 'spark': + return tuple(self.values.plan) - @property - def padding(self): - return tuple(self.values.padding) + if self.mode == 'local': + return tuple(self.values.shape) def count(self): """ @@ -37,31 +35,23 @@ def count(self): return self.tordd().count() if self.mode == 'local': - return prod(self.values.values.shape) + return 1 - def collect_blocks(self): + def map(self, func, dims=None, dtype=None): """ - Collect the blocks in a list + Apply an array -> array function to each block """ if self.mode == 'spark': - return self.values.tordd().sortByKey().values().collect() + mapped = self.values.map(func, value_shape=dims, dtype=dtype) if self.mode == 'local': - return self.values.values.flatten().tolist() + if dims is not None: + logger = logging.getLogger('thunder') + logger.warn("dims has no meaning in Blocks.map in local mode") + mapped = func(self.values) - def map(self, func, value_shape=None, dtype=None): - """ - Apply an array -> array function to each block - """ - mapped = self.values.map(func, value_shape=value_shape, dtype=dtype) return self._constructor(mapped).__finalize__(self, noprop=('dtype',)) - def map_generic(self, func): - """ - Apply an arbitrary array -> object function to each blocks. - """ - return self.values.map_generic(func)[0] - def first(self): """ Return the first element. 
@@ -70,7 +60,7 @@ def first(self): return self.values.tordd().values().first() if self.mode == 'local': - return self.values.first + return self.values def toimages(self): """ @@ -82,7 +72,7 @@ def toimages(self): values = self.values.values_to_keys((0,)).unchunk() if self.mode == 'local': - values = self.values.unchunk() + values = self.values return Images(values) @@ -96,17 +86,7 @@ def toseries(self): values = self.values.values_to_keys(tuple(range(1, len(self.shape)))).unchunk() if self.mode == 'local': - values = self.values.unchunk() - values = rollaxis(values, 0, values.ndim) + n = len(self.shape) - 1 + values = self.values.transpose(tuple(range(1, n+1)) + (0,)) return Series(values) - - def toarray(self): - """ - Convert blocks to local ndarray - """ - if self.mode == 'spark': - return self.values.unchunk().toarray() - - if self.mode == 'local': - return self.values.unchunk() diff --git a/thunder/blocks/local.py b/thunder/blocks/local.py deleted file mode 100644 index 6d8f0d13..00000000 --- a/thunder/blocks/local.py +++ /dev/null @@ -1,259 +0,0 @@ -from numpy import arange, r_, empty, zeros, random, where, prod, array, asarray, floor -from itertools import product -from bolt.utils import allstack - -from ..base import Base -import logging - - -class LocalChunks: - """ - Simple helper class for a local chunked ndarray. - """ - - def __init__(self, values, shape, plan, dtype=None, padding=None): - """ - Create a chunked ndarray. - - Parameters - ---------- - values : ndarray (dtype 'object') or ndarrays - Array containing all the chunks. - - shape : tuple - Shape of the full unchunked array. - - plan : tuple - Shape of each chunk (excluding edge effects). - - dtype : numpy dtype, optional, default = None - dtype of chunks. If not given, will be inferred from first chunk. - - padding: tuple, optional, default = None - amount of padding in each dimension to include in each block - """ - self.values = values - self.shape = shape - self.plan = plan - self.dtype = dtype - self.padding = padding - - if self.dtype is None: - self.dtype = self.first.dtype - - if self.padding is None: - self.padding = len(self.shape)*(0,) - - @property - def first(self): - """ - First chunk - """ - return self.values[tuple(zeros(len(self.values.shape)))] - - def unchunk(self): - """ - Reconstitute the chunked array back into a full ndarray. 
- - Returns - ------- - ndarray - """ - if self.padding != len(self.shape)*(0,): - shape = self.values.shape - arr = empty(shape, dtype=object) - for inds in product(*[arange(s) for s in shape]): - slices = [] - for i, p, n in zip(inds, self.padding, shape): - start = None if (i == 0 or p == 0) else p - stop = None if (i == n-1 or p == 0) else -p - slices.append(slice(start, stop, None)) - arr[inds] = self.values[inds][tuple(slices)] - else: - arr = self.values - - return allstack(arr.tolist()) - - def map(self, func, value_shape=None, dtype=None): - - if value_shape is None or dtype is None: - # try to compute the size of each mapped element by applying func to a random array - try: - mapped = func(random.randn(*self.plan).astype(self.dtype)) - # if this fails, try to use the first block instead - except Exception: - mapped = func(self.first) - if value_shape is None: - value_shape = mapped.shape - if dtype is None: - dtype = mapped.dtype - - blocked_dims = where(array(self.plan) != array(self.shape))[0] - unblocked_dims = where(array(self.plan) == array(self.shape))[0] - - # check that no dimensions are dropped - if len(value_shape) != len(self.plan): - raise NotImplementedError('map on chunked array cannot drop dimensions') - - # check that chunked dimensions did not change shape - if any([value_shape[i] != self.plan[i] for i in blocked_dims]): - raise ValueError('map cannot change the sizes of chunked dimensions') - - newshape = [value_shape[i] if i in unblocked_dims else self.shape[i] for i in range(self.values.ndim)] - newshape = tuple(newshape) - - c = prod(self.values.shape) - mapped = empty(c, dtype=object) - for i, a in enumerate(self.values.flatten()): - mapped[i] = func(a) - return LocalChunks(mapped.reshape(self.values.shape), newshape, value_shape, dtype) - - def map_generic(self, func): - - shape = self.values.shape - n = prod(shape) - arr = empty(n, dtype=object) - for i, x in enumerate(self.values.flatten()): - arr[i] = func(x) - return arr.reshape(shape) - - @staticmethod - def chunk(arr, chunk_size="150", padding=None): - """ - Created a chunked array from a full array and a chunk size. - - Parameters - ---------- - array : ndarray - Array that will be broken into chunks - - chunk_size : string or tuple, default = '150' - Size of each image chunk. - If a str, size of memory footprint in KB. - If a tuple, then the dimensions of each chunk. - If an int, then all dimensions will use this number - - padding : tuple or int - Amount of padding along each dimensions for chunks. 
If an int, then - the same amount of padding is used for all dimensions - - Returns - ------- - LocalChunks - """ - - plan, _ = LocalChunks.getplan(chunk_size, arr.shape[1:], arr.dtype) - plan = r_[arr.shape[0], plan] - - if padding is None: - pad = arr.ndim*(0,) - elif isinstance(padding, int): - pad = (0,) + (arr.ndim-1)*(padding,) - else: - pad = (0,) + padding - - shape = arr.shape - - if any([x + y > z for x, y, z in zip(plan, pad, shape)]): - raise ValueError("Chunk sizes %s plus padding sizes %s cannot exceed value dimensions %s along any axis" - % (tuple(plan), tuple(pad), tuple(shape))) - - if any([x > y for x, y in zip(pad, plan)]): - raise ValueError("Padding sizes %s cannot exceed chunk sizes %s along any axis" - % (tuple(pad), tuple(plan))) - - def rectify(x): - x[x<0] = 0 - return x - - breaks = [r_[arange(0, n, s), n] for n, s in zip(shape, plan)] - limits = [zip(rectify(b[:-1]-p), b[1:]+p) for b, p in zip(breaks, pad)] - slices = product(*[[slice(x[0], x[1]) for x in l] for l in limits]) - vals = [arr[s] for s in slices] - newarr = empty(len(vals), dtype=object) - for i in range(len(vals)): - newarr[i] = vals[i] - newsize = [b.shape[0]-1 for b in breaks] - newarr = newarr.reshape(*newsize) - return LocalChunks(newarr, shape, plan, dtype=arr.dtype, padding=pad) - - @staticmethod - def getplan(size, shape, dtype, axes=None, padding=None): - """ - Identify a plan for chunking values along each dimension. - - Generates an ndarray with the size (in number of elements) of chunks - in each dimension. If provided, will estimate chunks for only a - subset of axes, leaving all others to the full size of the axis. - - Parameters - ---------- - size : string or tuple - If str, the average size (in KB) of the chunks in all value dimensions. - If int/tuple, an explicit specification of the number chunks in - each moving value dimension. - - axes : tuple, optional, default=None - One or more axes to estimate chunks for, if provided any - other axes will use one chunk. - - padding : tuple or int, option, default=None - Size over overlapping padding between chunks in each dimension. 
- If tuple, specifies padding along each chunked dimension; if int, - all dimensions use same padding; if None, no padding - """ - from numpy import dtype as gettype - - # initialize with all elements in one chunk - plan = asarray(shape) - - # check for subset of axes - if axes is None: - if isinstance(size, str): - axes = arange(len(shape)) - else: - axes = arange(len(size)) - else: - axes = asarray(axes, 'int') - - # set padding - pad = array(len(shape)*[0, ]) - if padding is not None: - pad[axes] = padding - - # set the plan - if isinstance(size, tuple): - plan[axes] = size - - elif isinstance(size, str): - # convert from kilobytes - size = 1000.0 * float(size) - - # calculate from dtype - elsize = gettype(dtype).itemsize - nelements = prod(shape) - dims = plan[axes] - - if size <= elsize: - s = ones(len(axes)) - - else: - remsize = 1.0 * nelements * elsize - s = [] - for (i, d) in enumerate(dims): - minsize = remsize/d - if minsize >= size: - s.append(1) - remsize = minsize - continue - else: - s.append(min(d, floor(size/minsize))) - s[i+1:] = plan[i+1:] - break - - plan[axes] = s - - else: - raise ValueError("Chunk size not understood, must be tuple or int") - - return plan, pad diff --git a/thunder/images/images.py b/thunder/images/images.py index 515e8610..b1879988 100644 --- a/thunder/images/images.py +++ b/thunder/images/images.py @@ -1,7 +1,6 @@ import logging from numpy import ndarray, arange, amax, amin, size, asarray, random, prod, \ apply_along_axis -from itertools import product from ..base import Data @@ -35,6 +34,10 @@ def baseaxes(self): def _constructor(self): return Images + @property + def dims(self): + return self.shape[1:] + def count(self): """ Count the number of images. @@ -55,40 +58,33 @@ def first(self): return self.values[0] if self.mode == 'spark': - return self.values.first().toarray() + return self.values.tordd().values().first() - def toblocks(self, chunk_size='auto', padding=None): + def toblocks(self, size='150'): """ Convert to blocks which represent subdivisions of the images data. Parameters ---------- - chunk_size : str or tuple, size of image chunk used during conversion, default = 'auto' - String interpreted as memory size (in kilobytes, e.g. '64'). - The exception is the string 'auto'. In spark mode, 'auto' will choose a chunk size to make the - resulting blocks ~100 MB in size. In local mode, 'auto' will create a single block. - Tuple of ints interpreted as 'pixels per dimension'. - - padding : tuple or int - Amount of padding along each dimensions for blocks. If an int, then - the same amount of padding is used for all dimensions + size : str, or tuple of block size per dimension, + String interpreted as memory size (in megabytes, e.g. '64'). + Tuple of ints interpreted as 'pixels per dimension'. + Only valid in spark mode. 
""" from thunder.blocks.blocks import Blocks - from thunder.blocks.local import LocalChunks if self.mode == 'spark': - if chunk_size is 'auto': - chunk_size = str(int(100000.0/self.shape[0])) - chunks = self.values.chunk(chunk_size, padding=padding).keys_to_values((0,)) + blocks = self.values.chunk(size).keys_to_values((0,)) if self.mode == 'local': - if chunk_size is 'auto': - chunk_size = self.shape[1:] - chunks = LocalChunks.chunk(self.values, chunk_size, padding=padding) + if size != '150': + logger = logging.getLogger('thunder') + logger.warn('size has no meaning in Images.toblocks in local mode') + blocks = self.values - return Blocks(chunks) + return Blocks(blocks) - def toseries(self, chunk_size='auto'): + def toseries(self, size='150'): """ Converts to series data. @@ -96,22 +92,16 @@ def toseries(self, chunk_size='auto'): Parameters ---------- - chunk_size : str or tuple, size of image chunk used during conversion, default = 'auto' - String interpreted as memory size (in kilobytes, e.g. '64'). - The exception is the string 'auto', which will choose a chunk size to make the - resulting blocks ~100 MB in size. Tuple of ints interpreted as 'pixels per dimension'. - Only valid in spark mode. + size : string memory size, optional, default = '150M' + String interpreted as memory size (e.g. '64M'). """ from thunder.series.series import Series - if chunk_size is 'auto': - chunk_size = str(int(100000.0/self.shape[0])) - n = len(self.shape) - 1 index = arange(self.shape[0]) if self.mode == 'spark': - return Series(self.values.swap((0,), tuple(range(n)), size=chunk_size), index=index) + return Series(self.values.swap((0,), tuple(range(n)), size=size), index=index) if self.mode == 'local': return Series(self.values.transpose(tuple(range(1, n+1)) + (0,)), index=index) @@ -181,6 +171,23 @@ def sample(self, nsamples=100, seed=None): return self._constructor(result) + def map(self, func, dims=None, with_keys=False): + """ + Map an array -> array function over each image. + + Parameters + ---------- + func : function + The function to apply in the map. + + dims : tuple, optional, default = None + If known, the dimensions of the data following function evaluation. + + with_keys : boolean, optional, default = False + If true, function should be of both tuple indices and values. + """ + return self._map(func, axis=0, value_shape=dims, with_keys=with_keys) + def reduce(self, func): """ Reduce a function over images. @@ -244,13 +251,13 @@ def max_projection(self, axis=2): axis : int, optional, default = 2 Which axis to compute projection along. """ - if axis >= size(self.value_shape): + if axis >= size(self.dims): raise Exception('Axis for projection (%s) exceeds ' - 'image dimensions (%s-%s)' % (axis, 0, size(self.value_shape)-1)) + 'image dimensions (%s-%s)' % (axis, 0, size(self.dims)-1)) - new_value_shape = list(self.value_shape) - del new_value_shape[axis] - return self.map(lambda x: amax(x, axis), value_shape=new_value_shape) + newdims = list(self.dims) + del newdims[axis] + return self.map(lambda x: amax(x, axis), dims=newdims) def max_min_projection(self, axis=2): """ @@ -263,13 +270,13 @@ def max_min_projection(self, axis=2): axis : int, optional, default = 2 Which axis to compute projection along. 
""" - if axis >= size(self.value_shape): + if axis >= size(self.dims): raise Exception('Axis for projection (%s) exceeds ' - 'image dimensions (%s-%s)' % (axis, 0, size(self.value_shape)-1)) + 'image dimensions (%s-%s)' % (axis, 0, size(self.dims)-1)) - new_value_shape = list(self.value_shape) - del new_value_shape[axis] - return self.map(lambda x: amax(x, axis) + amin(x, axis), value_shape=new_value_shape) + newdims = list(self.dims) + del newdims[axis] + return self.map(lambda x: amax(x, axis) + amin(x, axis), dims=newdims) def subsample(self, factor): """ @@ -278,12 +285,12 @@ def subsample(self, factor): Parameters ---------- factor : positive int or tuple of positive ints - Stride to use in subsampling. If a single int is passed, - each dimension of the image will be downsampled by this factor. + Stride to use in subsampling. If a single int is passed, + each dimension of the image will be downsampled by this factor. If a tuple is passed, each dimension will be downsampled by the given factor. """ - value_shape = self.value_shape - ndims = len(value_shape) + dims = self.dims + ndims = len(dims) if not hasattr(factor, '__len__'): factor = [factor] * ndims factor = [int(sf) for sf in factor] @@ -294,10 +301,10 @@ def subsample(self, factor): def roundup(a, b): return (a + b - 1) // b - slices = [slice(0, value_shape[i], factor[i]) for i in range(ndims)] - new_value_shape = tuple([roundup(value_shape[i], factor[i]) for i in range(ndims)]) + slices = [slice(0, dims[i], factor[i]) for i in range(ndims)] + newdims = tuple([roundup(dims[i], factor[i]) for i in range(ndims)]) - return self.map(lambda v: v[slices], value_shape=new_value_shape) + return self.map(lambda v: v[slices], dims=newdims) def gaussian_filter(self, sigma=2, order=0): """ @@ -308,17 +315,17 @@ def gaussian_filter(self, sigma=2, order=0): Parameters ---------- sigma : scalar or sequence of scalars, default = 2 - Size of the filter size as standard deviation in pixels. - A sequence is interpreted as the standard deviation for each axis. + Size of the filter size as standard deviation in pixels. + A sequence is interpreted as the standard deviation for each axis. A single scalar is applied equally to all axes. order : choice of 0 / 1 / 2 / 3 or sequence from same set, optional, default = 0 - Order of the gaussian kernel, 0 is a gaussian, + Order of the gaussian kernel, 0 is a gaussian, higher numbers correspond to derivatives of a gaussian. """ from scipy.ndimage.filters import gaussian_filter - return self.map(lambda v: gaussian_filter(v, sigma, order), value_shape=self.value_shape) + return self.map(lambda v: gaussian_filter(v, sigma, order), dims=self.dims) def uniform_filter(self, size=2): """ @@ -329,8 +336,8 @@ def uniform_filter(self, size=2): Parameters ---------- size: int, optional, default = 2 - Size of the filter neighbourhood in pixels. - A sequence is interpreted as the neighborhood size for each axis. + Size of the filter neighbourhood in pixels. + A sequence is interpreted as the neighborhood size for each axis. A single scalar is applied equally to all axes. """ return self._image_filter(filter='uniform', size=size) @@ -344,8 +351,8 @@ def median_filter(self, size=2): parameters ---------- size: int, optional, default = 2 - Size of the filter neighbourhood in pixels. - A sequence is interpreted as the neighborhood size for each axis. + Size of the filter neighbourhood in pixels. + A sequence is interpreted as the neighborhood size for each axis. A single scalar is applied equally to all axes. 
""" return self._image_filter(filter='median', size=size) @@ -373,8 +380,8 @@ def _image_filter(self, filter=None, size=2): func = FILTERS[filter] mode = self.mode - value_shape = self.value_shape - ndims = len(value_shape) + dims = self.dims + ndims = len(dims) if ndims == 3 and isscalar(size) == 1: size = [size, size, size] @@ -385,13 +392,13 @@ def filter_(im): im.setflags(write=True) else: im = im.copy() - for z in arange(0, value_shape[2]): + for z in arange(0, dims[2]): im[:, :, z] = func(im[:, :, z], size[0:2]) return im else: filter_ = lambda x: func(x, size) - return self.map(lambda v: filter_(v), value_shape=self.value_shape) + return self.map(lambda v: filter_(v), dims=self.dims) def localcorr(self, size=2): """ @@ -443,11 +450,11 @@ def subtract(self, val): Value to subtract. """ if isinstance(val, ndarray): - if val.shape != self.value_shape: + if val.shape != self.dims: raise Exception('Cannot subtract image with dimensions %s ' - 'from images with dimension %s' % (str(val.shape), str(self.value_shape))) + 'from images with dimension %s' % (str(val.shape), str(self.dims))) - return self.map(lambda x: x - val, value_shape=self.value_shape) + return self.map(lambda x: x - val, dims=self.dims) def topng(self, path, prefix='image', overwrite=False): """ @@ -513,7 +520,7 @@ def tobinary(self, path, prefix='image', overwrite=False): from thunder.images.writers import tobinary tobinary(self, path, prefix=prefix, overwrite=overwrite) - def map_as_series(self, func, value_size=None, dtype=None, chunk_size='auto'): + def map_as_series(self, func, value_size=None, block_size='150'): """ Efficiently apply a function to images as series data. @@ -533,17 +540,11 @@ def map_as_series(self, func, value_size=None, dtype=None, chunk_size='auto'): func. If not supplied, will be automatically inferred for an extra computational cost. - dtype : str, optional, default = None - dtype of one-dimensional ndarray resulting from application of func. - If not supplied it will be automatically inferred for an extra computational cost. - - chunk_size : str or tuple, size of image chunk used during conversion, default = 'auto' - String interpreted as memory size (in kilobytes, e.g. '64'). - The exception is the string 'auto'. In spark mode, 'auto' will choose a chunk size to make the - resulting blocks ~100 MB in size. In local mode, 'auto' will create a single block. - Tuple of ints interpreted as 'pixels per dimension'. + block_size : str, or tuple of block size per dimension, optional, default = '150' + String interpreted as memory size (in megabytes e.g. '64'). Tuple of + ints interpreted as 'pixels per dimension'. 
""" - blocks = self.toblocks(chunk_size=chunk_size) + blocks = self.toblocks(size=block_size) if value_size is not None: dims = list(blocks.blockshape) @@ -554,4 +555,4 @@ def map_as_series(self, func, value_size=None, dtype=None, chunk_size='auto'): def f(block): return apply_along_axis(func, 0, block) - return blocks.map(f, value_shape=dims, dtype=dtype).toimages() + return blocks.map(f, dims=dims).toimages() diff --git a/thunder/images/readers.py b/thunder/images/readers.py old mode 100644 new mode 100755 index d2987459..e4e19354 --- a/thunder/images/readers.py +++ b/thunder/images/readers.py @@ -1,5 +1,4 @@ import itertools -import logging from io import BytesIO from numpy import frombuffer, prod, random, asarray, expand_dims @@ -7,7 +6,7 @@ spark = check_spark() -def fromrdd(rdd, dims=None, nrecords=None, dtype=None, labels=None, ordered=False): +def fromrdd(rdd, dims=None, nrecords=None, dtype=None, labels=None): """ Load images from a Spark RDD. @@ -31,9 +30,6 @@ def fromrdd(rdd, dims=None, nrecords=None, dtype=None, labels=None, ordered=Fals labels : array, optional, default = None Labels for records. If provided, should be one-dimensional. - - ordered : boolean, optional, default = False - Whether or not the rdd is ordered by key """ from .images import Images from bolt.spark.array import BoltArraySpark @@ -46,13 +42,7 @@ def fromrdd(rdd, dims=None, nrecords=None, dtype=None, labels=None, ordered=Fals if nrecords is None: nrecords = rdd.count() - def process_keys(record): - k, v = record - if isinstance(k, int): - k = (k,) - return k, v - - values = BoltArraySpark(rdd.map(process_keys), shape=(nrecords,) + tuple(dims), dtype=dtype, split=1, ordered=ordered) + values = BoltArraySpark(rdd, shape=(nrecords,) + tuple(dims), dtype=dtype, split=1) return Images(values, labels=labels) def fromarray(values, labels=None, npartitions=None, engine=None): @@ -110,7 +100,6 @@ def fromarray(values, labels=None, npartitions=None, engine=None): if not npartitions: npartitions = engine.defaultParallelism values = bolt.array(values, context=engine, npartitions=npartitions, axis=(0,)) - values._ordered = True return Images(values) return Images(values, labels=labels) @@ -149,7 +138,7 @@ def fromlist(items, accessor=None, keys=None, dims=None, dtype=None, labels=None rdd = engine.parallelize(items, npartitions) if accessor: rdd = rdd.mapValues(accessor) - return fromrdd(rdd, nrecords=nrecords, dims=dims, dtype=dtype, labels=labels, ordered=True) + return fromrdd(rdd, nrecords=nrecords, dims=dims, dtype=dtype, labels=labels) else: if accessor: @@ -211,7 +200,7 @@ def switch(record): data = data.values().zipWithIndex().map(switch) else: nrecords = reader.nfiles - return fromrdd(data, nrecords=nrecords, dims=dims, dtype=dtype, labels=labels, ordered=True) + return fromrdd(data, nrecords=nrecords, dims=dims, dtype=dtype, labels=labels) else: if accessor: @@ -288,8 +277,8 @@ def frombinary(path, shape=None, dtype=None, ext='bin', start=None, stop=None, r raise ValueError("Last dimension '%d' must be divisible by nplanes '%d'" % (shape[-1], nplanes)) - def getarray(idx_buffer_filename): - idx, buf, _ = idx_buffer_filename + def getarray(idxAndBuf): + idx, buf = idxAndBuf ary = frombuffer(buf, dtype=dtype, count=int(prod(shape))).reshape(shape, order=order) if nplanes is None: yield (idx,), ary @@ -299,17 +288,17 @@ def getarray(idx_buffer_filename): if shape[-1] % nplanes: npoints += 1 timepoint = 0 - last_plane = 0 - current_plane = 1 - while current_plane < ary.shape[-1]: - if current_plane % nplanes == 
0: - slices = [slice(None)] * (ary.ndim - 1) + [slice(last_plane, current_plane)] + lastPlane = 0 + curPlane = 1 + while curPlane < ary.shape[-1]: + if curPlane % nplanes == 0: + slices = [slice(None)] * (ary.ndim - 1) + [slice(lastPlane, curPlane)] yield idx*npoints + timepoint, ary[slices].squeeze() timepoint += 1 - last_plane = current_plane - current_plane += 1 + lastPlane = curPlane + curPlane += 1 # yield remaining planes - slices = [slice(None)] * (ary.ndim - 1) + [slice(last_plane, ary.shape[-1])] + slices = [slice(None)] * (ary.ndim - 1) + [slice(lastPlane, ary.shape[-1])] yield (idx*npoints + timepoint,), ary[slices].squeeze() recount = False if nplanes is None else True @@ -320,7 +309,7 @@ def getarray(idx_buffer_filename): dims=newdims, dtype=dtype, labels=labels, recount=recount, engine=engine, credentials=credentials) -def fromtif(path, ext='tif', start=None, stop=None, recursive=False, nplanes=None, npartitions=None, labels=None, engine=None, credentials=None, discard_extra=False): +def fromtif(path, ext='tif', start=None, stop=None, recursive=False, nplanes=None, npartitions=None, labels=None, engine=None, credentials=None): """ Loads images from single or multi-page TIF files. @@ -343,6 +332,7 @@ def fromtif(path, ext='tif', start=None, stop=None, recursive=False, nplanes=Non nplanes : positive integer, optional, default = None If passed, will cause single files to be subdivided into nplanes separate images. + If nplanes does not evenly divide the page count, the remaining pages will be discarded. Otherwise, each file is taken to represent one image. npartitions : int, optional, default = None @@ -351,31 +341,20 @@ def fromtif(path, ext='tif', start=None, stop=None, recursive=False, nplanes=Non labels : array, optional, default = None Labels for records. If provided, should be one-dimensional. - - discard_extra : boolean, optional, default = False - If True and nplanes doesn't divide by the number of pages in a multi-page tiff, the reminder will - be discarded and a warning will be shown. 
If False, it will raise an error """ import skimage.external.tifffile as tifffile if nplanes is not None and nplanes <= 0: raise ValueError('nplanes must be positive if passed, got %d' % nplanes) - def getarray(idx_buffer_filename): - idx, buf, fname = idx_buffer_filename + def getarray(idxAndBuf): + idx, buf = idxAndBuf fbuf = BytesIO(buf) tfh = tifffile.TiffFile(fbuf) ary = tfh.asarray() pageCount = ary.shape[0] if nplanes is not None: + pageCount = pageCount - pageCount % nplanes - extra = pageCount % nplanes - if extra: - if discard_extra: - pageCount = pageCount - extra - logging.getLogger('thunder').warn('Ignored %d pages in file %s' % (extra, fname)) - else: - raise ValueError("nplanes '%d' does not evenly divide '%d in file %s'" % (nplanes, pageCount, - fname)) values = [ary[i:(i+nplanes)] for i in range(0, pageCount, nplanes)] else: values = [ary] @@ -423,8 +402,8 @@ def frompng(path, ext='png', start=None, stop=None, recursive=False, npartitions """ from scipy.misc import imread - def getarray(idx_buffer_filename): - idx, buf, _ = idx_buffer_filename + def getarray(idxAndBuf): + idx, buf = idxAndBuf fbuf = BytesIO(buf) yield (idx,), imread(fbuf) diff --git a/thunder/images/writers.py b/thunder/images/writers.py index 54636a17..6b54e748 100644 --- a/thunder/images/writers.py +++ b/thunder/images/writers.py @@ -9,10 +9,10 @@ def topng(images, path, prefix="image", overwrite=False, credentials=None): -------- thunder.data.images.topng """ - value_shape = images.value_shape - if not len(value_shape) in [2, 3]: + dims = images.dims + if not len(dims) in [2, 3]: raise ValueError("Only 2D or 3D images can be exported to png, " - "images are %d-dimensional." % len(value_shape)) + "images are %d-dimensional." % len(dims)) from scipy.misc import imsave from io import BytesIO @@ -36,10 +36,10 @@ def totif(images, path, prefix="image", overwrite=False, credentials=None): -------- thunder.data.images.totif """ - value_shape = images.value_shape - if not len(value_shape) in [2, 3]: + dims = images.dims + if not len(dims) in [2, 3]: raise ValueError("Only 2D or 3D images can be exported to tif, " - "images are %d-dimensional."
% len(dims)) from scipy.misc import imsave from io import BytesIO @@ -72,7 +72,7 @@ def tobuffer(kv): writer = get_parallel_writer(path)(path, overwrite=overwrite, credentials=credentials) images.foreach(lambda x: writer.write(tobuffer(x))) - config(path, list(images.value_shape), images.dtype, overwrite=overwrite) + config(path, list(images.dims), images.dtype, overwrite=overwrite) def config(path, shape, dtype, name="conf.json", overwrite=True, credentials=None): """ diff --git a/thunder/readers.py b/thunder/readers.py index b1e0f33a..552f008e 100644 --- a/thunder/readers.py +++ b/thunder/readers.py @@ -149,9 +149,9 @@ def read(self, path, ext=None, start=None, stop=None, recursive=False, npartitio if spark and isinstance(self.engine, spark): npartitions = min(npartitions, nfiles) if npartitions else nfiles rdd = self.engine.parallelize(enumerate(files), npartitions) - return rdd.map(lambda kv: (kv[0], readlocal(kv[1]), kv[1])) + return rdd.map(lambda kv: (kv[0], readlocal(kv[1]))) else: - return [(k, readlocal(v), v) for k, v in enumerate(files)] + return [(k, readlocal(v)) for k, v in enumerate(files)] class LocalFileReader(object): @@ -388,10 +388,10 @@ def getsplit(kvIter): raise NotImplementedError("No file reader implementation for URL scheme " + scheme) for kv in kvIter: - idx, keyname = kv - key = bucket.get_key(keyname) + idx, keyName = kv + key = bucket.get_key(keyName) buf = key.get_contents_as_string() - yield idx, buf, keyname + yield idx, buf npartitions = min(npartitions, self.nfiles) if npartitions else self.nfiles rdd = self.engine.parallelize(enumerate(keylist), npartitions) @@ -412,7 +412,7 @@ def getsplit(kv): idx, keyName = kv key = bucket.get_key(keyName) buf = key.get_contents_as_string() - return idx, buf, keyName + return idx, buf return [getsplit(kv) for kv in enumerate(keylist)] diff --git a/thunder/series/readers.py b/thunder/series/readers.py index 354b5614..35017cb9 100644 --- a/thunder/series/readers.py +++ b/thunder/series/readers.py @@ -10,7 +10,7 @@ spark = check_spark() -def fromrdd(rdd, nrecords=None, shape=None, index=None, labels=None, dtype=None, ordered=False): +def fromrdd(rdd, nrecords=None, shape=None, index=None, labels=None, dtype=None): """ Load series data from a Spark RDD. 
@@ -37,9 +37,6 @@ def fromrdd(rdd, nrecords=None, shape=None, index=None, labels=None, dtype=None, dtype : string, default = None Data numerical type (if provided will avoid check) - - ordered : boolean, optional, default = False - Whether or not the rdd is ordered by key """ from .series import Series from bolt.spark.array import BoltArraySpark @@ -53,22 +50,13 @@ def fromrdd(rdd, nrecords=None, shape=None, index=None, labels=None, dtype=None, if dtype is None: dtype = item.dtype - if nrecords is None and shape is not None: - nrecords = prod(shape[:-1]) - - if nrecords is None: + if shape is None or nrecords is None: nrecords = rdd.count() if shape is None: shape = (nrecords, asarray(index).shape[0]) - def process_keys(record): - k, v = record - if isinstance(k, int): - k = (k,) - return k, v - - values = BoltArraySpark(rdd.map(process_keys), shape=shape, dtype=dtype, split=len(shape)-1, ordered=ordered) + values = BoltArraySpark(rdd, shape=shape, dtype=dtype, split=len(shape)-1) return Series(values, index=index, labels=labels) def fromarray(values, index=None, labels=None, npartitions=None, engine=None): @@ -118,7 +106,6 @@ def fromarray(values, index=None, labels=None, npartitions=None, engine=None): if spark and isinstance(engine, spark): axis = tuple(range(values.ndim - 1)) values = bolt.array(values, context=engine, npartitions=npartitions, axis=axis) - values._ordered = True return Series(values, index=index) return Series(values, index=index, labels=labels) @@ -165,7 +152,7 @@ def fromlist(items, accessor=None, index=None, labels=None, dtype=None, npartiti rdd = engine.parallelize(items, npartitions) if accessor: rdd = rdd.mapValues(accessor) - return fromrdd(rdd, nrecords=nrecords, index=index, labels=labels, dtype=dtype, ordered=True) + return fromrdd(rdd, nrecords=nrecords, index=index, labels=labels, dtype=dtype) else: if accessor: @@ -231,7 +218,7 @@ def switch(record): return (idx,), ary rdd = data.zipWithIndex().map(switch) - return fromrdd(rdd, dtype=str(dtype), shape=shape, index=index, ordered=True) + return fromrdd(rdd, dtype=str(dtype), shape=shape, index=index) else: reader = get_parallel_reader(path)(engine, credentials=credentials) @@ -315,7 +302,7 @@ def switch(record): if not index: index = arange(shape[-1]) - return fromrdd(rdd, dtype=dtype, shape=shape, index=index, ordered=True) + return fromrdd(rdd, dtype=dtype, shape=shape, index=index) else: reader = get_parallel_reader(path)(engine, credentials=credentials) diff --git a/thunder/series/series.py b/thunder/series/series.py index 409828dc..15933697 100755 --- a/thunder/series/series.py +++ b/thunder/series/series.py @@ -113,7 +113,7 @@ def first(self): return self.values[tuple(zeros(len(self.baseaxes))) + (slice(None, None),)] if self.mode == 'spark': - return self.values.first().toarray() + return self.values.tordd().values().first() def tolocal(self): """ @@ -140,7 +140,7 @@ def tospark(self, engine=None): if engine is None: raise ValueError('Must provide SparkContext') - return fromarray(self.toarray(), index=self.index, labels=self.labels, engine=engine) + return fromarray(self.toarray(), index=self.index, labels=self.labels, engine=engine) def sample(self, n=100, seed=None): """ @@ -170,7 +170,7 @@ def sample(self, n=100, seed=None): return self._constructor(result, index=self.index) - def map(self, func, index=None, value_shape=None, dtype=None, with_keys=False): + def map(self, func, index=None, with_keys=False): """ Map an array -> array function over each record. 
@@ -182,32 +182,12 @@ def map(self, func, index=None, value_shape=None, dtype=None, with_keys=False): index : array-like, optional, default = None If known, the index to be used following function evaluation. - value_shape : int, optional, default=None - Known shape of values resulting from operation. Only - valid in spark mode. - - dtype : numpy.dtype, optional, default = None - If known, the type of the data following function evaluation. - with_keys : boolean, optional, default = False If true, function should be of both tuple indices and series values. """ - # if new index is given, can infer missing value_shape - if value_shape is None and index is not None: - value_shape = len(index) - - if isinstance(value_shape, int): - values_shape = (value_shape, ) - new = super(Series, self).map(func, value_shape=value_shape, dtype=dtype, with_keys=with_keys) - - if index is not None: - new.index = index - # if series shape did not change and no index was supplied, propagate original index - else: - if len(new.index) == len(self.index): - new.index = self.index - - return new + value_shape = len(index) if index is not None else None + new = self._map(func, axis=self.baseaxes, value_shape=value_shape, with_keys=with_keys) + return self._constructor(new.values, index=index, labels=self.labels) def reduce(self, func): """ diff --git a/thunder/series/writers.py b/thunder/series/writers.py index d21d0eb4..22935e20 100644 --- a/thunder/series/writers.py +++ b/thunder/series/writers.py @@ -84,4 +84,4 @@ def getlabel(key): """ Get a file label from keys with reversed order """ - return '-'.join(["%05g" % k for k in key]) + return '-'.join(reversed(["%05g" % k for k in key])) \ No newline at end of file
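As a quick illustration of the restored API exercised by the updated tests above (the `size`, `block_size`, and `dims` keywords in place of 1.2.0's `chunk_size`, `padding`, and `value_shape`), here is a minimal usage sketch. It assumes a local session with no Spark engine passed, and the five-image dataset is invented for the example.

```python
from numpy import arange, allclose, mean, apply_along_axis
from thunder.images.readers import fromlist

# five 2x3 images; omitting `engine` keeps the data in local mode
data = fromlist([arange(6).reshape((2, 3)) + i for i in range(5)])

def f(x):
    # receives one per-pixel time series (a 1D array of length 5)
    return x - mean(x)

expected = apply_along_axis(f, 0, data.toarray())

# block_size is a memory string (in MB) or a tuple of pixels per dimension;
# in local mode the blocking is a no-op and the whole array is one block
assert allclose(data.map_as_series(f, block_size=(2, 2)).toarray(), expected)
assert allclose(data.map_as_series(f, value_size=5).toarray(), expected)
```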