From 2b5f6b568692d185547e8929b377516a01c20415 Mon Sep 17 00:00:00 2001 From: machichima Date: Mon, 27 Jan 2025 19:37:57 +0800 Subject: [PATCH 01/22] feat: add write() for open() in fsspec --- obstore/python/obstore/fsspec.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index d5d89335..894ccdd6 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -22,7 +22,7 @@ import asyncio from collections import defaultdict -from typing import Any, Coroutine, Dict, List, Tuple +from typing import Tuple import fsspec.asyn import fsspec.spec @@ -181,8 +181,6 @@ def _open(self, path, mode="rb", **kwargs): class BufferedFileSimple(fsspec.spec.AbstractBufferedFile): def __init__(self, fs, path, mode="rb", **kwargs): - if mode != "rb": - raise ValueError("Only 'rb' mode is currently supported") super().__init__(fs, path, mode, **kwargs) def read(self, length: int = -1): @@ -198,3 +196,17 @@ def read(self, length: int = -1): data = self.fs.cat_file(self.path, self.loc, self.loc + length) self.loc += length return data + + def write(self, data): + _, path = self.split_path(self.path) + out = self.fs.pipe_file(path, data) + return out + + def split_path(self, path: str) -> Tuple[str, str]: + """ + Split bucket and file path + """ + path_li = path.split("/") + bucket = path_li[0] + file_path = "/".join(path_li[1:]) + return (bucket, file_path) From 361b30d56ed7eb8a4f9419e337ba10c943ae039c Mon Sep 17 00:00:00 2001 From: machichima Date: Mon, 27 Jan 2025 23:14:41 +0800 Subject: [PATCH 02/22] temp: upload with iterator --- obstore/python/obstore/fsspec.py | 41 +++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 894ccdd6..313f38d0 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -22,6 +22,7 @@ import asyncio from collections import defaultdict +from io import BytesIO from typing import Tuple import fsspec.asyn @@ -29,6 +30,8 @@ import obstore as obs +# from obstore.store._aws import S3Store + class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): """An fsspec implementation based on a obstore Store. 
@@ -176,11 +179,16 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" + return BufferedFileSimple(self, path, mode, **kwargs) class BufferedFileSimple(fsspec.spec.AbstractBufferedFile): def __init__(self, fs, path, mode="rb", **kwargs): + # self.buffer = [] + self.closed = False + self.data_li = [] + self.chunk_size = 5 * 1024 super().__init__(fs, path, mode, **kwargs) def read(self, length: int = -1): @@ -198,9 +206,36 @@ def read(self, length: int = -1): return data def write(self, data): - _, path = self.split_path(self.path) - out = self.fs.pipe_file(path, data) - return out + """Buffer the written data for streaming upload.""" + if self.closed: + raise ValueError("Cannot write to a closed file.") + self.data_li.append(data) + return len(data) + + def flush(self, force=False): + """Flush the buffer to the object store.""" + if self.closed: + raise ValueError("Cannot flush a closed file.") + + if self.data_li: + # Convert buffer to an async iterator for upload + def buffer_iterator(): + for chunk in self.data_li: + yield chunk + + # _, path = self.split_path(self.path) + path = self.path + out = self.fs.pipe_file(path, buffer_iterator()) + + def close(self): + """Finalize and upload the collected chunks.""" + if self.closed: + return + + self.flush() + + self.closed = True + super().close() def split_path(self, path: str) -> Tuple[str, str]: """ From e0ec01ab0a3f8c491f0e43f59fae9670c644a2fe Mon Sep 17 00:00:00 2001 From: machichima Date: Mon, 27 Jan 2025 23:47:29 +0800 Subject: [PATCH 03/22] refactor: rename data_li to buffer --- obstore/python/obstore/fsspec.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 313f38d0..f8f14c24 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -29,6 +29,7 @@ import fsspec.spec import obstore as obs +from obstore.store import S3Store # from obstore.store._aws import S3Store @@ -180,16 +181,19 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" + if isinstance(self.store, S3Store): + # TODO: need to check if virtual_hosted_style_request are set + pass + return BufferedFileSimple(self, path, mode, **kwargs) class BufferedFileSimple(fsspec.spec.AbstractBufferedFile): def __init__(self, fs, path, mode="rb", **kwargs): - # self.buffer = [] + super().__init__(fs, path, mode, **kwargs) self.closed = False - self.data_li = [] + self.buffer = [] self.chunk_size = 5 * 1024 - super().__init__(fs, path, mode, **kwargs) def read(self, length: int = -1): """Return bytes from the remote file @@ -206,21 +210,20 @@ def read(self, length: int = -1): return data def write(self, data): - """Buffer the written data for streaming upload.""" + """Buffer the written data in list""" if self.closed: raise ValueError("Cannot write to a closed file.") - self.data_li.append(data) + self.buffer.append(data) return len(data) def flush(self, force=False): - """Flush the buffer to the object store.""" if self.closed: raise ValueError("Cannot flush a closed file.") - if self.data_li: + if self.buffer: # Convert buffer to an async iterator for upload def buffer_iterator(): - for chunk in self.data_li: + for chunk in self.buffer: yield chunk # _, path = self.split_path(self.path) From d75f3e87c9f2d632fbeb814ce2837d7e560d1942 Mon Sep 17 
00:00:00 2001 From: machichima Date: Wed, 29 Jan 2025 23:17:33 +0800 Subject: [PATCH 04/22] feat: buffered write in fsspec --- obstore/python/obstore/fsspec.py | 50 ++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index f8f14c24..9f354c8c 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -23,15 +23,13 @@ import asyncio from collections import defaultdict from io import BytesIO -from typing import Tuple +from typing import Any, Coroutine, Dict, List, Tuple import fsspec.asyn import fsspec.spec import obstore as obs -from obstore.store import S3Store - -# from obstore.store._aws import S3Store +from obstore import open_reader, open_writer class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): @@ -180,15 +178,49 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" + if "w" in mode: + return BufferedFileWrite(self, path, mode, **kwargs) + if "r" in mode: + return BufferedFileRead(self, path, mode, **kwargs) + + +class BufferedFileWrite(fsspec.spec.AbstractBufferedFile): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.blocksize = 2048 + self._writer = open_writer(self.fs.store, self.path) + + def _initiate_upload(self): + """ + Called by AbstractBufferedFile flusH() on the first flush + """ + self._writer = open_writer(self.fs.store, self.path) - if isinstance(self.store, S3Store): - # TODO: need to check if virtual_hosted_style_request are set - pass + def _upload_chunk(self, final=False): + """ + Called every time fsspec flushes the write buffer + """ + if self.buffer and len(self.buffer.getbuffer()) > 0: + self.buffer.seek(0) + self._writer.write(self.buffer.read()) + # flush all the data in buffer when closing + if final: + self._writer.flush() + return True + else: + return False - return BufferedFileSimple(self, path, mode, **kwargs) + def close(self): + """Close file + Ensure flushing the buffer + """ + if self._writer.closed(): + return + self._upload_chunk(final=True) + self._writer.close() -class BufferedFileSimple(fsspec.spec.AbstractBufferedFile): +class BufferedFileRead(fsspec.spec.AbstractBufferedFile): def __init__(self, fs, path, mode="rb", **kwargs): super().__init__(fs, path, mode, **kwargs) self.closed = False From 75d373433d214c1db049e6ad5d0ed93aada6fd8b Mon Sep 17 00:00:00 2001 From: machichima Date: Wed, 29 Jan 2025 23:19:38 +0800 Subject: [PATCH 05/22] fix: remove unused code --- obstore/python/obstore/fsspec.py | 45 -------------------------------- 1 file changed, 45 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 9f354c8c..ae2266cf 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -22,7 +22,6 @@ import asyncio from collections import defaultdict -from io import BytesIO from typing import Any, Coroutine, Dict, List, Tuple import fsspec.asyn @@ -187,7 +186,6 @@ def _open(self, path, mode="rb", **kwargs): class BufferedFileWrite(fsspec.spec.AbstractBufferedFile): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.blocksize = 2048 self._writer = open_writer(self.fs.store, self.path) def _initiate_upload(self): @@ -223,9 +221,6 @@ def close(self): class BufferedFileRead(fsspec.spec.AbstractBufferedFile): def __init__(self, fs, path, mode="rb", **kwargs): super().__init__(fs, 
path, mode, **kwargs) - self.closed = False - self.buffer = [] - self.chunk_size = 5 * 1024 def read(self, length: int = -1): """Return bytes from the remote file @@ -240,43 +235,3 @@ def read(self, length: int = -1): data = self.fs.cat_file(self.path, self.loc, self.loc + length) self.loc += length return data - - def write(self, data): - """Buffer the written data in list""" - if self.closed: - raise ValueError("Cannot write to a closed file.") - self.buffer.append(data) - return len(data) - - def flush(self, force=False): - if self.closed: - raise ValueError("Cannot flush a closed file.") - - if self.buffer: - # Convert buffer to an async iterator for upload - def buffer_iterator(): - for chunk in self.buffer: - yield chunk - - # _, path = self.split_path(self.path) - path = self.path - out = self.fs.pipe_file(path, buffer_iterator()) - - def close(self): - """Finalize and upload the collected chunks.""" - if self.closed: - return - - self.flush() - - self.closed = True - super().close() - - def split_path(self, path: str) -> Tuple[str, str]: - """ - Split bucket and file path - """ - path_li = path.split("/") - bucket = path_li[0] - file_path = "/".join(path_li[1:]) - return (bucket, file_path) From b2f9d6f61951d08eeebd1dd0bff63b5b3c927894 Mon Sep 17 00:00:00 2001 From: machichima Date: Thu, 30 Jan 2025 15:40:28 +0800 Subject: [PATCH 06/22] fix: assert mode is either rb or wb --- obstore/python/obstore/fsspec.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index e843eb0e..be93ea15 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -180,9 +180,10 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" - if "w" in mode: + assert mode in ("rb", "wb"), f"Only 'rb' and 'wb' mode is currently supported, got: {mode}" + if mode == "wb": return BufferedFileWrite(self, path, mode, **kwargs) - if "r" in mode: + if mode == "rb": return BufferedFileRead(self, path, mode, **kwargs) From 07ae55d109b4b878183f15c325b201db9fc6d2e4 Mon Sep 17 00:00:00 2001 From: machichima Date: Thu, 30 Jan 2025 15:48:23 +0800 Subject: [PATCH 07/22] fix: correctly detect file exist for read_parquet --- obstore/python/obstore/fsspec.py | 41 +++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index be93ea15..bcc2cf98 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -149,17 +149,36 @@ async def _get_file(self, rpath, lpath, **kwargs): f.write(buffer) async def _info(self, path, **kwargs): - head = await obs.head_async(self.store, path) - return { - # Required of `info`: (?) - "name": head["path"], - "size": head["size"], - "type": "directory" if head["path"].endswith("/") else "file", - # Implementation-specific keys - "e_tag": head["e_tag"], - "last_modified": head["last_modified"], - "version": head["version"], - } + try: + head = await obs.head_async(self.store, path) + return { + # Required of `info`: (?) 
+ "name": head["path"], + "size": head["size"], + "type": "directory" if head["path"].endswith("/") else "file", + # Implementation-specific keys + "e_tag": head["e_tag"], + "last_modified": head["last_modified"], + "version": head["version"], + } + except FileNotFoundError: + # try ls, refer to the info implementation in fsspec + # https://github.com/fsspec/filesystem_spec/blob/08d1e494db177d90ccc77e5f154d5fbb34657b13/fsspec/spec.py#L643-L675 + parent = self._parent(path) + out = await self._ls(parent) + out = [o for o in out if o["name"].rstrip("/") == path] + if out: + return out[0] + out = await self._ls(path) + out1 = [o for o in out if o["name"].rstrip("/") == path] + if len(out1) == 1: + if "size" not in out1[0]: + out1[0]["size"] = None + return out1[0] + elif len(out1) > 1 or out: + return {"name": path, "size": 0, "type": "directory"} + else: + raise FileNotFoundError(path) async def _ls(self, path, detail=True, **kwargs): result = await obs.list_with_delimiter_async(self.store, path) From 428a66dffa60abae32592a110789b18205b5bfcd Mon Sep 17 00:00:00 2001 From: machichima Date: Thu, 30 Jan 2025 17:10:33 +0800 Subject: [PATCH 08/22] run pre-commit --- obstore/python/obstore/fsspec.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index bcc2cf98..ee52b3e6 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -31,7 +31,7 @@ import fsspec.spec import obstore as obs -from obstore import open_reader, open_writer +from obstore import open_writer class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): @@ -199,7 +199,10 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" - assert mode in ("rb", "wb"), f"Only 'rb' and 'wb' mode is currently supported, got: {mode}" + assert mode in ( + "rb", + "wb", + ), f"Only 'rb' and 'wb' mode is currently supported, got: {mode}" if mode == "wb": return BufferedFileWrite(self, path, mode, **kwargs) if mode == "rb": From bc4ffaabda60fa22ae07cfeda3e2a6d07d6cfea2 Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 2 Feb 2025 22:47:31 +0800 Subject: [PATCH 09/22] feat: split bucket name from path in fsspec _open --- obstore/python/obstore/fsspec.py | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index ee52b3e6..a9450812 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -32,6 +32,7 @@ import obstore as obs from obstore import open_writer +from obstore.store import AzureStore, GCSStore, S3Store class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): @@ -82,6 +83,32 @@ def __init__( *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size ) + def _split_path(self, path: str) -> Tuple[str, str]: + """ + Split bucket and file path + + Args: + path (str): Input path, like `s3://mybucket/path/to/file` + + Examples: + >>> split_path("s3://mybucket/path/to/file") + ['mybucket', 'path/to/file'] + """ + + store_with_bucket = (S3Store, GCSStore, AzureStore) + + if not isinstance(self.store, store_with_bucket): + # no bucket name in path + return "", path + + if "/" not in path: + return path, "" + else: + path_li = path.split("/") + bucket = path_li[0] + file_path = "/".join(path_li[1:]) + return (bucket, file_path) + async def _rm_file(self, path, **kwargs): return await 
obs.delete_async(self.store, path) @@ -197,12 +224,15 @@ async def _ls(self, path, detail=True, **kwargs): else: return sorted([object["path"] for object in objects] + prefs) - def _open(self, path, mode="rb", **kwargs): + def _open(self, path: str, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" assert mode in ( "rb", "wb", ), f"Only 'rb' and 'wb' mode is currently supported, got: {mode}" + + _, path = self._split_path(path) + if mode == "wb": return BufferedFileWrite(self, path, mode, **kwargs) if mode == "rb": From 79e40a182e839de9829aa29205cc9afc31564526 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 5 Feb 2025 11:14:39 -0500 Subject: [PATCH 10/22] Update obstore/python/obstore/fsspec.py Co-authored-by: Martin Durant --- obstore/python/obstore/fsspec.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index a9450812..cbcc140b 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -226,10 +226,8 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path: str, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" - assert mode in ( - "rb", - "wb", - ), f"Only 'rb' and 'wb' mode is currently supported, got: {mode}" + if mode not in {"rb", "wb"}: + raise ValueError(f"Only 'rb' and 'wb' mode is currently supported, got: {mode}") _, path = self._split_path(path) From 74dd9ede860b706993cd35de82c7cfd8802795d0 Mon Sep 17 00:00:00 2001 From: machichima Date: Thu, 6 Feb 2025 21:22:58 +0800 Subject: [PATCH 11/22] fix: move incorrect mode exception into else --- obstore/python/obstore/fsspec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index cbcc140b..2985ff02 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -226,8 +226,6 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path: str, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" - if mode not in {"rb", "wb"}: - raise ValueError(f"Only 'rb' and 'wb' mode is currently supported, got: {mode}") _, path = self._split_path(path) @@ -235,6 +233,8 @@ def _open(self, path: str, mode="rb", **kwargs): return BufferedFileWrite(self, path, mode, **kwargs) if mode == "rb": return BufferedFileRead(self, path, mode, **kwargs) + else: + raise ValueError(f"Only 'rb' and 'wb' mode is currently supported, got: {mode}") class BufferedFileWrite(fsspec.spec.AbstractBufferedFile): From 879794414f226880c04cbc9253eb62af697e9e4d Mon Sep 17 00:00:00 2001 From: machichima Date: Thu, 6 Feb 2025 22:24:43 +0800 Subject: [PATCH 12/22] fix: remove writer in init and add self.closed=True --- obstore/python/obstore/fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 2985ff02..e2f2bee5 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -240,7 +240,6 @@ def _open(self, path: str, mode="rb", **kwargs): class BufferedFileWrite(fsspec.spec.AbstractBufferedFile): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._writer = open_writer(self.fs.store, self.path) def _initiate_upload(self): """ @@ -270,6 +269,7 @@ def close(self): return self._upload_chunk(final=True) self._writer.close() + self.closed = True class 
BufferedFileRead(fsspec.spec.AbstractBufferedFile): From cf1856a60b9290c3d9059a42fde2a59d59839917 Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 9 Feb 2025 12:15:05 +0800 Subject: [PATCH 13/22] fix: self._writer not exist error in close --- obstore/python/obstore/fsspec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index e2f2bee5..88556236 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -265,9 +265,9 @@ def close(self): """Close file Ensure flushing the buffer """ - if self._writer.closed(): + if self.closed: return - self._upload_chunk(final=True) + self.flush(force=True) self._writer.close() self.closed = True From ff5d6bd09cc37cd786e88c28bf095d673149dee6 Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 9 Feb 2025 12:15:56 +0800 Subject: [PATCH 14/22] fix: use info() in AbstractFileSystem --- obstore/python/obstore/fsspec.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 88556236..cba86ef9 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -189,23 +189,9 @@ async def _info(self, path, **kwargs): "version": head["version"], } except FileNotFoundError: - # try ls, refer to the info implementation in fsspec - # https://github.com/fsspec/filesystem_spec/blob/08d1e494db177d90ccc77e5f154d5fbb34657b13/fsspec/spec.py#L643-L675 - parent = self._parent(path) - out = await self._ls(parent) - out = [o for o in out if o["name"].rstrip("/") == path] - if out: - return out[0] - out = await self._ls(path) - out1 = [o for o in out if o["name"].rstrip("/") == path] - if len(out1) == 1: - if "size" not in out1[0]: - out1[0]["size"] = None - return out1[0] - elif len(out1) > 1 or out: - return {"name": path, "size": 0, "type": "directory"} - else: - raise FileNotFoundError(path) + # use info in fsspec.AbstractFileSystem + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, super().info, path, **kwargs) async def _ls(self, path, detail=True, **kwargs): result = await obs.list_with_delimiter_async(self.store, path) From 19c26469bc2232a118762ea908a679d30d4d826a Mon Sep 17 00:00:00 2001 From: machichima Date: Sat, 1 Mar 2025 18:25:45 +0800 Subject: [PATCH 15/22] fix: typing and linting --- obstore/python/obstore/fsspec.py | 56 +++++++++++++++++++------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 837fbc3b..ae4aff27 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -76,8 +76,6 @@ "file", "memory", } -from obstore import open_writer -from obstore.store import AzureStore, GCSStore, S3Store class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): @@ -464,32 +462,45 @@ def _open( autocommit: Any = True, # noqa: ARG002 cache_options: Any = None, # noqa: ARG002 **kwargs: Any, - ) -> BufferedFileSimple: + ) -> BufferedFileWrite | BufferedFileRead: """Return raw bytes-mode file-like from the file-system.""" - _, path = self._split_path(path) if mode == "wb": return BufferedFileWrite(self, path, mode, **kwargs) if mode == "rb": return BufferedFileRead(self, path, mode, **kwargs) - else: - raise ValueError(f"Only 'rb' and 'wb' mode is currently supported, got: {mode}") + + err_msg = f"Only 'rb' and 'wb' mode is currently supported, got: {mode}" + raise ValueError(err_msg) 
class BufferedFileWrite(fsspec.spec.AbstractBufferedFile): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + """Write buffered file wrapped around `fsspec.spec.AbstractBufferedFile`.""" + + def __init__(self, *args: Any, **_kwargs: Any) -> None: + """Construct a new AsyncFsspecStore. + + Args: + args: positional arguments passed on to the `BufferedFileWrite` + constructor. + + Keyword Args: + _kwargs: keyword arguments passed on to the `BufferedFileWrite` constructor - def _initiate_upload(self): - """ - Called by AbstractBufferedFile flusH() on the first flush """ + super().__init__(*args, **_kwargs) + + def _initiate_upload(self) -> None: + """Call by AbstractBufferedFile flusH() on the first flush.""" self._writer = open_writer(self.fs.store, self.path) - def _upload_chunk(self, final=False): - """ - Called every time fsspec flushes the write buffer + def _upload_chunk(self, final: bool = False) -> bool: + """Call every time fsspec flushes the write buffer. + + Returns: + Bool showing if chunk is updated + """ if self.buffer and len(self.buffer.getbuffer()) > 0: self.buffer.seek(0) @@ -498,13 +509,11 @@ def _upload_chunk(self, final=False): if final: self._writer.flush() return True - else: - return False - def close(self): - """Close file - Ensure flushing the buffer - """ + return False + + def close(self) -> None: + """Close file. Ensure flushing the buffer.""" if self.closed: return self.flush(force=True) @@ -513,7 +522,7 @@ def close(self): class BufferedFileRead(fsspec.spec.AbstractBufferedFile): - """Implementation of buffered file around `fsspec.spec.AbstractBufferedFile`.""" + """Read buffered file wrapped around `fsspec.spec.AbstractBufferedFile`.""" def __init__( self, @@ -525,13 +534,16 @@ def __init__( """Create new buffered file.""" super().__init__(fs, path, mode, **kwargs) - def read(self, length: int = -1) -> Any: + def read(self, length: int = -1) -> bytes: """Return bytes from the remote file. Args: length: if positive, returns up to this many bytes; if negative, return all remaining bytes. 
+ Returns: + Data in bytes + """ if length < 0: data = self.fs.cat_file(self.path, self.loc, self.size) From 23febfb214bb29251c3efbe2fbfa4abec0bb96fe Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 2 Mar 2025 20:18:34 +0800 Subject: [PATCH 16/22] feat: merge BufferedFileWrite/Read together --- obstore/python/obstore/fsspec.py | 107 ++++++++++++++++--------------- 1 file changed, 55 insertions(+), 52 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index ae4aff27..61b73b2b 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -46,7 +46,7 @@ import fsspec.spec import obstore as obs -from obstore import open_writer +from obstore import open_reader, open_writer from obstore.store import from_url if TYPE_CHECKING: @@ -220,6 +220,14 @@ def _construct_store(self, bucket: str) -> ObjectStore: retry_config=self.retry_config, ) + def split_path(self, path: str) -> tuple[str, str]: + """Public method to split a path into bucket and path components.""" + return self._split_path(path) + + def construct_store(self, bucket: str) -> ObjectStore: + """Public method to construct a store for the given bucket.""" + return self._construct_store(bucket) + async def _rm_file(self, path: str, **_kwargs: Any) -> None: bucket, path = self._split_path(path) store = self._construct_store(bucket) @@ -462,38 +470,60 @@ def _open( autocommit: Any = True, # noqa: ARG002 cache_options: Any = None, # noqa: ARG002 **kwargs: Any, - ) -> BufferedFileWrite | BufferedFileRead: + ) -> BufferedFile: """Return raw bytes-mode file-like from the file-system.""" - _, path = self._split_path(path) - - if mode == "wb": - return BufferedFileWrite(self, path, mode, **kwargs) - if mode == "rb": - return BufferedFileRead(self, path, mode, **kwargs) + if mode in ("wb", "rb"): + return BufferedFile(self, path, mode, **kwargs) err_msg = f"Only 'rb' and 'wb' mode is currently supported, got: {mode}" raise ValueError(err_msg) -class BufferedFileWrite(fsspec.spec.AbstractBufferedFile): +class BufferedFile(fsspec.spec.AbstractBufferedFile): """Write buffered file wrapped around `fsspec.spec.AbstractBufferedFile`.""" - def __init__(self, *args: Any, **_kwargs: Any) -> None: - """Construct a new AsyncFsspecStore. + def __init__( + self, + fs: AsyncFsspecStore, + path: str, + mode: str = "rb", + **kwargs: Any, + ) -> None: + """Create new buffered file.""" + super().__init__(fs, path, mode, **kwargs) + + bucket, self.path = self.fs.split_path(path) + self.store = self.fs.construct_store(bucket) + + self.mode = mode + + if self.mode == "rb": + self._reader = open_reader(self.store, self.path) + + def read(self, length: int = -1) -> Bytes: + """Return bytes from the remote file. Args: - args: positional arguments passed on to the `BufferedFileWrite` - constructor. + length: if positive, returns up to this many bytes; if negative, return all + remaining bytes. 
- Keyword Args: - _kwargs: keyword arguments passed on to the `BufferedFileWrite` constructor + Returns: + Data in bytes """ - super().__init__(*args, **_kwargs) + if length < 0: + length = self.size - self.loc + + self._reader.seek(self.loc) + out = self._reader.read(length) + + self.loc += length + + return out def _initiate_upload(self) -> None: """Call by AbstractBufferedFile flusH() on the first flush.""" - self._writer = open_writer(self.fs.store, self.path) + self._writer = open_writer(self.store, self.path) def _upload_chunk(self, final: bool = False) -> bool: """Call every time fsspec flushes the write buffer. @@ -516,42 +546,15 @@ def close(self) -> None: """Close file. Ensure flushing the buffer.""" if self.closed: return - self.flush(force=True) - self._writer.close() - self.closed = True - - -class BufferedFileRead(fsspec.spec.AbstractBufferedFile): - """Read buffered file wrapped around `fsspec.spec.AbstractBufferedFile`.""" - - def __init__( - self, - fs: AsyncFsspecStore, - path: str, - mode: str = "rb", - **kwargs: Any, - ) -> None: - """Create new buffered file.""" - super().__init__(fs, path, mode, **kwargs) - def read(self, length: int = -1) -> bytes: - """Return bytes from the remote file. - - Args: - length: if positive, returns up to this many bytes; if negative, return all - remaining bytes. - - Returns: - Data in bytes - - """ - if length < 0: - data = self.fs.cat_file(self.path, self.loc, self.size) - self.loc = self.size - else: - data = self.fs.cat_file(self.path, self.loc, self.loc + length) - self.loc += length - return data + try: + if self.mode == "rb": + self._reader.close() + else: + self.flush(force=True) + self._writer.close() + finally: + self.closed = True def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> None: From ddcf6f9cca68405889dbaa62e321e1b96a0bfd0a Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 2 Mar 2025 20:18:53 +0800 Subject: [PATCH 17/22] test: for write to parquet --- tests/test_fsspec.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index 629f04bc..e94e808e 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -225,9 +225,15 @@ async def test_list_async(s3_store_config: S3Config): @pytest.mark.network -def test_remote_parquet(): - register("https") +def test_remote_parquet(s3_store_config: S3Config): + register(["https", "s3"]) fs = fsspec.filesystem("https") + fs_s3 = fsspec.filesystem( + "s3", + config=s3_store_config, + client_options={"allow_http": True}, + ) + url = "github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet" # noqa: E501 pq.read_metadata(url, filesystem=fs) @@ -235,6 +241,22 @@ def test_remote_parquet(): url = "https://github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet" pq.read_metadata(url, filesystem=fs) + # Read the remote Parquet file into a PyArrow table + table = pq.read_table(url, filesystem=fs) + write_parquet_path = f"{TEST_BUCKET_NAME}/test.parquet" + + # Write the table to s3 + pq.write_table(table, write_parquet_path, filesystem=fs_s3) + + out = fs_s3.ls(f"{TEST_BUCKET_NAME}", detail=False) + assert f"{TEST_BUCKET_NAME}/test.parquet" in out + + # Read Parquet file from s3 and verify its contents + parquet_table = pq.read_table(write_parquet_path, filesystem=fs_s3) + assert parquet_table.equals(table), ( + "Parquet file contents from s3 do not match the original file" + ) + def test_multi_file_ops(fs: 
AsyncFsspecStore): data = { From 27c62c21bee0be5ba4885f01746db0bbb33923c2 Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 2 Mar 2025 20:40:09 +0800 Subject: [PATCH 18/22] docs: update docstring --- obstore/python/obstore/fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 61b73b2b..be3e1c81 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -480,7 +480,7 @@ def _open( class BufferedFile(fsspec.spec.AbstractBufferedFile): - """Write buffered file wrapped around `fsspec.spec.AbstractBufferedFile`.""" + """Read/Write buffered file wrapped around `fsspec.spec.AbstractBufferedFile`.""" def __init__( self, From 5947e8a77a4eb438daf11e19eea4e8bcf2812953 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 3 Mar 2025 12:59:47 -0500 Subject: [PATCH 19/22] Use underlying reader/writer methods where possible --- obstore/python/obstore/fsspec.py | 153 +++++++++++++++++++++++-------- 1 file changed, 115 insertions(+), 38 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index be3e1c81..6c861e03 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -30,7 +30,7 @@ integration. """ -# ruff: noqa: ANN401, PTH123, FBT001, FBT002 +# ruff: noqa: ANN401, EM102, PTH123, FBT001, FBT002, S101 from __future__ import annotations @@ -52,7 +52,7 @@ if TYPE_CHECKING: from collections.abc import Coroutine, Iterable - from obstore import Bytes + from obstore import Bytes, ReadableFile, WritableFile from obstore.store import ( AzureConfig, AzureConfigInput, @@ -220,14 +220,6 @@ def _construct_store(self, bucket: str) -> ObjectStore: retry_config=self.retry_config, ) - def split_path(self, path: str) -> tuple[str, str]: - """Public method to split a path into bucket and path components.""" - return self._split_path(path) - - def construct_store(self, bucket: str) -> ObjectStore: - """Public method to construct a store for the given bucket.""" - return self._construct_store(bucket) - async def _rm_file(self, path: str, **_kwargs: Any) -> None: bucket, path = self._split_path(path) store = self._construct_store(bucket) @@ -472,35 +464,50 @@ def _open( **kwargs: Any, ) -> BufferedFile: """Return raw bytes-mode file-like from the file-system.""" - if mode in ("wb", "rb"): - return BufferedFile(self, path, mode, **kwargs) + if mode not in ("wb", "rb"): + err_msg = f"Only 'rb' and 'wb' modes supported, got: {mode}" + raise ValueError(err_msg) - err_msg = f"Only 'rb' and 'wb' mode is currently supported, got: {mode}" - raise ValueError(err_msg) + return BufferedFile(self, path, mode, **kwargs) class BufferedFile(fsspec.spec.AbstractBufferedFile): """Read/Write buffered file wrapped around `fsspec.spec.AbstractBufferedFile`.""" + mode: Literal["rb", "wb"] + fs: AsyncFsspecStore + _reader: ReadableFile + _writer: WritableFile + _writer_loc: int + """Stream position. + + Only defined for writers. We use the underlying rust stream position for reading. 
+ """ + def __init__( self, fs: AsyncFsspecStore, path: str, - mode: str = "rb", + mode: Literal["rb", "wb"] = "rb", **kwargs: Any, ) -> None: """Create new buffered file.""" super().__init__(fs, path, mode, **kwargs) - bucket, self.path = self.fs.split_path(path) - self.store = self.fs.construct_store(bucket) + bucket, self.path = self.fs._split_path(path) # noqa: SLF001 + self.store = self.fs._construct_store(bucket) # noqa: SLF001 self.mode = mode if self.mode == "rb": self._reader = open_reader(self.store, self.path) + elif self.mode == "wb": + self._writer = open_writer(self.store, self.path) + self._writer_loc = 0 + else: + raise ValueError(f"Invalid mode: {mode}") - def read(self, length: int = -1) -> Bytes: + def read(self, length: int = -1) -> bytes: """Return bytes from the remote file. Args: @@ -511,36 +518,105 @@ def read(self, length: int = -1) -> Bytes: Data in bytes """ + if self.mode != "rb": + raise ValueError("File not in read mode") if length < 0: - length = self.size - self.loc + length = self.size - self.tell() + if self.closed: + raise ValueError("I/O operation on closed file.") + if length == 0: + # don't even bother calling fetch + return b"" - self._reader.seek(self.loc) out = self._reader.read(length) + return out.to_bytes() - self.loc += length + def readline(self) -> bytes: + """Read until first occurrence of newline character.""" + if self.mode != "rb": + raise ValueError("File not in read mode") - return out + out = self._reader.readline() + return out.to_bytes() - def _initiate_upload(self) -> None: - """Call by AbstractBufferedFile flusH() on the first flush.""" - self._writer = open_writer(self.store, self.path) + def readlines(self) -> list[bytes]: + """Return all data, split by the newline character.""" + if self.mode != "rb": + raise ValueError("File not in read mode") - def _upload_chunk(self, final: bool = False) -> bool: - """Call every time fsspec flushes the write buffer. + out = self._reader.readlines() + return [b.to_bytes() for b in out] - Returns: - Bool showing if chunk is updated + def tell(self) -> int: + """Get current file location.""" + if self.mode == "rb": + return self._reader.tell() + + if self.mode == "wb": + # There's no way to get the stream position from the underlying writer + # because it's async. Here we happen to be using the async writer in a + # synchronous way, so we keep our own stream position. + assert self._writer_loc is not None + return self._writer_loc + + raise ValueError(f"Unexpected mode {self.mode}") + + def seek(self, loc: int, whence: int = 0) -> int: + """Set current file location. + + Args: + loc: byte location + whence: Either + - `0`: from start of file + - `1`: current location + - `2`: end of file + + """ + if self.mode != "rb": + raise ValueError("Seek only available in read mode.") + + return self._reader.seek(loc, whence) + + def write(self, data: bytes) -> int: + """Write data to buffer. + + Args: + data: Set of bytes to be written. + + """ + if not self.writable(): + raise ValueError("File not in write mode") + if self.closed: + raise ValueError("I/O operation on closed file.") + if self.forced: + raise ValueError("This file has been force-flushed, can only close") + + num_written = self._writer.write(data) + self._writer_loc += num_written + + return num_written + + def flush( + self, + force: bool = False, # noqa: ARG002 + ) -> None: + """Write buffered data to backend store. + + Writes the current buffer, if it is larger than the block-size, or if + the file is being closed. 
+ + Args: + force: Unused. """ - if self.buffer and len(self.buffer.getbuffer()) > 0: - self.buffer.seek(0) - self._writer.write(self.buffer.read()) - # flush all the data in buffer when closing - if final: - self._writer.flush() - return True + if self.closed: + raise ValueError("Flush on closed file") + + if self.readable(): + # no-op to flush on read-mode + return - return False + self._writer.flush() def close(self) -> None: """Close file. Ensure flushing the buffer.""" @@ -600,5 +676,6 @@ def _register(protocol: str, *, asynchronous: bool) -> None: "asynchronous": asynchronous, }, # Assign protocol dynamically ), - clobber=False, + # Override any existing implementations of the same protocol + clobber=True, ) From 7f229d09517715840d7c7cfeb13177afa2b02d58 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 3 Mar 2025 13:27:31 -0500 Subject: [PATCH 20/22] Updated docs --- obstore/python/obstore/fsspec.py | 87 +++++++++++++++++++++++++++----- 1 file changed, 74 insertions(+), 13 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 6c861e03..fb9e4cf6 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -52,7 +52,7 @@ if TYPE_CHECKING: from collections.abc import Coroutine, Iterable - from obstore import Bytes, ReadableFile, WritableFile + from obstore import Attributes, Bytes, ReadableFile, WritableFile from obstore.store import ( AzureConfig, AzureConfigInput, @@ -472,10 +472,15 @@ def _open( class BufferedFile(fsspec.spec.AbstractBufferedFile): - """Read/Write buffered file wrapped around `fsspec.spec.AbstractBufferedFile`.""" + """A buffered readable or writable file. + + This is a wrapper around [`obstore.ReadableFile`][] and [`obstore.WritableFile`][]. + If you don't have a need to use the fsspec integration, you may be better served by + using [`open_reader`][obstore.open_reader] or [`open_writer`][obstore.open_writer] + directly. + """ mode: Literal["rb", "wb"] - fs: AsyncFsspecStore _reader: ReadableFile _writer: WritableFile _writer_loc: int @@ -484,25 +489,79 @@ class BufferedFile(fsspec.spec.AbstractBufferedFile): Only defined for writers. We use the underlying rust stream position for reading. """ + @overload + def __init__( + self, + fs: AsyncFsspecStore, + path: str, + mode: Literal["rb"] = "rb", + *, + buffer_size: int = 1024 * 1024, + **kwargs: Any, + ) -> None: ... + @overload def __init__( + self, + fs: AsyncFsspecStore, + path: str, + mode: Literal["wb"], + *, + buffer_size: int = 10 * 1024 * 1024, + attributes: Attributes | None = None, + tags: dict[str, str] | None = None, + **kwargs: Any, + ) -> None: ... + def __init__( # noqa: PLR0913 self, fs: AsyncFsspecStore, path: str, mode: Literal["rb", "wb"] = "rb", + *, + buffer_size: int | None = None, + attributes: Attributes | None = None, + tags: dict[str, str] | None = None, **kwargs: Any, ) -> None: - """Create new buffered file.""" + """Create new buffered file. + + Args: + fs: The underlying fsspec store to read from. + path: The path within the store to use. + mode: `"rb"` for a readable binary file or `"wb"` for a writable binary + file. Defaults to "rb". + + Keyword Args: + attributes: Provide a set of `Attributes`. Only used when writing. Defaults + to `None`. + buffer_size: Up to `buffer_size` bytes will be buffered in memory. **When + reading:** The minimum number of bytes to read in a single request. + **When writing:** If `buffer_size` is exceeded, data will be uploaded + as a multipart upload in chunks of `buffer_size`. 
Defaults to None. + tags: Provide tags for this object. Only used when writing. Defaults to + `None`. + kwargs: Keyword arguments passed on to [`fsspec.spec.AbstractBufferedFile`][]. + + """ # noqa: E501 super().__init__(fs, path, mode, **kwargs) - bucket, self.path = self.fs._split_path(path) # noqa: SLF001 - self.store = self.fs._construct_store(bucket) # noqa: SLF001 + bucket, path = fs._split_path(path) # noqa: SLF001 + store = fs._construct_store(bucket) # noqa: SLF001 self.mode = mode if self.mode == "rb": - self._reader = open_reader(self.store, self.path) + buffer_size = 1024 * 1024 if buffer_size is None else buffer_size + self._reader = open_reader(store, path, buffer_size=buffer_size) elif self.mode == "wb": - self._writer = open_writer(self.store, self.path) + buffer_size = 10 * 1024 * 1024 if buffer_size is None else buffer_size + self._writer = open_writer( + store, + path, + attributes=attributes, + buffer_size=buffer_size, + tags=tags, + ) + self._writer_loc = 0 else: raise ValueError(f"Invalid mode: {mode}") @@ -647,14 +706,16 @@ def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> No asynchronous operations. Defaults to False. Example: - >>> register("s3") - >>> register("s3", asynchronous=True) # Registers an async store for "s3" - >>> register(["gcs", "abfs"]) # Registers both "gcs" and "abfs" + ```py + register("s3") + register("s3", asynchronous=True) # Registers an async store for "s3" + register(["gcs", "abfs"]) # Registers both "gcs" and "abfs" + ``` Notes: - Each protocol gets a dynamically generated subclass named - `AsyncFsspecStore_`. - - This avoids modifying the original AsyncFsspecStore class. + `AsyncFsspecStore_`. This avoids modifying the original + AsyncFsspecStore class. """ if isinstance(protocol, str): From 317e954e36f3b93db9bb16f922bd39cc453fd611 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 3 Mar 2025 13:34:50 -0500 Subject: [PATCH 21/22] Override `loc` property --- obstore/python/obstore/fsspec.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index fb9e4cf6..eb9975a1 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -691,6 +691,13 @@ def close(self) -> None: finally: self.closed = True + @property + def loc(self) -> int: + """Get current file location.""" + # Note, we override the `loc` attribute, because for the reader we manage that + # state in Rust. + return self.tell() + def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> None: """Dynamically register a subclass of AsyncFsspecStore for the given protocol(s). From 0442a5f433b62e04e319ca8d29cd5f0067365e0a Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 3 Mar 2025 13:45:16 -0500 Subject: [PATCH 22/22] loc setter to allow `__init__` --- obstore/python/obstore/fsspec.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index eb9975a1..3cff4616 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -698,6 +698,11 @@ def loc(self) -> int: # state in Rust. return self.tell() + @loc.setter + def loc(self, value: int) -> None: + if value != 0: + raise ValueError("Cannot set `.loc`. Use `seek` instead.") + def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> None: """Dynamically register a subclass of AsyncFsspecStore for the given protocol(s).