[Data] Add Zarr Datasource (duplicate PR of 63003)#63552
[Data] Add Zarr Datasource (duplicate PR of 63003)#63552ArturNiederfahrenhorst wants to merge 98 commits into
Conversation
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandr.plashchinsky-h765g66h9v> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: alexandrplashchinsky <alexandr.plashchinsky@anyscale.com>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
| "network", | ||
| "socket", | ||
| "HTTP error", | ||
| ) |
There was a problem hiding this comment.
These are all taken from #63003.
I assume that they are useful in matching patterns @alexandrplashchinsky
| match=match, | ||
| max_attempts=max_attempts, | ||
| max_backoff_s=max_backoff_s, | ||
| ) |
There was a problem hiding this comment.
We use this call_with_retry pattern in iceberg, lance and parquet so I assume that it's worth propagating the pattern to this PR.
| } | ||
| ) | ||
|
|
||
| return read_fn |
There was a problem hiding this comment.
This produces arrays of different dimensions so we expect Ray Data to handle pyarrow's variable shaped tensors downstream. We could also force tensors of the same length here and use padding.
| for name in aligned_array_names: | ||
| cols[name] = [ | ||
| _read_chunk(root, name, ((d.t_start, d.t_stop_data),)) for d in batch | ||
| ] |
There was a problem hiding this comment.
These reads are sequential and if we read from some web interface that is super slow, this make become a significant I/O bottleneck. A simple solution would be a threadpool to remove the IO bottleneck.
|
|
||
| # Resolve filesystem + store path. The order of precedence: | ||
| # 1. Explicit ``filesystem=`` always wins. | ||
| # 2. ``.zip`` URL/path → auto-wrap with fsspec's ZipFileSystem. |
There was a problem hiding this comment.
This case happened to me with some UMI dataset
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
| ... align_axis_0=True, | ||
| ... chunk_shape=[50], | ||
| ... ) | ||
| >>> ds.count() # doctest: +SKIP |
There was a problem hiding this comment.
These are skipped but the dataset exists.
Just skipping because all other datasources in ray data also skip.
If we didn't skip, the docsbuild would require the zarr python package.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes using default effort and found 2 potential issues.
Reviewed by Cursor Bugbot for commit 4b4366c. Configure here.
| cols[name] = [ | ||
| _read_chunk(root, name, ((d.t_start, d.t_stop_data),)) for d in batch | ||
| ] | ||
| yield pd.DataFrame(cols) |
There was a problem hiding this comment.
Sequential I/O reads cause significant remote-store bottleneck
Medium Severity
In _create_aligned_read_fn, all chunk reads across all arrays and all batch items are done sequentially in a single thread. For N aligned arrays and M descriptors in a batch, this performs N×M sequential I/O operations. Similarly in _create_read_fn, all batch chunks are read sequentially. When reading from remote stores (S3, GCS, HTTPS), these sequential reads form a significant I/O bottleneck since each read involves a full network round-trip with no concurrency.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 4b4366c. Configure here.
| f"integers (list or tuple), got {chunk_shape!r}" | ||
| ) | ||
|
|
||
| self.chunk_shape = tuple(chunk_shape) if chunk_shape is not None else None |
There was a problem hiding this comment.
chunk_shape validation allows zero and negative values
Low Severity
The chunk_shape validation only checks that the value is a list or tuple but doesn't verify elements are positive integers. Passing chunk_shape=[0] would later cause a ZeroDivisionError inside grid_shape (via math.ceil(s / c) where c=0), producing a confusing traceback far from the validation site. The error message promises "non-empty sequence of positive integers" but the check doesn't enforce this.
Reviewed by Cursor Bugbot for commit 4b4366c. Configure here.
|
Closing in favor of #63003 |


Description
Keeping some changes to the #63003 in this feature branch.