Skip to content

Commit 077f8c1

Browse files
matthieu-bernardberfalkephi
authored
Dask support (#4)
* feat: Add support for dask arrays Serialization wil be done by block (only split the first dim by 1st dim chunk size) * fix: Yield each elements od dods_encode use to yield the generator itself... * Merge branch 'origin/dask-support' into local branch 'dask-suppoert' * fix: allow for np.int64 data * fix: added missing requirements * test: added Python 3.9. to both tox.ini and travis config * doc: Adapted README to reflect dask.Array support Co-authored-by: ber <[email protected]> Co-authored-by: Philipp Falke <[email protected]> Co-authored-by: Philipp Falke <[email protected]>
1 parent 4f88cca commit 077f8c1

File tree

6 files changed

+89
-37
lines changed

6 files changed

+89
-37
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ python:
33
- '3.6'
44
- '3.7'
55
- '3.8'
6+
- '3.9'
67
install:
78
- python setup.py install
89
- pip install codecov

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ A pure Python implementation of the OPeNDAP server protocol.
1212
This module allows you to serve arbitrary data structures through the web
1313
framework of your choice as OPeNDAP data objects. It implements just the bare
1414
minimum of the DAP 2.0 protocol: DDS, DAS, and DODS responses and slicing. Array
15-
data needs to be supplied as `numpy.ndarray`.
15+
data needs to be supplied as `numpy.ndarray` or as `dask.array.Array`.

opendap_protocol/protocol.py

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
2626
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28-
2928
"""
3029
A pure Python implementation of the OPeNDAP server protocol.
3130
@@ -43,6 +42,7 @@
4342

4443
import re
4544
import numpy as np
45+
import dask.array as da
4646

4747
INDENT = ' '
4848
SLICE_CONSTRAINT_RE = r'\[([\d,\W]+)\]$'
@@ -55,7 +55,6 @@ class DAPError(Exception):
5555
class DAPObject(object):
5656
"""A generic DAP object class.
5757
"""
58-
5958
def __init__(self, name='', parent=None, *args, **kwargs):
6059
try:
6160
self.name = '_'.join(name.split(' '))
@@ -132,12 +131,12 @@ def data_path(self):
132131
return '.'.join([self.parent.data_path, self.name])
133132

134133
def ddshead(self):
135-
return '{indent}{obj} {{\n'.format(
136-
indent=self.indent, obj=self.__class__.__name__)
134+
return '{indent}{obj} {{\n'.format(indent=self.indent,
135+
obj=self.__class__.__name__)
137136

138137
def ddstail(self):
139-
return '{indent}}} {name};\n'.format(
140-
indent=self.indent, name=self.name)
138+
return '{indent}}} {name};\n'.format(indent=self.indent,
139+
name=self.name)
141140

142141
def dashead(self):
143142
name = self.name
@@ -211,12 +210,13 @@ def das(self, constraint=''):
211210

212211
def dds(self, constraint=''):
213212
if meets_constraint(constraint, self.data_path):
214-
yield '{indent}{dtype} {name};\n'.format(
215-
indent=self.indent, dtype=self.__str__(), name=self.name)
213+
yield '{indent}{dtype} {name};\n'.format(indent=self.indent,
214+
dtype=self.__str__(),
215+
name=self.name)
216216

217217
def dods_data(self, constraint=''):
218218
if meets_constraint(constraint, self.data_path):
219-
yield dods_encode(self._val, self)
219+
yield from dods_encode(self._val, self)
220220

221221

222222
class Byte(DAPAtom):
@@ -226,32 +226,32 @@ class Byte(DAPAtom):
226226

227227
class Int16(DAPAtom):
228228
dtype = np.int16
229-
str = '<i4'
229+
str = '>i4'
230230

231231

232232
class UInt16(DAPAtom):
233233
dtype = np.uint16
234-
str = '<u4'
234+
str = '>u4'
235235

236236

237237
class Int32(DAPAtom):
238238
dtype = np.int32
239-
str = '<i4'
239+
str = '>i4'
240240

241241

242242
class UInt32(DAPAtom):
243243
dtype = np.uint32
244-
str = '<u4'
244+
str = '>u4'
245245

246246

247247
class Float32(DAPAtom):
248248
dtype = np.float32
249-
str = '<f4'
249+
str = '>f4'
250250

251251

252252
class Float64(DAPAtom):
253253
dtype = np.float64
254-
str = '<f8'
254+
str = '>f8'
255255

256256

257257
class String(DAPAtom):
@@ -260,7 +260,7 @@ class String(DAPAtom):
260260

261261
def dods_data(self, constraint=''):
262262
if meets_constraint(constraint, self.data_path):
263-
yield dods_encode(self._val.encode('ascii'), self)
263+
yield from dods_encode(self._val.encode('ascii'), self)
264264

265265

266266
class URL(String):
@@ -277,7 +277,6 @@ class Structure(DAPObject):
277277
class Dataset(Structure):
278278
"""Class representing a DAP dataset.
279279
"""
280-
281280
def dods_data(self, constraint=''):
282281

283282
yield b'Data:\r\n'
@@ -342,7 +341,6 @@ def dods_data(self, constraint=''):
342341
class SequenceInstance(DAPObject):
343342
"""Class representing a data item that will be added to a sequence.
344343
"""
345-
346344
@property
347345
def data_path(self):
348346
return self.parent.data_path
@@ -372,7 +370,6 @@ class DAPDataObject(DAPObject):
372370
"""A generic class for typed non-atomic objects holding actual data (i.e.
373371
Array and Grid).
374372
"""
375-
376373
def _parse_args(self, args, kwargs):
377374

378375
self.data = kwargs.get('data', None)
@@ -394,11 +391,11 @@ def dods_data(self, constraint=''):
394391

395392
if meets_constraint(constraint, self.data_path):
396393
slices = parse_slice_constraint(constraint)
397-
yield dods_encode(self.data[slices], self.dtype)
394+
yield from dods_encode(self.data[slices], self.dtype)
398395
if self.dimensions is not None:
399396
for i, dim in enumerate(self.dimensions):
400397
sl = slices[i] if i < len(slices) else ...
401-
yield dods_encode(dim.data[sl], dim.dtype)
398+
yield from dods_encode(dim.data[sl], dim.dtype)
402399

403400

404401
class Grid(DAPDataObject):
@@ -482,9 +479,25 @@ def dods_encode(data, dtype):
482479
if not is_scalar:
483480
packed_length = length.astype('<i4').byteswap().tobytes() * 2
484481

485-
packed_data = data.astype(dtype.str).byteswap().tobytes()
486482

487-
return packed_length + packed_data
483+
yield packed_length
484+
485+
486+
if isinstance(data, da.Array):
487+
for x in range(0, data.shape[0], data.chunks[0][0]):
488+
yield np.array(data[x:x + data.chunks[0][0],
489+
...]).astype(dtype.str).tobytes()
490+
else:
491+
yield data.astype(dtype.str).tobytes()
492+
#yield data.astype(dtype.str).byteswap().tobytes()
493+
494+
######
495+
#import pudb; pudb.set_trace()
496+
#if isinstance(data, dask.array.Array):
497+
# return packed_length + data.map_blocks(np.ndarray.tobytes, dtype=dtype.str)
498+
#else:
499+
# packed_data = data.astype(dtype.str).tobytes()
500+
# return packed_length + packed_data
488501

489502

490503
def parse_slice_constraint(constraint):

setup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
requirements = [
3535
'numpy',
36+
'dask',
37+
'toolz',
3638
]
3739
setup_requirements = [
3840
'setuptools_scm',

tests/test_all.py

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@
2626
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828

29+
from opendap_protocol.protocol import dods_encode
2930
import opendap_protocol as dap
3031

3132
import xdrlib
3233
import numpy as np
34+
import dask.array as da
35+
from functools import reduce
3336

3437
XDRPACKER = xdrlib.Packer()
3538

@@ -40,13 +43,37 @@ def test_dods_encode():
4043

4144
xdrpacked = pack_xdr_float(testdata)
4245

43-
assert xdrpacked == dap.dods_encode(testdata, dap.Float32)
46+
assert xdrpacked == b''.join(dap.dods_encode(testdata, dap.Float32))
4447

45-
assert b'\x00\x00\x00\x00' == dap.dods_encode(0, dap.Float32)
48+
assert b'\x00\x00\x00\x00' == b''.join(dap.dods_encode(0, dap.Float32))
4649

4750
arrdata = np.asarray([1, 2, 3])
48-
assert dap.dods_encode(arrdata,
49-
dap.Float64) == pack_xdr_double_array(arrdata)
51+
assert b''.join(dap.dods_encode(
52+
arrdata, dap.Float64)) == pack_xdr_double_array(arrdata)
53+
54+
# test dask vs numpy
55+
x_dim = 28
56+
y_dim = 30
57+
time_dim = 8
58+
vertical_dim = 1
59+
real_dim = 21
60+
ref_time_dim = 3
61+
62+
np_data = np.arange(
63+
0, x_dim * y_dim * time_dim * vertical_dim * real_dim *
64+
ref_time_dim).reshape(
65+
(x_dim, y_dim, time_dim, vertical_dim, real_dim, ref_time_dim))
66+
67+
data_vals = da.from_array(np_data,
68+
chunks=(14, y_dim, 1, vertical_dim, 1, 1))
69+
70+
x = dap.dods_encode(data_vals, dap.Int32)
71+
y = dap.dods_encode(np_data, dap.Int32)
72+
assert b''.join(x) == b''.join(y)
73+
74+
int_arrdata = np.arange(0, 20, 2, dtype='<i4')
75+
assert b''.join(dods_encode(int_arrdata,
76+
dap.Int32)) == pack_xdr_int_array(int_arrdata)
5077

5178

5279
def test_parse_slice():
@@ -58,7 +85,8 @@ def test_parse_slice():
5885

5986
def test_parse_slice_constraint():
6087

61-
assert dap.parse_slice_constraint('[0][:][:][4:7]') == (0, Ellipsis, Ellipsis,
88+
assert dap.parse_slice_constraint('[0][:][:][4:7]') == (0, Ellipsis,
89+
Ellipsis,
6290
slice(4, 8))
6391
assert dap.parse_slice_constraint('[0][:][:]') == (0, Ellipsis, Ellipsis)
6492
assert dap.parse_slice_constraint('[0][:]') == (0, Ellipsis)
@@ -111,8 +139,9 @@ def test_Attribute():
111139
dataset = dap.Dataset(name='test')
112140

113141
attr1 = dap.Attribute(name='Attribute 1', value=3, dtype=dap.Float32)
114-
attr2 = dap.Attribute(
115-
name='Attribute 2', value='a string', dtype=dap.String)
142+
attr2 = dap.Attribute(name='Attribute 2',
143+
value='a string',
144+
dtype=dap.String)
116145

117146
dataset.append(attr1, attr2)
118147

@@ -160,11 +189,10 @@ def test_complete_dap_response():
160189
x = dap.Array(name='x', data=np.array([0, 1]), dtype=dap.Int16)
161190
y = dap.Array(name='y', data=np.array([10, 11]), dtype=dap.Int16)
162191

163-
z = dap.Grid(
164-
name='z',
165-
data=np.array([[0, 0], [0, 0]]),
166-
dtype=dap.Int32,
167-
dimensions=[x, y])
192+
z = dap.Grid(name='z',
193+
data=np.array([[0, 0], [0, 0]]),
194+
dtype=dap.Int32,
195+
dimensions=[x, y])
168196

169197
z_attr = [
170198
dap.Attribute(name='units', value='second', dtype=dap.String),
@@ -197,3 +225,11 @@ def pack_xdr_double_array(data):
197225
XDRPACKER.pack_int(np.asarray(len(data)))
198226
XDRPACKER.pack_farray(len(data), data.astype('<f8'), XDRPACKER.pack_double)
199227
return XDRPACKER.get_buffer()
228+
229+
230+
def pack_xdr_int_array(data):
231+
XDRPACKER.reset()
232+
XDRPACKER.pack_int(np.asarray(len(data)))
233+
XDRPACKER.pack_int(np.asarray(len(data)))
234+
XDRPACKER.pack_farray(len(data), data.astype('<i4'), XDRPACKER.pack_int)
235+
return XDRPACKER.get_buffer()

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# and then run "tox" from this directory.
55

66
[tox]
7-
envlist = clean, py36, py37, py38, coverage
7+
envlist = clean, py36, py37, py38, py39, coverage
88

99
[testenv]
1010

0 commit comments

Comments
 (0)