Skip to content

Commit 87c04fa

Browse files
authored
Merge pull request #14 from scalableminds/parallel-streams
Concurrent Read and Write
2 parents 339a48f + 7c6f6ed commit 87c04fa

File tree

2 files changed

+152
-69
lines changed

2 files changed

+152
-69
lines changed

src/main/java/dev/zarr/zarrjava/v3/Array.java

+125-66
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.Map;
1515
import java.util.function.Function;
1616
import java.util.stream.Collectors;
17+
import java.util.stream.Stream;
1718
import javax.annotation.Nonnull;
1819
import javax.annotation.Nullable;
1920
import ucar.ma2.InvalidRangeException;
@@ -119,6 +120,7 @@ public static ArrayMetadataBuilder metadataBuilder(ArrayMetadata existingMetadat
119120

120121
/**
121122
* Reads the entire Zarr array into a ucar.ma2.Array.
123+
* Utilizes no parallelism.
122124
*
123125
* @throws ZarrException
124126
*/
@@ -129,13 +131,38 @@ public ucar.ma2.Array read() throws ZarrException {
129131

130132
/**
131133
* Reads a part of the Zarr array based on a requested offset and shape into a ucar.ma2.Array.
134+
* Utilizes no parallelism.
132135
*
133136
* @param offset
134137
* @param shape
135138
* @throws ZarrException
136139
*/
137140
@Nonnull
138141
public ucar.ma2.Array read(final long[] offset, final int[] shape) throws ZarrException {
142+
return read(offset, shape, false);
143+
}
144+
145+
/**
146+
* Reads the entire Zarr array into a ucar.ma2.Array.
147+
*
148+
* @param parallel if true, the chunks are read using a parallel stream; if false, sequentially
149+
* @throws ZarrException
150+
*/
151+
@Nonnull
152+
public ucar.ma2.Array read(final boolean parallel) throws ZarrException {
153+
return read(new long[metadata.ndim()], Utils.toIntArray(metadata.shape), parallel);
154+
}
155+
156+
/**
157+
* Reads a part of the Zarr array based on a requested offset and shape into a ucar.ma2.Array.
158+
*
159+
* @param offset
160+
* @param shape
161+
* @param parallel if true, the chunks are read using a parallel stream; if false, sequentially
162+
* @throws ZarrException
163+
*/
164+
@Nonnull
165+
public ucar.ma2.Array read(final long[] offset, final int[] shape, final boolean parallel) throws ZarrException {
139166
if (offset.length != metadata.ndim()) {
140167
throw new IllegalArgumentException("'offset' needs to have rank '" + metadata.ndim() + "'.");
141168
}
@@ -155,43 +182,46 @@ public ucar.ma2.Array read(final long[] offset, final int[] shape) throws ZarrEx
155182

156183
final ucar.ma2.Array outputArray = ucar.ma2.Array.factory(metadata.dataType.getMA2DataType(),
157184
shape);
158-
Arrays.stream(IndexingUtils.computeChunkCoords(metadata.shape, chunkShape, offset, shape))
159-
.forEach(
160-
chunkCoords -> {
161-
try {
162-
final IndexingUtils.ChunkProjection chunkProjection =
163-
IndexingUtils.computeProjection(chunkCoords, metadata.shape, chunkShape, offset,
164-
shape
165-
);
166-
167-
if (chunkIsInArray(chunkCoords)) {
168-
MultiArrayUtils.copyRegion(metadata.allocateFillValueChunk(),
169-
chunkProjection.chunkOffset, outputArray, chunkProjection.outOffset,
170-
chunkProjection.shape
171-
);
172-
}
173-
174-
final String[] chunkKeys = metadata.chunkKeyEncoding.encodeChunkKey(chunkCoords);
175-
final StoreHandle chunkHandle = storeHandle.resolve(chunkKeys);
176-
if (!chunkHandle.exists()) {
177-
return;
178-
}
179-
if (codecPipeline.supportsPartialDecode()) {
180-
final ucar.ma2.Array chunkArray = codecPipeline.decodePartial(chunkHandle,
181-
Utils.toLongArray(chunkProjection.chunkOffset), chunkProjection.shape);
182-
MultiArrayUtils.copyRegion(chunkArray, new int[metadata.ndim()], outputArray,
183-
chunkProjection.outOffset, chunkProjection.shape
184-
);
185-
} else {
186-
MultiArrayUtils.copyRegion(readChunk(chunkCoords), chunkProjection.chunkOffset,
187-
outputArray, chunkProjection.outOffset, chunkProjection.shape
188-
);
189-
}
190-
191-
} catch (ZarrException e) {
192-
throw new RuntimeException(e);
193-
}
194-
});
185+
Stream<long[]> chunkStream = Arrays.stream(IndexingUtils.computeChunkCoords(metadata.shape, chunkShape, offset, shape));
186+
if (parallel) {
187+
chunkStream = chunkStream.parallel();
188+
}
189+
chunkStream.forEach(
190+
chunkCoords -> {
191+
try {
192+
final IndexingUtils.ChunkProjection chunkProjection =
193+
IndexingUtils.computeProjection(chunkCoords, metadata.shape, chunkShape, offset,
194+
shape
195+
);
196+
197+
if (chunkIsInArray(chunkCoords)) {
198+
MultiArrayUtils.copyRegion(metadata.allocateFillValueChunk(),
199+
chunkProjection.chunkOffset, outputArray, chunkProjection.outOffset,
200+
chunkProjection.shape
201+
);
202+
}
203+
204+
final String[] chunkKeys = metadata.chunkKeyEncoding.encodeChunkKey(chunkCoords);
205+
final StoreHandle chunkHandle = storeHandle.resolve(chunkKeys);
206+
if (!chunkHandle.exists()) {
207+
return;
208+
}
209+
if (codecPipeline.supportsPartialDecode()) {
210+
final ucar.ma2.Array chunkArray = codecPipeline.decodePartial(chunkHandle,
211+
Utils.toLongArray(chunkProjection.chunkOffset), chunkProjection.shape);
212+
MultiArrayUtils.copyRegion(chunkArray, new int[metadata.ndim()], outputArray,
213+
chunkProjection.outOffset, chunkProjection.shape
214+
);
215+
} else {
216+
MultiArrayUtils.copyRegion(readChunk(chunkCoords), chunkProjection.chunkOffset,
217+
outputArray, chunkProjection.outOffset, chunkProjection.shape
218+
);
219+
}
220+
221+
} catch (ZarrException e) {
222+
throw new RuntimeException(e);
223+
}
224+
});
195225
return outputArray;
196226
}
197227

@@ -235,6 +265,7 @@ public ucar.ma2.Array readChunk(long[] chunkCoords)
235265
/**
236266
* Writes a ucar.ma2.Array into the Zarr array at the beginning of the Zarr array. The shape of
237267
* the Zarr array needs to be large enough for the write.
268+
* Utilizes no parallelism.
238269
*
239270
* @param array
240271
*/
@@ -245,11 +276,37 @@ public void write(ucar.ma2.Array array) {
245276
/**
246277
* Writes a ucar.ma2.Array into the Zarr array at a specified offset. The shape of the Zarr array
247278
* needs to be large enough for the write.
279+
* Utilizes no parallelism.
248280
*
249281
* @param offset
250282
* @param array
251283
*/
252284
public void write(long[] offset, ucar.ma2.Array array) {
285+
write(offset, array, false);
286+
}
287+
288+
/**
289+
* Writes a ucar.ma2.Array into the Zarr array at the beginning of the Zarr array. The shape of
290+
* the Zarr array needs to be large enough for the write.
291+
*
292+
* @param array
293+
* @param parallel if true, the chunks are written using a parallel stream; if false, sequentially
294+
*/
295+
public void write(ucar.ma2.Array array, boolean parallel) {
296+
write(new long[metadata.ndim()], array, parallel);
297+
}
298+
299+
300+
301+
/**
302+
* Writes a ucar.ma2.Array into the Zarr array at a specified offset. The shape of the Zarr array
303+
* needs to be large enough for the write.
304+
*
305+
* @param offset
306+
* @param array
307+
* @param parallel if true, the chunks are written using a parallel stream; if false, sequentially
308+
*/
309+
public void write(long[] offset, ucar.ma2.Array array, boolean parallel) {
253310
if (offset.length != metadata.ndim()) {
254311
throw new IllegalArgumentException("'offset' needs to have rank '" + metadata.ndim() + "'.");
255312
}
@@ -260,34 +317,37 @@ public void write(long[] offset, ucar.ma2.Array array) {
260317
int[] shape = array.getShape();
261318

262319
final int[] chunkShape = metadata.chunkShape();
263-
Arrays.stream(IndexingUtils.computeChunkCoords(metadata.shape, chunkShape, offset, shape))
264-
.forEach(
265-
chunkCoords -> {
266-
try {
267-
final IndexingUtils.ChunkProjection chunkProjection =
268-
IndexingUtils.computeProjection(chunkCoords, metadata.shape, chunkShape, offset,
269-
shape
270-
);
271-
272-
ucar.ma2.Array chunkArray;
273-
if (IndexingUtils.isFullChunk(chunkProjection.chunkOffset, chunkProjection.shape,
274-
chunkShape
275-
)) {
276-
chunkArray = array.sectionNoReduce(chunkProjection.outOffset,
277-
chunkProjection.shape,
278-
null
279-
);
280-
} else {
281-
chunkArray = readChunk(chunkCoords);
282-
MultiArrayUtils.copyRegion(array, chunkProjection.outOffset, chunkArray,
283-
chunkProjection.chunkOffset, chunkProjection.shape
284-
);
285-
}
286-
writeChunk(chunkCoords, chunkArray);
287-
} catch (ZarrException | InvalidRangeException e) {
288-
throw new RuntimeException(e);
289-
}
290-
});
320+
Stream<long[]> chunkStream = Arrays.stream(IndexingUtils.computeChunkCoords(metadata.shape, chunkShape, offset, shape));
321+
if(parallel) {
322+
chunkStream = chunkStream.parallel();
323+
}
324+
chunkStream.forEach(
325+
chunkCoords -> {
326+
try {
327+
final IndexingUtils.ChunkProjection chunkProjection =
328+
IndexingUtils.computeProjection(chunkCoords, metadata.shape, chunkShape, offset,
329+
shape
330+
);
331+
332+
ucar.ma2.Array chunkArray;
333+
if (IndexingUtils.isFullChunk(chunkProjection.chunkOffset, chunkProjection.shape,
334+
chunkShape
335+
)) {
336+
chunkArray = array.sectionNoReduce(chunkProjection.outOffset,
337+
chunkProjection.shape,
338+
null
339+
);
340+
} else {
341+
chunkArray = readChunk(chunkCoords);
342+
MultiArrayUtils.copyRegion(array, chunkProjection.outOffset, chunkArray,
343+
chunkProjection.chunkOffset, chunkProjection.shape
344+
);
345+
}
346+
writeChunk(chunkCoords, chunkArray);
347+
} catch (ZarrException | InvalidRangeException e) {
348+
throw new RuntimeException(e);
349+
}
350+
});
291351
}
292352

293353
/**
@@ -434,6 +494,5 @@ public void write(@Nonnull ucar.ma2.Array content) throws ZarrException {
434494
}
435495
array.write(offset, content);
436496
}
437-
438497
}
439498
}

src/test/java/dev/zarr/zarrjava/ZarrTest.java

+27-3
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public void testWriteReadWithZarrita(String codec, String codecParam) throws Exc
210210
Assertions.assertArrayEquals(new int[]{2, 4, 8}, readArray.metadata.chunkShape());
211211
Assertions.assertEquals("test_value", readArray.metadata.attributes.get("test_key"));
212212

213-
Assertions.assertArrayEquals(testData, (int[]) result.get1DJavaArray(ucar.ma2.DataType.INT));
213+
Assertions.assertArrayEquals(testData, (int[]) result.get1DJavaArray(ucar.ma2.DataType.UINT));
214214

215215
//read in zarrita
216216
String command = pythonPath();
@@ -274,7 +274,7 @@ public void testLargerChunkSizeThanArraySize() throws ZarrException, IOException
274274
Array readArray = Array.open(storeHandle);
275275
ucar.ma2.Array result = readArray.read();
276276

277-
Assertions.assertArrayEquals(testData, (int[]) result.get1DJavaArray(ucar.ma2.DataType.INT));
277+
Assertions.assertArrayEquals(testData, (int[]) result.get1DJavaArray(ucar.ma2.DataType.UINT));
278278
}
279279

280280
static Stream<int[]> invalidChunkSizes() {
@@ -346,7 +346,7 @@ public void testZstdCodecReadWrite(int clevel, boolean checksum) throws ZarrExce
346346
Array readArray = Array.open(storeHandle);
347347
ucar.ma2.Array result = readArray.read();
348348

349-
Assertions.assertArrayEquals(testData, (int[]) result.get1DJavaArray(ucar.ma2.DataType.INT));
349+
Assertions.assertArrayEquals(testData, (int[]) result.get1DJavaArray(ucar.ma2.DataType.UINT));
350350
}
351351

352352
@Test
@@ -631,4 +631,28 @@ public void testReadL4Sample(String mag) throws IOException, ZarrException {
631631

632632
assert MultiArrayUtils.allValuesEqual(httpData2, localData2);
633633
}
634+
635+
@ParameterizedTest
636+
@ValueSource(booleans = {false,true})
637+
public void testParallel(boolean useParallel) throws IOException, ZarrException {
638+
int[] testData = new int[512 * 512 * 512];
639+
Arrays.setAll(testData, p -> p);
640+
641+
StoreHandle storeHandle = new FilesystemStore(TESTOUTPUT).resolve("testParallelRead");
642+
ArrayMetadata metadata = Array.metadataBuilder()
643+
.withShape(512, 512, 512)
644+
.withDataType(DataType.UINT32)
645+
.withChunkShape(100, 100, 100)
646+
.withFillValue(0)
647+
.build();
648+
Array writeArray = Array.create(storeHandle, metadata);
649+
writeArray.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{512, 512, 512}, testData), useParallel);
650+
651+
Array readArray = Array.open(storeHandle);
652+
ucar.ma2.Array result = readArray.read(useParallel);
653+
654+
Assertions.assertArrayEquals(testData, (int[]) result.get1DJavaArray(ucar.ma2.DataType.UINT));
655+
clearTestoutputFolder();
656+
}
634657
}
658+

0 commit comments

Comments
 (0)