From 909b5b02c62cb5f439265bcca434b8f0c5c0df49 Mon Sep 17 00:00:00 2001
From: machichima
Date: Mon, 3 Feb 2025 23:00:56 +0800
Subject: [PATCH 01/51] feat: split bucket from path + construct store

construct store with from_url using protocol and bucket name
---
 obstore/python/obstore/fsspec.py | 57 ++++++++++++++++++++++++++++++--
 1 file changed, 54 insertions(+), 3 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 13b3f1e3..8d3c198e 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -24,12 +24,14 @@
 import asyncio
 from collections import defaultdict
+from functools import lru_cache
 from typing import Any, Coroutine, Dict, List, Tuple

 import fsspec.asyn
 import fsspec.spec

 import obstore as obs
+from obstore.store import S3Store, GCSStore, AzureStore


 class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
@@ -45,6 +47,9 @@ def __init__(
         self,
         store: obs.store.ObjectStore,
         *args,
+        config: dict[str, Any] = {},
+        client_options: dict[str, Any] = {},
+        retry_config: dict[str, Any] = {},
         asynchronous: bool = False,
         loop=None,
         batch_size: int | None = None,
@@ -76,10 +81,53 @@ def __init__(
         """
         self.store = store
+
+        self.config = config
+        self.client_options = client_options
+        self.retry_config = retry_config
+
         super().__init__(
             *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
         )

+    def _split_path(self, path: str) -> Tuple[str, str]:
+        """
+        Split bucket and file path
+
+        Args:
+            path (str): Input path, like `s3://mybucket/path/to/file`
+
+        Examples:
+            >>> split_path("s3://mybucket/path/to/file")
+            ['mybucket', 'path/to/file']
+        """
+
+        store_with_bucket = (S3Store, GCSStore, AzureStore)
+
+        if (
+            not isinstance(self.store, store_with_bucket)  # instance
+            and not self.store in store_with_bucket  # not instantiation
+        ):
+            # no bucket name in path
+            return "", path
+
+        if "/" not in path:
+            return path, ""
+        else:
+            path_li = path.split("/")
+            bucket = path_li[0]
+            file_path = "/".join(path_li[1:])
+            return (bucket, file_path)
+
+    @lru_cache(maxsize=10)
+    def _construct_store(self, bucket: str):
+        return self.store.from_url(
+            f"{self.protocol}://{bucket}",
+            **self.config,
+            client_options=self.client_options,
+            retry_config=self.retry_config if self.retry_config else None,
+        )
+
     async def _rm_file(self, path, **kwargs):
         return await obs.delete_async(self.store, path)

@@ -90,11 +138,14 @@ async def _pipe_file(self, path, value, **kwargs):
         return await obs.put_async(self.store, path, value)

     async def _cat_file(self, path, start=None, end=None, **kwargs):
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
         if start is None and end is None:
-            resp = await obs.get_async(self.store, path)
-            return await resp.bytes_async()
+            resp = await obs.get_async(store, path)
+            return (await resp.bytes_async()).to_bytes()

-        range_bytes = await obs.get_range_async(self.store, path, start=start, end=end)
+        range_bytes = await obs.get_range_async(store, path, start=start, end=end)
         return range_bytes.to_bytes()

     async def _cat_ranges(
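[Editor's note] The bucket/key split this commit introduces can be exercised in isolation. A minimal standalone sketch, assuming an s3-style path whose first segment is the bucket (the helper name here is hypothetical; the patch implements it as the `_split_path` method):

```py
def split_path(path: str) -> tuple[str, str]:
    # The first path segment is the bucket; the rest is the key within it.
    if "/" not in path:
        return path, ""
    bucket, _, key = path.partition("/")
    return bucket, key

assert split_path("mybucket/path/to/file") == ("mybucket", "path/to/file")
assert split_path("mybucket") == ("mybucket", "")
```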
From 29464a7b2e787a412a6cf05f05f603f5b1ba7fd6 Mon Sep 17 00:00:00 2001
From: machichima
Date: Tue, 4 Feb 2025 22:04:37 +0800
Subject: [PATCH 02/51] feat: remove store + add protocol + apply to all methods

---
 obstore/python/obstore/fsspec.py | 57 +++++++++++++++++++++-----------
 1 file changed, 37 insertions(+), 20 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 8d3c198e..ed0aca54 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -31,7 +31,7 @@
 import fsspec.spec

 import obstore as obs
-from obstore.store import S3Store, GCSStore, AzureStore
+from obstore.store import S3Store, GCSStore, AzureStore, from_url


 class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
@@ -45,7 +45,6 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):

     def __init__(
         self,
-        store: obs.store.ObjectStore,
         *args,
         config: dict[str, Any] = {},
         client_options: dict[str, Any] = {},
         retry_config: dict[str, Any] = {},
@@ -80,8 +79,6 @@ def __init__(
         ```
         """
-        self.store = store
-
         self.config = config
         self.client_options = client_options
         self.retry_config = retry_config
@@ -102,12 +99,9 @@ def _split_path(self, path: str) -> Tuple[str, str]:
             ['mybucket', 'path/to/file']
         """

-        store_with_bucket = (S3Store, GCSStore, AzureStore)
+        protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs"]

-        if (
-            not isinstance(self.store, store_with_bucket)  # instance
-            and not self.store in store_with_bucket  # not instantiation
-        ):
+        if not self.protocol in protocol_with_bucket:
             # no bucket name in path
             return "", path

@@ -121,21 +115,28 @@ def _split_path(self, path: str) -> Tuple[str, str]:

     @lru_cache(maxsize=10)
     def _construct_store(self, bucket: str):
-        return self.store.from_url(
-            f"{self.protocol}://{bucket}",
-            **self.config,
+        # from obstore.store import from_url
+        return from_url(
+            url=f"{self.protocol}://{bucket}",
+            config=self.config,
             client_options=self.client_options,
             retry_config=self.retry_config if self.retry_config else None,
         )

     async def _rm_file(self, path, **kwargs):
-        return await obs.delete_async(self.store, path)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+        return await obs.delete_async(store, path)

     async def _cp_file(self, path1, path2, **kwargs):
-        return await obs.copy_async(self.store, path1, path2)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+        return await obs.copy_async(store, path1, path2)

     async def _pipe_file(self, path, value, **kwargs):
-        return await obs.put_async(self.store, path, value)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+        return await obs.put_async(store, path, value)

     async def _cat_file(self, path, start=None, end=None, **kwargs):
         bucket, path = self._split_path(path)
@@ -158,6 +159,9 @@ async def _cat_ranges(
         on_error="return",
         **kwargs,
     ):
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
         if isinstance(starts, int):
             starts = [starts] * len(paths)
         if isinstance(ends, int):
@@ -173,7 +177,7 @@ async def _cat_ranges(
         for path, ranges in per_file_requests.items():
             offsets = [r[0] for r in ranges]
             ends = [r[1] for r in ranges]
-            fut = obs.get_ranges_async(self.store, path, starts=offsets, ends=ends)
+            fut = obs.get_ranges_async(store, path, starts=offsets, ends=ends)
             futs.append(fut)

         result = await asyncio.gather(*futs)
@@ -188,17 +192,24 @@ async def _cat_ranges(
         return output_buffers

     async def _put_file(self, lpath, rpath, **kwargs):
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
         with open(lpath, "rb") as f:
-            await obs.put_async(self.store, rpath, f)
+            await obs.put_async(store, rpath, f)

     async def _get_file(self, rpath, lpath, **kwargs):
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
         with open(lpath, "wb") as f:
-            resp = await obs.get_async(self.store, rpath)
+            resp = await obs.get_async(store, rpath)
             async for buffer in resp.stream():
                 f.write(buffer)

     async def _info(self, path, **kwargs):
-        head = await obs.head_async(self.store, path)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
+        head = await obs.head_async(store, path)
         return {
             # Required of `info`: (?)
             "name": head["path"],
@@ -211,7 +222,10 @@ async def _info(self, path, **kwargs):
         }

     async def _ls(self, path, detail=True, **kwargs):
-        result = await obs.list_with_delimiter_async(self.store, path)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
+        result = await obs.list_with_delimiter_async(store, path)
         objects = result["objects"]
         prefs = result["common_prefixes"]
         if detail:
@@ -229,6 +243,9 @@ async def _ls(self, path, detail=True, **kwargs):

     def _open(self, path, mode="rb", **kwargs):
         """Return raw bytes-mode file-like from the file-system"""
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
         return BufferedFileSimple(self, path, mode, **kwargs)
From a0d9e1de5d2eac44c66f23a2024e87a1ee82703d Mon Sep 17 00:00:00 2001
From: machichima
Date: Tue, 4 Feb 2025 22:26:02 +0800
Subject: [PATCH 03/51] feat: inherit from AsyncFsspecStore to specify protocol

Specify protocol s3, gs, and abfs
---
 obstore/python/obstore/fsspec.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index ed0aca54..8d73c30b 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -31,8 +31,7 @@
 import fsspec.spec

 import obstore as obs
-from obstore.store import S3Store, GCSStore, AzureStore, from_url
-
+from obstore.store import from_url

 class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
@@ -269,3 +267,13 @@ def read(self, length: int = -1):
         data = self.fs.cat_file(self.path, self.loc, self.loc + length)
         self.loc += length
         return data
+
+
+class S3FsspecStore(AsyncFsspecStore):
+    protocol = "s3"
+
+class GCSFsspecStore(AsyncFsspecStore):
+    protocol = "gs"
+
+class AzureFsspecStore(AsyncFsspecStore):
+    protocol = "abfs"

From 6614906c689c2fedd51482feba7f1dc01ad3c10d Mon Sep 17 00:00:00 2001
From: machichima
Date: Thu, 6 Feb 2025 21:20:00 +0800
Subject: [PATCH 04/51] fix: correctly split protocol if it exists in path

---
 obstore/python/obstore/fsspec.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 8d73c30b..e657a222 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -104,6 +104,12 @@ def _split_path(self, path: str) -> Tuple[str, str]:
             # no bucket name in path
             return "", path

+        if path.startswith(self.protocol + "://"):
+            path = path[len(self.protocol) + 3 :]
+        elif path.startswith(self.protocol + "::"):
+            path = path[len(self.protocol) + 2 :]
+        path = path.rstrip("/")
+
         if "/" not in path:
             return path, ""
         else:
From 75c738e29916bc4874d9a876f03b713e5c95080b Mon Sep 17 00:00:00 2001
From: machichima
Date: Fri, 7 Feb 2025 23:38:16 +0800
Subject: [PATCH 05/51] feat: use urlparse to extract protocol

---
 obstore/python/obstore/fsspec.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index e657a222..be137e3a 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -22,6 +22,7 @@

 from __future__ import annotations

+from urllib.parse import urlparse
 import asyncio
 from collections import defaultdict
 from functools import lru_cache
@@ -104,11 +105,11 @@ def _split_path(self, path: str) -> Tuple[str, str]:
             # no bucket name in path
             return "", path

-        if path.startswith(self.protocol + "://"):
-            path = path[len(self.protocol) + 3 :]
-        elif path.startswith(self.protocol + "::"):
-            path = path[len(self.protocol) + 2 :]
-        path = path.rstrip("/")
+        res = urlparse(path)
+        if res.scheme:
+            if res.scheme != self.protocol:
+                raise ValueError(f"Expect protocol to be {self.protocol}. Got {res.schema}")
+            path = res.netloc + res.path

         if "/" not in path:
             return path, ""
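[Editor's note] For reference, `urllib.parse.urlparse` yields exactly the pieces the commit above relies on: the scheme is compared against the registered protocol, the netloc becomes the bucket, and the remaining path becomes the key. A small sketch (note the result attribute is `scheme`; the `res.schema` in the raise above is a bug in this commit that the next commit's reformat happens to correct):

```py
from urllib.parse import urlparse

res = urlparse("s3://mybucket/path/to/file")
assert res.scheme == "s3"          # compared against self.protocol
assert res.netloc == "mybucket"    # becomes the bucket name
assert res.path == "/path/to/file"
assert res.netloc + res.path == "mybucket/path/to/file"
```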
From 46c6b59e78a098a8ea7359e0eacd89b566587611 Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Fri, 7 Feb 2025 11:31:03 -0500
Subject: [PATCH 06/51] update typing

---
 obstore/python/obstore/fsspec.py | 70 +++++++++++++++++++++++++++-----
 1 file changed, 60 insertions(+), 10 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 05ee8e2f..985ea2c9 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -22,18 +22,39 @@

 from __future__ import annotations

-from urllib.parse import urlparse
 import asyncio
 from collections import defaultdict
 from functools import lru_cache
-from typing import Any, Coroutine, Dict, List, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Coroutine,
+    Dict,
+    List,
+    Tuple,
+)
+from urllib.parse import urlparse

 import fsspec.asyn
 import fsspec.spec

 import obstore as obs
+from obstore import Bytes
 from obstore.store import from_url

+if TYPE_CHECKING:
+    from obstore.store import (
+        AzureConfig,
+        AzureConfigInput,
+        ClientConfig,
+        GCSConfig,
+        GCSConfigInput,
+        RetryConfig,
+        S3Config,
+        S3ConfigInput,
+    )
+

 class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
     """An fsspec implementation based on a obstore Store.
@@ -42,13 +63,30 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
     """

     cachable = False
+    config: (
+        S3Config
+        | S3ConfigInput
+        | GCSConfig
+        | GCSConfigInput
+        | AzureConfig
+        | AzureConfigInput
+        | None
+    )
+    client_options: ClientConfig | None
+    retry_config: RetryConfig | None

     def __init__(
         self,
         *args,
-        config: dict[str, Any] = {},
-        client_options: dict[str, Any] = {},
-        retry_config: dict[str, Any] = {},
+        config: S3Config
+        | S3ConfigInput
+        | GCSConfig
+        | GCSConfigInput
+        | AzureConfig
+        | AzureConfigInput
+        | None = None,
+        client_options: ClientConfig | None = None,
+        retry_config: RetryConfig | None = None,
         asynchronous: bool = False,
         loop: Any = None,
         batch_size: int | None = None,
@@ -108,7 +146,9 @@ def _split_path(self, path: str) -> Tuple[str, str]:
         res = urlparse(path)
         if res.scheme:
             if res.scheme != self.protocol:
-                raise ValueError(f"Expect protocol to be {self.protocol}. Got {res.schema}")
+                raise ValueError(
+                    f"Expect protocol to be {self.protocol}. Got {res.scheme}"
+                )
             path = res.netloc + res.path

         if "/" not in path:
@@ -138,7 +178,7 @@ async def _cp_file(self, path1, path2, **kwargs):
         store = self._construct_store(bucket)
         return await obs.copy_async(store, path1, path2)

-    async def _pipe_file(self, path, value, **kwargs):
+    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
         bucket, path = self._split_path(path)
         store = self._construct_store(bucket)
         return await obs.put_async(store, path, value)
@@ -178,7 +218,7 @@ async def _cat_ranges(
         for idx, (path, start, end) in enumerate(zip(paths, starts, ends)):
             per_file_requests[path].append((start, end, idx))

-        futs: List[Coroutine[Any, Any, List[bytes]]] = []
+        futs: List[Coroutine[Any, Any, List[Bytes]]] = []
         for path, ranges in per_file_requests.items():
             offsets = [r[0] for r in ranges]
             ends = [r[1] for r in ranges]
@@ -196,7 +236,7 @@ async def _cat_ranges(

         return output_buffers

-    async def _put_file(self, lpath, rpath, **kwargs):
+    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
         bucket, path = self._split_path(path)
         store = self._construct_store(bucket)
         with open(lpath, "rb") as f:
@@ -246,7 +286,15 @@ async def _ls(self, path, detail=True, **kwargs):
         else:
             return sorted([object["path"] for object in objects] + prefs)

-    def _open(self, path, mode="rb", **kwargs):
+    def _open(
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        autocommit=True,
+        cache_options=None,
+        **kwargs,
+    ):
         """Return raw bytes-mode file-like from the file-system"""
         bucket, path = self._split_path(path)
         store = self._construct_store(bucket)
@@ -279,8 +327,10 @@ def read(self, length: int = -1):

 class S3FsspecStore(AsyncFsspecStore):
     protocol = "s3"

+
 class GCSFsspecStore(AsyncFsspecStore):
     protocol = "gs"

+
 class AzureFsspecStore(AsyncFsspecStore):
     protocol = "abfs"

From 9ab35e1ba89a19e895da7265a74a306f584ee053 Mon Sep 17 00:00:00 2001
From: machichima
Date: Sat, 8 Feb 2025 16:07:00 +0800
Subject: [PATCH 07/51] fix: unbound variable error

---
 obstore/python/obstore/fsspec.py | 46 +++++++++++++++++++++++---------
 1 file changed, 34 insertions(+), 12 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 985ea2c9..7214bd5d 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -37,6 +37,7 @@
 import fsspec.asyn
 import fsspec.spec

+from python.obstore import Bytes
 import obstore as obs
 from obstore import Bytes

@@ -174,8 +175,15 @@ async def _rm_file(self, path, **kwargs):
         return await obs.delete_async(store, path)

     async def _cp_file(self, path1, path2, **kwargs):
-        bucket, path = self._split_path(path)
-        store = self._construct_store(bucket)
+        bucket1, path1 = self._split_path(path1)
+        bucket2, path2 = self._split_path(path2)
+
+        if bucket1 != bucket2:
+            raise ValueError(
+                f"Bucket mismatch: Source bucket '{bucket1}' and destination bucket '{bucket2}' must be the same."
+            )
+
+        store = self._construct_store(bucket1)
         return await obs.copy_async(store, path1, path2)

     async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
@@ -204,9 +212,6 @@ async def _cat_ranges(
         on_error="return",
         **kwargs,
     ):
-        bucket, path = self._split_path(path)
-        store = self._construct_store(bucket)
-
         if isinstance(starts, int):
             starts = [starts] * len(paths)
         if isinstance(ends, int):
@@ -220,6 +225,9 @@ async def _cat_ranges(

         futs: List[Coroutine[Any, Any, List[Bytes]]] = []
         for path, ranges in per_file_requests.items():
+            bucket, path = self._split_path(path)
+            store = self._construct_store(bucket)
+
             offsets = [r[0] for r in ranges]
             ends = [r[1] for r in ranges]
             fut = obs.get_ranges_async(store, path, starts=offsets, ends=ends)
@@ -236,15 +244,31 @@ async def _cat_ranges(

         return output_buffers

-    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
-        bucket, path = self._split_path(path)
-        store = self._construct_store(bucket)
+    async def _put_file(self, lpath, rpath, **kwargs):
+        lbucket, lpath = self._split_path(lpath)
+        rbucket, rpath = self._split_path(rpath)
+
+        if lbucket != rbucket:
+            raise ValueError(
+                f"Bucket mismatch: Source bucket '{lbucket}' and destination bucket '{rbucket}' must be the same."
+            )
+
+        store = self._construct_store(lbucket)
+
         with open(lpath, "rb") as f:
             await obs.put_async(store, rpath, f)

     async def _get_file(self, rpath, lpath, **kwargs):
-        bucket, path = self._split_path(path)
-        store = self._construct_store(bucket)
+        lbucket, lpath = self._split_path(lpath)
+        rbucket, rpath = self._split_path(rpath)
+
+        if lbucket != rbucket:
+            raise ValueError(
+                f"Bucket mismatch: Source bucket '{lbucket}' and destination bucket '{rbucket}' must be the same."
+            )
+
+        store = self._construct_store(lbucket)
+
         with open(lpath, "wb") as f:
             resp = await obs.get_async(store, rpath)
             async for buffer in resp.stream():
@@ -296,8 +320,6 @@ def _open(
         **kwargs,
     ):
         """Return raw bytes-mode file-like from the file-system"""
-        bucket, path = self._split_path(path)
-        store = self._construct_store(bucket)

         return BufferedFileSimple(self, path, mode, **kwargs)

From cb8049539d91a4fa00ec37be699e141a9f94ec01 Mon Sep 17 00:00:00 2001
From: machichima
Date: Sat, 8 Feb 2025 16:13:51 +0800
Subject: [PATCH 08/51] fix: remove redundant import

---
 obstore/python/obstore/fsspec.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 7214bd5d..85d4d571 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -37,7 +37,6 @@
 import fsspec.asyn
 import fsspec.spec

-from python.obstore import Bytes
 import obstore as obs
 from obstore import Bytes
From b6a3d3a132ac9a6f9223634bd42cc5f8a569ed1c Mon Sep 17 00:00:00 2001
From: machichima
Date: Sat, 8 Feb 2025 16:45:17 +0800
Subject: [PATCH 09/51] feat: add register() to register AsyncFsspecStore for
 provided protocol

---
 obstore/python/obstore/fsspec.py | 53 ++++++++++++++++++++++++--------
 1 file changed, 40 insertions(+), 13 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 85d4d571..26d8255e 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -78,13 +78,15 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
     def __init__(
         self,
         *args,
-        config: S3Config
-        | S3ConfigInput
-        | GCSConfig
-        | GCSConfigInput
-        | AzureConfig
-        | AzureConfigInput
-        | None = None,
+        config: (
+            S3Config
+            | S3ConfigInput
+            | GCSConfig
+            | GCSConfigInput
+            | AzureConfig
+            | AzureConfigInput
+            | None
+        ) = None,
         client_options: ClientConfig | None = None,
         retry_config: RetryConfig | None = None,
         asynchronous: bool = False,
@@ -345,13 +347,38 @@ def read(self, length: int = -1):
         return data

-class S3FsspecStore(AsyncFsspecStore):
-    protocol = "s3"
+def register(protocol: str | list[str]):
+    """
+    Dynamically register a subclass of AsyncFsspecStore for the given protocol(s).
+
+    This function creates a new subclass of AsyncFsspecStore with the specified
+    protocol and registers it with fsspec. If multiple protocols are provided,
+    the function registers each one individually.

+    Args:
+        protocol (str | list[str]):
+            A single protocol (e.g., "s3", "gcs", "abfs") or a list of protocols
+            to register AsyncFsspecStore for.

-class GCSFsspecStore(AsyncFsspecStore):
-    protocol = "gs"
+    Example:
+        >>> register("s3")
+        >>> register(["gcs", "abfs"])  # Registers both "gcs" and "abfs"

-class AzureFsspecStore(AsyncFsspecStore):
-    protocol = "abfs"
+    Notes:
+        - Each protocol gets a dynamically generated subclass named `AsyncFsspecStore_<protocol>`.
+        - This avoids modifying the original AsyncFsspecStore class.
+    """
+
+    if isinstance(protocol, list):
+        for p in protocol:
+            register(p)
+        return
+
+    fsspec.register_implementation(
+        protocol,
+        type(
+            f"AsyncFsspecStore_{protocol}",  # Unique class name
+            (AsyncFsspecStore,),  # Base class
+            {"protocol": protocol},  # Assign protocol dynamically
+        ),
+    )
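[Editor's note] The `type()` call above mints one uniquely named subclass per protocol and hands it to fsspec. The same pattern in isolation, using fsspec's built-in memory filesystem as a stand-in base class (class and protocol names here are hypothetical):

```py
import fsspec
from fsspec.implementations.memory import MemoryFileSystem

# Dynamically create a subclass bound to a new protocol, then register it.
DemoFS = type("MemoryFileSystem_demo", (MemoryFileSystem,), {"protocol": "demo"})
fsspec.register_implementation("demo", DemoFS, clobber=True)

assert fsspec.get_filesystem_class("demo") is DemoFS
```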
From 68cdff9caa4a3f65fda1014dfea5822f89d2fb1d Mon Sep 17 00:00:00 2001
From: machichima
Date: Sat, 8 Feb 2025 17:36:44 +0800
Subject: [PATCH 10/51] feat: add validation for protocol in register()

---
 obstore/python/obstore/fsspec.py | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 26d8255e..c0bb0cee 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -369,7 +369,26 @@ def register(protocol: str | list[str]):
         - This avoids modifying the original AsyncFsspecStore class.
     """

+    # Ensure protocol is of type str or list
+    if not isinstance(protocol, (str, list)):
+        raise TypeError(
+            f"Protocol must be a string or a list of strings, got {type(protocol)}"
+        )
+
+    # Ensure protocol is not None or empty
+    if not protocol:
+        raise ValueError(
+            "Protocol must be a non-empty string or a list of non-empty strings."
+        )
+
     if isinstance(protocol, list):
+        # Ensure all elements are strings
+        if not all(isinstance(p, str) for p in protocol):
+            raise TypeError("All protocols in the list must be strings.")
+        # Ensure no empty strings in the list
+        if not all(p for p in protocol):
+            raise ValueError("Protocol names in the list must be non-empty strings.")
+
         for p in protocol:
             register(p)
         return

From fa5b53990f0b2649197f9342e2d674f67646ebbb Mon Sep 17 00:00:00 2001
From: machichima
Date: Sat, 8 Feb 2025 18:03:16 +0800
Subject: [PATCH 11/51] test: for register()

Check if AsyncFsspecStore is registered and test invalid types passed into
register()
---
 tests/test_fsspec.py | 45 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)

diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py
index ce9a1bf6..1295beab 100644
--- a/tests/test_fsspec.py
+++ b/tests/test_fsspec.py
@@ -1,12 +1,55 @@
 import os

+import fsspec
 import pyarrow.parquet as pq
 import pytest

 import obstore as obs
-from obstore.fsspec import AsyncFsspecStore
+from obstore.fsspec import AsyncFsspecStore, register
+from tests.conftest import TEST_BUCKET_NAME

+
+def test_register():
+    """Test that register properly creates and registers a subclass for a given protocol."""
+    register("s3")  # Register the "s3" protocol dynamically
+    fs_class = fsspec.get_filesystem_class("s3")
+
+    assert issubclass(
+        fs_class, AsyncFsspecStore
+    ), "Registered class should be a subclass of AsyncFsspecStore"
+    assert (
+        fs_class.protocol == "s3"
+    ), "Registered class should have the correct protocol"
+
+    # Ensure a new instance of the registered store can be created
+    fs_instance = fs_class()
+    assert isinstance(
+        fs_instance, AsyncFsspecStore
+    ), "Registered class should be instantiable"
+
+    # Optionally, test multiple registrations
+    register(["gcs", "abfs"])
+    assert issubclass(fsspec.get_filesystem_class("gcs"), AsyncFsspecStore)
+    assert issubclass(fsspec.get_filesystem_class("abfs"), AsyncFsspecStore)
+
+
+def test_register_invalid_types():
+    """Test that register rejects invalid input types."""
+    with pytest.raises(TypeError):
+        register(123)  # Not a string or list
+
+    with pytest.raises(TypeError):
+        register(["s3", 42])  # List contains a non-string
+
+    with pytest.raises(ValueError):
+        register(["s3", ""])  # List contains a non-string
+
+    with pytest.raises(TypeError):
+        register(None)  # None is invalid
+
+    with pytest.raises(ValueError):
+        register([])  # Empty list is invalid
+
 @pytest.fixture()
 def fs(s3_store):
     return AsyncFsspecStore(s3_store)
From b7047797d8da4f998a50d211965cef4efa20453d Mon Sep 17 00:00:00 2001
From: machichima
Date: Sat, 8 Feb 2025 18:14:37 +0800
Subject: [PATCH 12/51] feat: add async parameter for register()

---
 obstore/python/obstore/fsspec.py | 25 ++++++++++++++++---------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index c0bb0cee..68844b24 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -347,7 +347,7 @@ def read(self, length: int = -1):
         return data

-def register(protocol: str | list[str]):
+def register(protocol: str | list[str], asynchronous: bool = False):
     """
     Dynamically register a subclass of AsyncFsspecStore for the given protocol(s).

@@ -356,17 +356,21 @@ def register(protocol: str | list[str], asynchronous: bool = False):
     the function registers each one individually.

     Args:
-        protocol (str | list[str]):
-            A single protocol (e.g., "s3", "gcs", "abfs") or a list of protocols
-            to register AsyncFsspecStore for.
+        protocol (str | list[str]):
+            A single protocol (e.g., "s3", "gcs", "abfs") or a list of protocols
+            to register AsyncFsspecStore for.
+        asynchronous (bool, optional):
+            If True, the registered store will support asynchronous operations.
+            Defaults to False.

     Example:
-        >>> register("s3")
-        >>> register(["gcs", "abfs"])  # Registers both "gcs" and "abfs"
+        >>> register("s3")
+        >>> register("s3", asynchronous=True)  # Registers an async-store for "s3"
+        >>> register(["gcs", "abfs"])  # Registers both "gcs" and "abfs"

     Notes:
-        - Each protocol gets a dynamically generated subclass named `AsyncFsspecStore_<protocol>`.
-        - This avoids modifying the original AsyncFsspecStore class.
+        - Each protocol gets a dynamically generated subclass named `AsyncFsspecStore_<protocol>`.
+        - This avoids modifying the original AsyncFsspecStore class.
     """

@@ -398,6 +402,9 @@ def register(protocol: str | list[str], asynchronous: bool = False):
         type(
             f"AsyncFsspecStore_{protocol}",  # Unique class name
             (AsyncFsspecStore,),  # Base class
-            {"protocol": protocol},  # Assign protocol dynamically
+            {
+                "protocol": protocol,
+                "asynchronous": asynchronous,
+            },  # Assign protocol dynamically
         ),
     )

From 61deac44eb28e3c9a59f11326324df391bd92d88 Mon Sep 17 00:00:00 2001
From: machichima
Date: Sat, 8 Feb 2025 18:15:13 +0800
Subject: [PATCH 13/51] test: test async store created by register()

---
 tests/test_fsspec.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py
index 1295beab..aac88754 100644
--- a/tests/test_fsspec.py
+++ b/tests/test_fsspec.py
@@ -27,9 +27,16 @@ def test_register():
         fs_instance, AsyncFsspecStore
     ), "Registered class should be instantiable"

-    # Optionally, test multiple registrations
-    register(["gcs", "abfs"])
-    assert issubclass(fsspec.get_filesystem_class("gcs"), AsyncFsspecStore)
+    # test register asynchronous
+    register("gcs", asynchronous=True)  # Register the "s3" protocol dynamically
+    fs_class = fsspec.get_filesystem_class("gcs")
+    assert (
+        fs_class.asynchronous == True
+    ), "Registered class should be asynchronous"
+
+    # test multiple registrations
+    register(["file", "abfs"])
+    assert issubclass(fsspec.get_filesystem_class("file"), AsyncFsspecStore)
     assert issubclass(fsspec.get_filesystem_class("abfs"), AsyncFsspecStore)
From 4bc1599b481a7892109acc3595bf9b80533ffc4a Mon Sep 17 00:00:00 2001
From: machichima
Date: Sat, 8 Feb 2025 22:53:13 +0800
Subject: [PATCH 14/51] feat: add http(s) into protocol_with_bucket list

bucket for https is the netloc of the url (e.g. https://www.google.com/path,
www.google.com is the bucket here)
---
 obstore/python/obstore/fsspec.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 68844b24..76b94076 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -139,7 +139,7 @@ def _split_path(self, path: str) -> Tuple[str, str]:
             ['mybucket', 'path/to/file']
         """

-        protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs"]
+        protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs", "https", "http"]

         if not self.protocol in protocol_with_bucket:
             # no bucket name in path

From 4dc914395298ecb0d94e4ae0c319442c51f92836 Mon Sep 17 00:00:00 2001
From: machichima
Date: Sun, 9 Feb 2025 11:12:54 +0800
Subject: [PATCH 15/51] feat: ls return path with bucket name

To fix the error raised when _walk is called recursively on the previous
result of ls
---
 obstore/python/obstore/fsspec.py | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 76b94076..f587f778 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -291,6 +291,9 @@ async def _info(self, path, **kwargs):
             "version": head["version"],
         }

+    def _fill_bucket_name(self, path, bucket):
+        return f"{bucket}/{path}"
+
     async def _ls(self, path, detail=True, **kwargs):
         bucket, path = self._split_path(path)
         store = self._construct_store(bucket)
@@ -301,15 +304,25 @@ async def _ls(self, path, detail=True, **kwargs):
         if detail:
             return [
                 {
-                    "name": object["path"],
+                    "name": self._fill_bucket_name(object["path"], bucket),
                     "size": object["size"],
                     "type": "file",
                     "e_tag": object["e_tag"],
                 }
                 for object in objects
-            ] + [{"name": object, "size": 0, "type": "directory"} for object in prefs]
+            ] + [
+                {
+                    "name": self._fill_bucket_name(pref, bucket),
+                    "size": 0,
+                    "type": "directory",
+                }
+                for pref in prefs
+            ]
         else:
-            return sorted([object["path"] for object in objects] + prefs)
+            return sorted(
+                [self._fill_bucket_name(object["path"], bucket) for object in objects]
+                + [self._fill_bucket_name(pref, bucket) for pref in prefs]
+            )

From fb607d0d2c9312ae6a5d806c39f83a2d591a4d13 Mon Sep 17 00:00:00 2001
From: machichima
Date: Sun, 9 Feb 2025 11:18:49 +0800
Subject: [PATCH 16/51] feat: enable re-registering the same protocol

---
 obstore/python/obstore/fsspec.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index f587f778..5cf1b919 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -420,4 +420,5 @@ def register(protocol: str | list[str], asynchronous: bool = False):
                 "asynchronous": asynchronous,
             },  # Assign protocol dynamically
         ),
+        clobber=True,
     )
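[Editor's note] With registration in place, instances come from `fsspec.filesystem()` and the store configuration is forwarded to the constructor, which is what the fixture change in the next commit switches to. A sketch under the same assumptions as the test suite (the endpoint value is hypothetical; the config keys mirror the `s3_store_config` fixture below):

```py
import fsspec
from obstore.fsspec import register

register("s3")
fs = fsspec.filesystem(
    "s3",
    config={
        "AWS_ENDPOINT_URL": "http://localhost:9000",  # hypothetical endpoint
        "AWS_SKIP_SIGNATURE": "True",
    },
)
```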
From b74948afb4d7e8f7a9ee979a3e2ab21818561e3a Mon Sep 17 00:00:00 2001
From: machichima
Date: Sun, 9 Feb 2025 11:19:28 +0800
Subject: [PATCH 17/51] test: update pytest fixture to use register()

---
 tests/conftest.py    | 10 ++++++++++
 tests/test_fsspec.py |  5 +++--
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 9739e932..897babe6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -51,3 +51,13 @@ def s3_store(s3):
             "AWS_ALLOW_HTTP": "true",
         },
     )
+
+
+@pytest.fixture()
+def s3_store_config(s3):
+    return {
+        "AWS_ENDPOINT_URL": s3,
+        "AWS_REGION": "us-east-1",
+        "AWS_SKIP_SIGNATURE": "True",
+        "AWS_ALLOW_HTTP": "true",
+    }
diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py
index aac88754..29a5b74a 100644
--- a/tests/test_fsspec.py
+++ b/tests/test_fsspec.py
@@ -58,8 +58,9 @@ def test_register_invalid_types():
         register([])  # Empty list is invalid

 @pytest.fixture()
-def fs(s3_store):
-    return AsyncFsspecStore(s3_store)
+def fs(s3_store_config):
+    register("s3")
+    return fsspec.filesystem("s3", config=s3_store_config)

From f6ba27cb1a97625bb457e14fe560b5d2a1eeb9d8 Mon Sep 17 00:00:00 2001
From: machichima
Date: Sun, 9 Feb 2025 11:21:04 +0800
Subject: [PATCH 18/51] test: update tests for new path format

paths now include the bucket name
---
 tests/test_fsspec.py | 101 ++++++++++++++++++++++++++-----------------
 1 file changed, 62 insertions(+), 39 deletions(-)

diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py
index 29a5b74a..cff6b2aa 100644
--- a/tests/test_fsspec.py
+++ b/tests/test_fsspec.py
@@ -4,7 +4,6 @@
 import pyarrow.parquet as pq
 import pytest

-import obstore as obs
 from obstore.fsspec import AsyncFsspecStore, register
 from tests.conftest import TEST_BUCKET_NAME

@@ -30,9 +29,7 @@ def test_register():
     # test register asynchronous
     register("gcs", asynchronous=True)  # Register the "s3" protocol dynamically
     fs_class = fsspec.get_filesystem_class("gcs")
-    assert (
-        fs_class.asynchronous == True
-    ), "Registered class should be asynchronous"
+    assert fs_class.asynchronous == True, "Registered class should be asynchronous"

     # test multiple registrations
     register(["file", "abfs"])
@@ -57,6 +54,7 @@ def test_register_invalid_types():
     with pytest.raises(ValueError):
         register([])  # Empty list is invalid

+
 @pytest.fixture()
 def fs(s3_store_config):
     register("s3")
     return fsspec.filesystem("s3", config=s3_store_config)

 def test_list(fs):
-    out = fs.ls("", detail=False)
-    assert out == ["afile"]
-    fs.pipe_file("dir/bfile", b"data")
-    out = fs.ls("", detail=False)
-    assert out == ["afile", "dir"]
-    out = fs.ls("", detail=True)
+    out = fs.ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [f"{TEST_BUCKET_NAME}/afile"]
+    fs.pipe_file(f"{TEST_BUCKET_NAME}/dir/bfile", b"data")
+    out = fs.ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [f"{TEST_BUCKET_NAME}/afile", f"{TEST_BUCKET_NAME}/dir"]
+    out = fs.ls(f"{TEST_BUCKET_NAME}", detail=True)
     assert out[0]["type"] == "file"
     assert out[1]["type"] == "directory"

 @pytest.mark.asyncio
-async def test_list_async(s3_store):
-    fs = AsyncFsspecStore(s3_store, asynchronous=True)
-    out = await fs._ls("", detail=False)
-    assert out == ["afile"]
-    await fs._pipe_file("dir/bfile", b"data")
-    out = await fs._ls("", detail=False)
-    assert out == ["afile", "dir"]
-    out = await fs._ls("", detail=True)
+async def test_list_async(s3_store_config):
+    register("s3")
+    fs = fsspec.filesystem("s3", config=s3_store_config, asynchronous=True)
+
+    out = await fs._ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [f"{TEST_BUCKET_NAME}/afile"]
+    await fs._pipe_file(f"{TEST_BUCKET_NAME}/dir/bfile", b"data")
+    out = await fs._ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [f"{TEST_BUCKET_NAME}/afile", f"{TEST_BUCKET_NAME}/dir"]
+    out = await fs._ls(f"{TEST_BUCKET_NAME}", detail=True)
     assert out[0]["type"] == "file"
     assert out[1]["type"] == "directory"

 @pytest.mark.network
 def test_remote_parquet():
-    store = obs.store.HTTPStore.from_url("https://github.com")
-    fs = AsyncFsspecStore(store)
-    url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
+    register("https")
+    fs = fsspec.filesystem("https")
+    url = "github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
+    pq.read_metadata(url, filesystem=fs)
+
+    # also test with full url
+    url = "https://github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
     pq.read_metadata(url, filesystem=fs)

 def test_multi_file_ops(fs):
-    data = {"dir/test1": b"test data1", "dir/test2": b"test data2"}
+    data = {
+        f"{TEST_BUCKET_NAME}/dir/test1": b"test data1",
+        f"{TEST_BUCKET_NAME}/dir/test2": b"test data2",
+    }
     fs.pipe(data)
     out = fs.cat(list(data))
     assert out == data
-    out = fs.cat("dir", recursive=True)
+    out = fs.cat(f"{TEST_BUCKET_NAME}/dir", recursive=True)
     assert out == data
-    fs.cp("dir", "dir2", recursive=True)
-    out = fs.find("", detail=False)
-    assert out == ["afile", "dir/test1", "dir/test2", "dir2/test1", "dir2/test2"]
-    fs.rm(["dir", "dir2"], recursive=True)
-    out = fs.find("", detail=False)
-    assert out == ["afile"]
+    fs.cp(f"{TEST_BUCKET_NAME}/dir", f"{TEST_BUCKET_NAME}/dir2", recursive=True)
+    out = fs.find(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [
+        f"{TEST_BUCKET_NAME}/afile",
+        f"{TEST_BUCKET_NAME}/dir/test1",
+        f"{TEST_BUCKET_NAME}/dir/test2",
+        f"{TEST_BUCKET_NAME}/dir2/test1",
+        f"{TEST_BUCKET_NAME}/dir2/test2",
+    ]
+    fs.rm([f"{TEST_BUCKET_NAME}/dir", f"{TEST_BUCKET_NAME}/dir2"], recursive=True)
+    out = fs.find(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [f"{TEST_BUCKET_NAME}/afile"]

 def test_cat_ranges_one(fs):
     data1 = os.urandom(10000)
-    fs.pipe_file("data1", data1)
+    fs.pipe_file(f"{TEST_BUCKET_NAME}/data1", data1)

     # single range
-    out = fs.cat_ranges(["data1"], [10], [20])
+    out = fs.cat_ranges([f"{TEST_BUCKET_NAME}/data1"], [10], [20])
     assert out == [data1[10:20]]

     # range oob
-    out = fs.cat_ranges(["data1"], [0], [11000])
+    out = fs.cat_ranges([f"{TEST_BUCKET_NAME}/data1"], [0], [11000])
     assert out == [data1]

     # two disjoint ranges, one file
-    out = fs.cat_ranges(["data1", "data1"], [10, 40], [20, 60])
+    out = fs.cat_ranges(
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], [10, 40], [20, 60]
+    )
     assert out == [data1[10:20], data1[40:60]]

     # two adjoining ranges, one file
-    out = fs.cat_ranges(["data1", "data1"], [10, 30], [20, 60])
+    out = fs.cat_ranges(
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], [10, 30], [20, 60]
+    )
     assert out == [data1[10:20], data1[30:60]]

     # two overlapping ranges, one file
-    out = fs.cat_ranges(["data1", "data1"], [10, 15], [20, 60])
+    out = fs.cat_ranges(
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], [10, 15], [20, 60]
+    )
     assert out == [data1[10:20], data1[15:60]]

     # completely overlapping ranges, one file
-    out = fs.cat_ranges(["data1", "data1"], [10, 0], [20, 60])
+    out = fs.cat_ranges(
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], [10, 0], [20, 60]
+    )
     assert out == [data1[10:20], data1[0:60]]

 def test_cat_ranges_two(fs):
     data1 = os.urandom(10000)
     data2 = os.urandom(10000)
-    fs.pipe({"data1": data1, "data2": data2})
+    fs.pipe({f"{TEST_BUCKET_NAME}/data1": data1, f"{TEST_BUCKET_NAME}/data2": data2})

     # single range in each file
-    out = fs.cat_ranges(["data1", "data2"], [10, 10], [20, 20])
+    out = fs.cat_ranges(
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data2"], [10, 10], [20, 20]
+    )
     assert out == [data1[10:20], data2[10:20]]

@@ -170,4 +193,4 @@ def test_atomic_write(fs):

 def test_cat_ranges_error(fs):
     with pytest.raises(ValueError):
-        fs.cat_ranges(["path"], [], [])
+        fs.cat_ranges([f"{TEST_BUCKET_NAME}/path"], [], [])
From 30250cf3aaf1acf8692bdb1b6b18213cf61bf2be Mon Sep 17 00:00:00 2001
From: machichima
Date: Sun, 9 Feb 2025 11:35:22 +0800
Subject: [PATCH 19/51] fix: mkdocs build error

---
 obstore/python/obstore/fsspec.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 5cf1b919..3af7afac 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -96,7 +96,11 @@ def __init__(
         """Construct a new AsyncFsspecStore

         Args:
-            store: a configured instance of one of the store classes in `obstore.store`.
+            config: Configuration for the cloud storage provider, which can be one of
+                S3Config, S3ConfigInput, GCSConfig, GCSConfigInput, AzureConfig,
+                or AzureConfigInput. If None, no cloud storage configuration is applied.
+            client_options: Additional options for configuring the client.
+            retry_config: Configuration for handling request errors.
             asynchronous: Set to `True` if this instance is meant to be be called using
                 the fsspec async API. This should only be set to true when running
                 within a coroutine.
@@ -369,12 +373,10 @@ def register(protocol: str | list[str], asynchronous: bool = False):
     the function registers each one individually.

     Args:
-        protocol (str | list[str]):
-            A single protocol (e.g., "s3", "gcs", "abfs") or a list of protocols
-            to register AsyncFsspecStore for.
-        asynchronous (bool, optional):
-            If True, the registered store will support asynchronous operations.
-            Defaults to False.
+        protocol (str | list[str]): A single protocol (e.g., "s3", "gcs", "abfs") or
+            a list of protocols to register AsyncFsspecStore for.
+        asynchronous (bool, optional): If True, the registered store will support
+            asynchronous operations. Defaults to False.

     Example:
         >>> register("s3")
From 27a0ac7dc842f4acb67fa91cc4085df6cc412c92 Mon Sep 17 00:00:00 2001
From: machichima
Date: Thu, 13 Feb 2025 20:45:20 +0800
Subject: [PATCH 20/51] fix: error when merging

---
 obstore/python/obstore/fsspec.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index dc0c4390..f300c17d 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -226,7 +226,7 @@ async def _pipe_file(
         path: str,
         value: Any,
         mode: str = "overwrite",  # noqa: ARG002
-        mode="overwrite", **_kwargs: Any,
+        **_kwargs: Any,
     ) -> Any:
         bucket, path = self._split_path(path)
         store = self._construct_store(bucket)
@@ -244,7 +244,7 @@ async def _cat_file(

         if start is None and end is None:
             resp = await obs.get_async(store, path)
-            return ((await resp.bytes_async()).to_bytes()).to_bytes()
+            return (await resp.bytes_async()).to_bytes()

From d2d0235055c2910132969e83ecf2892c4009a6f0 Mon Sep 17 00:00:00 2001
From: machichima
Date: Thu, 13 Feb 2025 21:57:59 +0800
Subject: [PATCH 21/51] build: add some ruff ignores

---
 pyproject.toml | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pyproject.toml b/pyproject.toml
index 2f12586e..92df5cda 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,12 +48,18 @@ ignore = [
     "ANN204", # Missing return type annotation for special method
     "E501", # Line too long
 ]
+"*.py" = [
+    "E501", # Line too long
+    "FBT001", # Boolean positional argument in function definition (should be keyword-only)
+    "FBT002", # Boolean default positional argument in function definition (should be keyword-only)
+]
 "tests/*" = [
     "S101", # assert
     "ANN201", # Missing return type annotation for public function
     "ANN202", # Missing return type annotation for private function `it`
     "D100", # Missing docstring in public module
     "D103", # Missing docstring in public function
+    "E501", # Line too long
     "PLR2004", # Magic value used in comparison, consider replacing `100` with a constant variable
     "S301", # `pickle` and modules that wrap it can be unsafe when used to deserialize untrusted data, possible security issue
     "SLF001", # Private member accessed

From 1f97703348bfcd4717ed187a482a84fbee69bbd9 Mon Sep 17 00:00:00 2001
From: machichima
Date: Thu, 13 Feb 2025 21:58:32 +0800
Subject: [PATCH 22/51] fix: ruff error

---
 obstore/python/obstore/fsspec.py | 116 ++++++++++++++-----------------
 tests/conftest.py                |   4 +-
 tests/test_fsspec.py             |  70 +++++++++++++------
 3 files changed, 105 insertions(+), 85 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index f300c17d..1158a3df 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -39,21 +39,13 @@

 import asyncio
 from collections import defaultdict
-from functools import lru_cache
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Coroutine,
-    Dict,
-    List,
-    Tuple,
-    Literal,
-    overload
-)
+from collections.abc import Coroutine
+from typing import TYPE_CHECKING, Any, Literal, overload
 from urllib.parse import urlparse

 import fsspec.asyn
 import fsspec.spec
+from cachetools import LRUCache

 import obstore as obs
 from obstore import Bytes
@@ -66,6 +58,7 @@
         ClientConfig,
         GCSConfig,
         GCSConfigInput,
+        ObjectStore,
         RetryConfig,
         S3Config,
         S3ConfigInput,
     )

 if TYPE_CHECKING:
     from collections.abc import Coroutine

     from obstore import Bytes


 class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
     """An fsspec implementation based on a obstore Store.
@@ -98,7 +90,7 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
     client_options: ClientConfig | None
     retry_config: RetryConfig | None

-    def __init__(
+    def __init__(  # noqa: PLR0913
         self,
         *args: Any,
         config: (
@@ -149,7 +141,7 @@ def __init__(
         ```

         """
-
+        self._store_cache = LRUCache(maxsize=10)
         self.config = config
         self.client_options = client_options
         self.retry_config = retry_config
@@ -161,48 +153,50 @@ def __init__(
             batch_size=batch_size,
         )

-    def _split_path(self, path: str) -> Tuple[str, str]:
-        """
-        Split bucket and file path
+    def _split_path(self, path: str) -> tuple[str, str]:
+        """Split bucket and file path.

         Args:
             path (str): Input path, like `s3://mybucket/path/to/file`

+        Returns:
+            tuple[str, str]: with the first element as bucket name and second be
+                the file path inside the bucket
+
         Examples:
             >>> split_path("s3://mybucket/path/to/file")
             ['mybucket', 'path/to/file']
-        """

+        """
         protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs", "https", "http"]

-        if not self.protocol in protocol_with_bucket:
+        if self.protocol not in protocol_with_bucket:
            # no bucket name in path
            return "", path

         res = urlparse(path)
         if res.scheme:
             if res.scheme != self.protocol:
-                raise ValueError(
-                    f"Expect protocol to be {self.protocol}. Got {res.scheme}"
-                )
+                err_msg = f"Expect protocol to be {self.protocol}. Got {res.scheme}"
+                raise ValueError(err_msg)
             path = res.netloc + res.path

         if "/" not in path:
             return path, ""
-        else:
-            path_li = path.split("/")
-            bucket = path_li[0]
-            file_path = "/".join(path_li[1:])
-            return (bucket, file_path)
-
-    @lru_cache(maxsize=10)
-    def _construct_store(self, bucket: str):
-        return from_url(
-            url=f"{self.protocol}://{bucket}",
-            config=self.config,
-            client_options=self.client_options,
-            retry_config=self.retry_config if self.retry_config else None,
-        )
+        path_li = path.split("/")
+        bucket = path_li[0]
+        file_path = "/".join(path_li[1:])
+        return (bucket, file_path)
+
+    def _construct_store(self, bucket: str) -> ObjectStore:
+        if bucket not in self._store_cache:
+            self._store_cache[bucket] = from_url(
+                url=f"{self.protocol}://{bucket}",
+                config=self.config,
+                client_options=self.client_options,
+                retry_config=self.retry_config or None,
+            )
+        return self._store_cache[bucket]

     async def _rm_file(self, path: str, **_kwargs: Any) -> None:
         bucket, path = self._split_path(path)
@@ -214,9 +208,8 @@ async def _cp_file(self, path1: str, path2: str, **_kwargs: Any) -> None:
         bucket2, path2 = self._split_path(path2)

         if bucket1 != bucket2:
-            raise ValueError(
-                f"Bucket mismatch: Source bucket '{bucket1}' and destination bucket '{bucket2}' must be the same."
-            )
+            err_msg = f"Bucket mismatch: Source bucket '{bucket1}' and destination bucket '{bucket2}' must be the same."
+            raise ValueError(err_msg)

         store = self._construct_store(bucket1)
         return await obs.copy_async(store, path1, path2)
@@ -279,12 +272,12 @@ async def _cat_ranges(  # noqa: PLR0913

         futs: list[Coroutine[Any, Any, list[Bytes]]] = []
         for path, ranges in per_file_requests.items():
-            bucket, path = self._split_path(path)
+            bucket, path_no_bucket = self._split_path(path)
             store = self._construct_store(bucket)

             offsets = [r[0] for r in ranges]
             ends = [r[1] for r in ranges]
-            fut = obs.get_ranges_async(store, path, starts=offsets, ends=ends)
+            fut = obs.get_ranges_async(store, path_no_bucket, starts=offsets, ends=ends)
             futs.append(fut)

         result = await asyncio.gather(*futs)
@@ -315,9 +308,8 @@ async def _put_file(
         rbucket, rpath = self._split_path(rpath)

         if lbucket != rbucket:
-            raise ValueError(
-                f"Bucket mismatch: Source bucket '{lbucket}' and destination bucket '{rbucket}' must be the same."
-            )
+            err_msg = f"Bucket mismatch: Source bucket '{lbucket}' and destination bucket '{rbucket}' must be the same."
+            raise ValueError(err_msg)

         store = self._construct_store(lbucket)
@@ -331,9 +323,8 @@ async def _get_file(self, rpath: str, lpath: str, **_kwargs: Any) -> None:
         rbucket, rpath = self._split_path(rpath)

         if lbucket != rbucket:
-            raise ValueError(
-                f"Bucket mismatch: Source bucket '{lbucket}' and destination bucket '{rbucket}' must be the same."
-            )
+            err_msg = f"Bucket mismatch: Source bucket '{lbucket}' and destination bucket '{rbucket}' must be the same."
+            raise ValueError(err_msg)

         store = self._construct_store(lbucket)
@@ -358,7 +349,7 @@ async def _info(self, path: str, **_kwargs: Any) -> dict[str, Any]:
             "version": head["version"],
         }

-    def _fill_bucket_name(self, path, bucket):
+    def _fill_bucket_name(self, path: str, bucket: str) -> str:
         return f"{bucket}/{path}"

     @overload
@@ -372,13 +363,13 @@ async def _ls(
     async def _ls(
         self,
         path: str,
-        detail: Literal[True] = True,  # noqa: FBT002
+        detail: Literal[True] = True,
         **_kwargs: Any,
     ) -> list[dict[str, Any]]: ...
     async def _ls(
         self,
         path: str,
-        detail: bool = True,  # noqa: FBT001, FBT002
+        detail: bool = True,
         **_kwargs: Any,
     ) -> list[dict[str, Any]] | list[str]:
         bucket, path = self._split_path(path)
@@ -404,18 +395,17 @@ async def _ls(
                 }
                 for pref in prefs
             ]
-        else:
-            return sorted(
-                [self._fill_bucket_name(obj["path"], bucket) for obj in objects]
-                + [self._fill_bucket_name(pref, bucket) for pref in prefs]
-            )
+        return sorted(
+            [self._fill_bucket_name(obj["path"], bucket) for obj in objects]
+            + [self._fill_bucket_name(pref, bucket) for pref in prefs],
+        )

     def _open(
         self,
         path: str,
         mode: str = "rb",
         block_size: Any = None,  # noqa: ARG002
-        autocommit: Any = True,  # noqa: ARG002, FBT002
+        autocommit: Any = True,  # noqa: ARG002
         cache_options: Any = None,  # noqa: ARG002
         **kwargs: Any,
     ) -> BufferedFileSimple:
@@ -455,9 +445,8 @@ def read(self, length: int = -1) -> Any:
         return data

-def register(protocol: str | list[str], asynchronous: bool = False):
-    """
-    Dynamically register a subclass of AsyncFsspecStore for the given protocol(s).
+def register(protocol: str | list[str], asynchronous: bool = False) -> None:
+    """Dynamically register a subclass of AsyncFsspecStore for the given protocol(s).

     This function creates a new subclass of AsyncFsspecStore with the specified
     protocol and registers it with fsspec. If multiple protocols are provided,
@@ -477,18 +466,19 @@ def register(protocol: str | list[str], asynchronous: bool = False) -> None:
     Notes:
         - Each protocol gets a dynamically generated subclass named `AsyncFsspecStore_<protocol>`.
        - This avoids modifying the original AsyncFsspecStore class.
-    """

+    """
     # Ensure protocol is of type str or list
-    if not isinstance(protocol, (str, list)):
-        raise TypeError(
+    if not isinstance(protocol, str | list):
+        err_msg = (
             f"Protocol must be a string or a list of strings, got {type(protocol)}"
         )
+        raise TypeError(err_msg)

     # Ensure protocol is not None or empty
     if not protocol:
         raise ValueError(
-            "Protocol must be a non-empty string or a list of non-empty strings."
+            "Protocol must be a non-empty string or a list of non-empty strings.",
         )

     if isinstance(protocol, list):
diff --git a/tests/conftest.py b/tests/conftest.py
index 919c34e4..bc536307 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -51,8 +51,8 @@ def s3_store(s3: str):
     )

-@pytest.fixture()
-def s3_store_config(s3):
+@pytest.fixture
+def s3_store_config(s3: str):
     return {
         "AWS_ENDPOINT_URL": s3,
diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py
index 26999ae5..7411e6c8 100644
--- a/tests/test_fsspec.py
+++ b/tests/test_fsspec.py
@@ -1,4 +1,7 @@
+from __future__ import annotations
+
 import os
+from typing import TYPE_CHECKING

 import fsspec
 import pyarrow.parquet as pq
 import pytest

@@ -7,29 +10,34 @@
 from obstore.fsspec import AsyncFsspecStore, register
 from tests.conftest import TEST_BUCKET_NAME

+if TYPE_CHECKING:
+    from obstore.store import S3Config
+

 def test_register():
-    """Test that register properly creates and registers a subclass for a given protocol."""
+    """Test if register() creates and registers a subclass for a given protocol."""
     register("s3")  # Register the "s3" protocol dynamically
     fs_class = fsspec.get_filesystem_class("s3")

     assert issubclass(
-        fs_class, AsyncFsspecStore
+        fs_class,
+        AsyncFsspecStore,
     ), "Registered class should be a subclass of AsyncFsspecStore"
-    assert (
-        fs_class.protocol == "s3"
-    ), "Registered class should have the correct protocol"
+    assert fs_class.protocol == "s3", (
+        "Registered class should have the correct protocol"
+    )

     # Ensure a new instance of the registered store can be created
     fs_instance = fs_class()
     assert isinstance(
-        fs_instance, AsyncFsspecStore
+        fs_instance,
+        AsyncFsspecStore,
     ), "Registered class should be instantiable"

     # test register asynchronous
     register("gcs", asynchronous=True)  # Register the "s3" protocol dynamically
     fs_class = fsspec.get_filesystem_class("gcs")
-    assert fs_class.asynchronous == True, "Registered class should be asynchronous"
+    assert fs_class.asynchronous, "Registered class should be asynchronous"

     # test multiple registrations
     register(["file", "abfs"])
@@ -39,24 +47,36 @@ def test_register():

 def test_register_invalid_types():
     """Test that register rejects invalid input types."""
-    with pytest.raises(TypeError):
+    with pytest.raises(
+        TypeError,
+        match="Protocol must be a string or a list of strings",
+    ):
         register(123)  # Not a string or list

-    with pytest.raises(TypeError):
+    with pytest.raises(TypeError, match="All protocols in the list must be strings"):
         register(["s3", 42])  # List contains a non-string

-    with pytest.raises(ValueError):
+    with pytest.raises(
+        ValueError,
+        match="Protocol names in the list must be non-empty strings",
+    ):
         register(["s3", ""])  # List contains a non-string

-    with pytest.raises(TypeError):
+    with pytest.raises(
+        TypeError,
+        match="Protocol must be a string or a list of strings",
+    ):
         register(None)  # None is invalid

-    with pytest.raises(ValueError):
+    with pytest.raises(
+        ValueError,
+        match="Protocol must be a non-empty string or a list of non-empty strings",
+    ):
         register([])  # Empty list is invalid

-@pytest.fixture()
-def fs(s3_store_config):
+@pytest.fixture
+def fs(s3_store_config: S3Config):
     register("s3")
     return fsspec.filesystem("s3", config=s3_store_config)
@@ -73,7 +93,7 @@ def test_list(fs: AsyncFsspecStore):

 @pytest.mark.asyncio
-async def test_list_async(s3_store_config):
+async def test_list_async(s3_store_config: S3Config):
     register("s3")
     fs = fsspec.filesystem("s3", config=s3_store_config, asynchronous=True)
@@ -137,25 +157,33 @@ def test_cat_ranges_one(fs: AsyncFsspecStore):

     # two disjoint ranges, one file
     out = fs.cat_ranges(
-        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], [10, 40], [20, 60]
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"],
+        [10, 40],
+        [20, 60],
     )
     assert out == [data1[10:20], data1[40:60]]

     # two adjoining ranges, one file
     out = fs.cat_ranges(
-        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], [10, 30], [20, 60]
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"],
+        [10, 30],
+        [20, 60],
     )
     assert out == [data1[10:20], data1[30:60]]

     # two overlapping ranges, one file
     out = fs.cat_ranges(
-        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], [10, 15], [20, 60]
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"],
+        [10, 15],
+        [20, 60],
     )
     assert out == [data1[10:20], data1[15:60]]

     # completely overlapping ranges, one file
     out = fs.cat_ranges(
-        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], [10, 0], [20, 60]
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"],
+        [10, 0],
+        [20, 60],
     )
     assert out == [data1[10:20], data1[0:60]]
@@ -167,7 +195,9 @@ def test_cat_ranges_two(fs: AsyncFsspecStore):

     # single range in each file
     out = fs.cat_ranges(
-        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data2"], [10, 10], [20, 20]
+        [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data2"],
+        [10, 10],
+        [20, 20],
     )
     assert out == [data1[10:20], data2[10:20]]
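[Editor's note] The commit above swaps the `functools.lru_cache` decorator for an explicit per-instance `cachetools.LRUCache`, so each filesystem keeps at most ten per-bucket stores and evicts the least recently used one first. The eviction behavior in isolation (bucket and store names illustrative):

```py
from cachetools import LRUCache

cache = LRUCache(maxsize=2)
cache["bucket-a"] = "store-a"
cache["bucket-b"] = "store-b"
_ = cache["bucket-a"]          # access marks "bucket-a" most recently used
cache["bucket-c"] = "store-c"  # evicts "bucket-b", the least recently used
assert "bucket-b" not in cache
assert "bucket-a" in cache and "bucket-c" in cache
```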
From b002afb484295ddb342270df3f8c34b1825c8753 Mon Sep 17 00:00:00 2001
From: machichima
Date: Thu, 13 Feb 2025 22:03:10 +0800
Subject: [PATCH 23/51] build: add cachetools dependency

---
 pyproject.toml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 92df5cda..bab9fc62 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,9 @@
 version = "0.1.0"
 description = "Add your description here"
 readme = "README.md"
 requires-python = ">=3.11"
-dependencies = []
+dependencies = [
+    "cachetools>=5.5.1",
+]

 [tool.uv]
 dev-dependencies = [

From 897beb07b84d5fda85776551015ae4ff09408a37 Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Thu, 13 Feb 2025 17:04:21 -0500
Subject: [PATCH 24/51] better scoping of lints

---
 obstore/python/obstore/fsspec.py | 23 +++++++++++++++--------
 pyproject.toml                   | 10 +---------
 2 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 1158a3df..78260b7a 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -30,10 +30,7 @@
 integration.
 """

-# ruff: noqa: ANN401
-# Dynamically typed expressions (typing.Any) are disallowed
-# ruff: noqa: PTH123
-# `open()` should be replaced by `Path.open()`
+# ruff: noqa: ANN401, PTH123, FBT001, FBT002

 from __future__ import annotations

@@ -208,7 +205,10 @@ async def _cp_file(self, path1: str, path2: str, **_kwargs: Any) -> None:

         if bucket1 != bucket2:
-            err_msg = f"Bucket mismatch: Source bucket '{bucket1}' and destination bucket '{bucket2}' must be the same."
+            err_msg = (
+                f"Bucket mismatch: Source bucket '{bucket1}' and "
+                f"destination bucket '{bucket2}' must be the same."
+            )
             raise ValueError(err_msg)
@@ -308,7 +308,10 @@ async def _put_file(

         if lbucket != rbucket:
-            err_msg = f"Bucket mismatch: Source bucket '{lbucket}' and destination bucket '{rbucket}' must be the same."
+            err_msg = (
+                f"Bucket mismatch: Source bucket '{lbucket}' and "
+                f"destination bucket '{rbucket}' must be the same."
+            )
             raise ValueError(err_msg)
@@ -326,7 +326,10 @@ async def _get_file(self, rpath: str, lpath: str, **_kwargs: Any) -> None:

         if lbucket != rbucket:
-            err_msg = f"Bucket mismatch: Source bucket '{lbucket}' and destination bucket '{rbucket}' must be the same."
+            err_msg = (
+                f"Bucket mismatch: Source bucket '{lbucket}' and "
+                f"destination bucket '{rbucket}' must be the same."
+            )
             raise ValueError(err_msg)
@@ -464,7 +470,8 @@ def register(protocol: str | list[str], asynchronous: bool = False) -> None:
     Notes:
-        - Each protocol gets a dynamically generated subclass named `AsyncFsspecStore_<protocol>`.
+        - Each protocol gets a dynamically generated subclass named
+          `AsyncFsspecStore_<protocol>`.
         - This avoids modifying the original AsyncFsspecStore class.
diff --git a/pyproject.toml b/pyproject.toml
index bab9fc62..d06f5b52 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,9 +4,7 @@
 description = "Add your description here"
 readme = "README.md"
 requires-python = ">=3.11"
-dependencies = [
-    "cachetools>=5.5.1",
-]
+dependencies = ["cachetools>=5.5.1"]

 [tool.uv]
 dev-dependencies = [
@@ -50,18 +48,12 @@ ignore = [
     "ANN204", # Missing return type annotation for special method
     "E501", # Line too long
 ]
-"*.py" = [
-    "E501", # Line too long
-    "FBT001", # Boolean positional argument in function definition (should be keyword-only)
-    "FBT002", # Boolean default positional argument in function definition (should be keyword-only)
-]
 "tests/*" = [
     "S101", # assert
     "ANN201", # Missing return type annotation for public function
     "ANN202", # Missing return type annotation for private function `it`
     "D100", # Missing docstring in public module
     "D103", # Missing docstring in public function
-    "E501", # Line too long
     "PLR2004", # Magic value used in comparison, consider replacing `100` with a constant variable
     "S301", # `pickle` and modules that wrap it can be unsafe when used to deserialize untrusted data, possible security issue
     "SLF001", # Private member accessed

From 072699963dc84d35b85fdb5d664d872e441afe7e Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Thu, 13 Feb 2025 17:05:39 -0500
Subject: [PATCH 25/51] lint

---
 tests/test_fsspec.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py
index 7411e6c8..d46d332f 100644
--- a/tests/test_fsspec.py
+++ b/tests/test_fsspec.py
@@ -111,7 +111,7 @@ def test_remote_parquet():
     register("https")
     fs = fsspec.filesystem("https")
-    url = "github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
+    url = "github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"  # noqa: E501
     pq.read_metadata(url, filesystem=fs)

     # also test with full url

From 5b87c468a43799ddba9b200f685b326591e72329 Mon Sep 17 00:00:00 2001
From: machichima
Date: Fri, 14 Feb 2025 21:47:08 +0800
Subject: [PATCH 26/51] fix: update lru_cache + clean class attribute

---
 obstore/python/obstore/fsspec.py | 61 ++++++++++++++++++--------------
 1 file changed, 34 insertions(+), 27 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 78260b7a..fb4a44b4 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -35,20 +35,24 @@

 from __future__ import annotations

 import asyncio
+import weakref
 from collections import defaultdict
-from collections.abc import Coroutine
-from typing import TYPE_CHECKING, Any, Literal, overload
+from collections.abc import Callable, Coroutine
+from functools import lru_cache, wraps
+from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload
 from urllib.parse import urlparse

 import fsspec.asyn
 import fsspec.spec
-from cachetools import LRUCache

 import obstore as obs
 from obstore import Bytes
 from obstore.store import from_url

 if TYPE_CHECKING:
     from collections.abc import Coroutine

     from obstore import Bytes
     from obstore.store import (
         AzureConfig,
         AzureConfigInput,
         ClientConfig,
         GCSConfig,
         GCSConfigInput,
         ObjectStore,
         RetryConfig,
         S3Config,
         S3ConfigInput,
     )

+
+F = TypeVar("F", bound=Callable[..., Any])
+
+
+def weak_lru(maxsize: int = 128, typed: bool = False) -> Callable[[F], F]:
""" diff --git a/pyproject.toml b/pyproject.toml index bab9fc62..d06f5b52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,9 +4,7 @@ version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" -dependencies = [ - "cachetools>=5.5.1", -] +dependencies = ["cachetools>=5.5.1"] [tool.uv] dev-dependencies = [ @@ -50,18 +48,12 @@ ignore = [ "ANN204", # Missing return type annotation for special method "E501", # Line too long ] -"*.py" = [ - "E501", # Line too long - "FBT001", # Boolean positional argument in function definition (should be keyword-only) - "FBT002", # Boolean default positional argument in function definition (should be keyword-only) -] "tests/*" = [ "S101", # assert "ANN201", # Missing return type annotation for public function "ANN202", # Missing return type annotation for private function `it` "D100", # Missing docstring in public module "D103", # Missing docstring in public function - "E501", # Line too long "PLR2004", # Magic value used in comparison, consider replacing `100` with a constant variable "S301", # `pickle` and modules that wrap it can be unsafe when used to deserialize untrusted data, possible security issue "SLF001", # Private member accessed From 072699963dc84d35b85fdb5d664d872e441afe7e Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 13 Feb 2025 17:05:39 -0500 Subject: [PATCH 25/51] lint --- tests/test_fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index 7411e6c8..d46d332f 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -111,7 +111,7 @@ async def test_list_async(s3_store_config: S3Config): def test_remote_parquet(): register("https") fs = fsspec.filesystem("https") - url = "github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet" + url = "github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet" # noqa: E501 pq.read_metadata(url, filesystem=fs) # also test with full url From 5b87c468a43799ddba9b200f685b326591e72329 Mon Sep 17 00:00:00 2001 From: machichima Date: Fri, 14 Feb 2025 21:47:08 +0800 Subject: [PATCH 26/51] fix: update lru_cache + clean class attribute --- obstore/python/obstore/fsspec.py | 61 ++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 78260b7a..fb4a44b4 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -35,20 +35,24 @@ from __future__ import annotations import asyncio +import weakref from collections import defaultdict -from collections.abc import Coroutine -from typing import TYPE_CHECKING, Any, Literal, overload +from collections.abc import Callable, Coroutine +from functools import lru_cache, wraps +from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload from urllib.parse import urlparse import fsspec.asyn import fsspec.spec -from cachetools import LRUCache import obstore as obs from obstore import Bytes from obstore.store import from_url if TYPE_CHECKING: + from collections.abc import Coroutine + + from obstore import Bytes from obstore.store import ( AzureConfig, AzureConfigInput, @@ -61,10 +65,25 @@ S3ConfigInput, ) -if TYPE_CHECKING: - from collections.abc import Coroutine - from obstore import Bytes +F = TypeVar("F", bound=Callable[..., Any]) + + +def weak_lru(maxsize: int = 128, typed: bool = False) -> Callable[[F], F]: + """LRU Cache decorator that keeps a weak reference to 
'self'.""" + + def wrapper(func: F) -> F: + @lru_cache(maxsize, typed) + def _func(_self, *args, **kwargs) -> F: # noqa: ANN001, ANN002, ANN003 + return func(_self(), *args, **kwargs) + + @wraps(func) + def inner(self, *args, **kwargs) -> F: # noqa: ANN001, ANN002, ANN003 + return _func(weakref.ref(self), *args, **kwargs) + + return inner # type: ignore[return-value] + + return wrapper class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): @@ -75,17 +94,6 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): """ cachable = False - config: ( - S3Config - | S3ConfigInput - | GCSConfig - | GCSConfigInput - | AzureConfig - | AzureConfigInput - | None - ) - client_options: ClientConfig | None - retry_config: RetryConfig | None def __init__( # noqa: PLR0913 self, @@ -138,7 +146,6 @@ def __init__( # noqa: PLR0913 ``` """ - self._store_cache = LRUCache(maxsize=10) self.config = config self.client_options = client_options self.retry_config = retry_config @@ -185,15 +192,14 @@ def _split_path(self, path: str) -> tuple[str, str]: file_path = "/".join(path_li[1:]) return (bucket, file_path) + @weak_lru(maxsize=10) def _construct_store(self, bucket: str) -> ObjectStore: - if bucket not in self._store_cache: - self._store_cache[bucket] = from_url( - url=f"{self.protocol}://{bucket}", - config=self.config, - client_options=self.client_options, - retry_config=self.retry_config or None, - ) - return self._store_cache[bucket] + return from_url( + url=f"{self.protocol}://{bucket}", + config=self.config, + client_options=self.client_options, + retry_config=self.retry_config or None, + ) async def _rm_file(self, path: str, **_kwargs: Any) -> None: bucket, path = self._split_path(path) @@ -355,7 +361,8 @@ async def _info(self, path: str, **_kwargs: Any) -> dict[str, Any]: "version": head["version"], } - def _fill_bucket_name(self, path: str, bucket: str) -> str: + @staticmethod + def _fill_bucket_name(path: str, bucket: str) -> str: return f"{bucket}/{path}" @overload From dc4215dda24a739ce92813b93c0ec26c4738405b Mon Sep 17 00:00:00 2001 From: machichima Date: Sat, 15 Feb 2025 17:42:43 +0800 Subject: [PATCH 27/51] fix some bugs when using get/put/cp/info/ls --- obstore/python/obstore/fsspec.py | 117 ++++++++++++++++--------------- 1 file changed, 62 insertions(+), 55 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index fb4a44b4..85c74d82 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -39,6 +39,7 @@ from collections import defaultdict from collections.abc import Callable, Coroutine from functools import lru_cache, wraps +from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload from urllib.parse import urlparse @@ -180,11 +181,13 @@ def _split_path(self, path: str) -> tuple[str, str]: res = urlparse(path) if res.scheme: + # path is in url format if res.scheme != self.protocol: err_msg = f"Expect protocol to be {self.protocol}. 
Got {res.scheme}" raise ValueError(err_msg) - path = res.netloc + res.path + return (res.netloc, res.path) + # path not in url format if "/" not in path: return path, "" path_li = path.split("/") @@ -207,8 +210,8 @@ async def _rm_file(self, path: str, **_kwargs: Any) -> None: return await obs.delete_async(store, path) async def _cp_file(self, path1: str, path2: str, **_kwargs: Any) -> None: - bucket1, path1 = self._split_path(path1) - bucket2, path2 = self._split_path(path2) + bucket1, path1_no_bucket = self._split_path(path1) + bucket2, path2_no_bucket = self._split_path(path2) if bucket1 != bucket2: err_msg = ( @@ -218,7 +221,12 @@ async def _cp_file(self, path1: str, path2: str, **_kwargs: Any) -> None: raise ValueError(err_msg) store = self._construct_store(bucket1) - return await obs.copy_async(store, path1, path2) + + is_dir1, is_dir2 = await asyncio.gather(self._isdir(path1), self._isdir(path2)) + if is_dir1 or is_dir2: + raise NotImplementedError("Copying directories is not supported") + + return await obs.copy_async(store, path1_no_bucket, path2_no_bucket) async def _pipe_file( self, @@ -308,37 +316,32 @@ async def _put_file( mode: str = "overwrite", # noqa: ARG002 **_kwargs: Any, ) -> None: + if not Path(lpath).is_file(): + err_msg = f"File {lpath} not found in local" + raise FileNotFoundError(err_msg) + # TODO: convert to use async file system methods using LocalStore # Async functions should not open files with blocking methods like `open` - lbucket, lpath = self._split_path(lpath) rbucket, rpath = self._split_path(rpath) - if lbucket != rbucket: - err_msg = ( - f"Bucket mismatch: Source bucket '{lbucket}' and " - f"destination bucket '{rbucket}' must be the same." - ) - raise ValueError(err_msg) - - store = self._construct_store(lbucket) + # Should construct the store instance by rbucket, which is the target path + store = self._construct_store(rbucket) with open(lpath, "rb") as f: # noqa: ASYNC230 await obs.put_async(store, rpath, f) async def _get_file(self, rpath: str, lpath: str, **_kwargs: Any) -> None: + res = urlparse(lpath) + if res.scheme or Path(lpath).is_dir(): + # lpath need to be local file and cannot contain scheme + return + # TODO: convert to use async file system methods using LocalStore # Async functions should not open files with blocking methods like `open` - lbucket, lpath = self._split_path(lpath) rbucket, rpath = self._split_path(rpath) - if lbucket != rbucket: - err_msg = ( - f"Bucket mismatch: Source bucket '{lbucket}' and " - f"destination bucket '{rbucket}' must be the same." - ) - raise ValueError(err_msg) - - store = self._construct_store(lbucket) + # Should construct the store instance by rbucket, which is the target path + store = self._construct_store(rbucket) with open(lpath, "wb") as f: # noqa: ASYNC230 resp = await obs.get_async(store, rpath) @@ -346,20 +349,25 @@ async def _get_file(self, rpath: str, lpath: str, **_kwargs: Any) -> None: f.write(buffer) async def _info(self, path: str, **_kwargs: Any) -> dict[str, Any]: - bucket, path = self._split_path(path) + bucket, path_no_bucket = self._split_path(path) store = self._construct_store(bucket) - head = await obs.head_async(store, path) - return { - # Required of `info`: (?) 
- "name": head["path"], - "size": head["size"], - "type": "directory" if head["path"].endswith("/") else "file", - # Implementation-specific keys - "e_tag": head["e_tag"], - "last_modified": head["last_modified"], - "version": head["version"], - } + try: + head = await obs.head_async(store, path_no_bucket) + return { + # Required of `info`: (?) + "name": head["path"], + "size": head["size"], + "type": "directory" if head["path"].endswith("/") else "file", + # Implementation-specific keys + "e_tag": head["e_tag"], + "last_modified": head["last_modified"], + "version": head["version"], + } + except FileNotFoundError: + # use info in fsspec.AbstractFileSystem + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, super().info, path, **_kwargs) @staticmethod def _fill_bucket_name(path: str, bucket: str) -> str: @@ -391,27 +399,26 @@ async def _ls( result = await obs.list_with_delimiter_async(store, path) objects = result["objects"] prefs = result["common_prefixes"] - if detail: - return [ - { - "name": self._fill_bucket_name(obj["path"], bucket), - "size": obj["size"], - "type": "file", - "e_tag": obj["e_tag"], - } - for obj in objects - ] + [ - { - "name": self._fill_bucket_name(pref, bucket), - "size": 0, - "type": "directory", - } - for pref in prefs - ] - return sorted( - [self._fill_bucket_name(obj["path"], bucket) for obj in objects] - + [self._fill_bucket_name(pref, bucket) for pref in prefs], - ) + files = [ + { + "name": self._fill_bucket_name(obj["path"], bucket), + "size": obj["size"], + "type": "file", + "e_tag": obj["e_tag"], + } + for obj in objects + ] + [ + { + "name": self._fill_bucket_name(pref, bucket), + "size": 0, + "type": "directory", + } + for pref in prefs + ] + if not files: + raise FileNotFoundError(path) + + return files if detail else sorted(o["name"] for o in files) def _open( self, From 4896ba3994dd65ee0811575056744dd614e679fb Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 16 Feb 2025 13:35:23 +0800 Subject: [PATCH 28/51] fix: declare lru_cache in __init__ --- obstore/python/obstore/fsspec.py | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 85c74d82..0718bd3a 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -35,10 +35,9 @@ from __future__ import annotations import asyncio -import weakref from collections import defaultdict from collections.abc import Callable, Coroutine -from functools import lru_cache, wraps +from functools import lru_cache from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload from urllib.parse import urlparse @@ -70,23 +69,6 @@ F = TypeVar("F", bound=Callable[..., Any]) -def weak_lru(maxsize: int = 128, typed: bool = False) -> Callable[[F], F]: - """LRU Cache decorator that keeps a weak reference to 'self'.""" - - def wrapper(func: F) -> F: - @lru_cache(maxsize, typed) - def _func(_self, *args, **kwargs) -> F: # noqa: ANN001, ANN002, ANN003 - return func(_self(), *args, **kwargs) - - @wraps(func) - def inner(self, *args, **kwargs) -> F: # noqa: ANN001, ANN002, ANN003 - return _func(weakref.ref(self), *args, **kwargs) - - return inner # type: ignore[return-value] - - return wrapper - - class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): """An fsspec implementation based on a obstore Store. 
@@ -111,6 +93,7 @@ def __init__( # noqa: PLR0913 client_options: ClientConfig | None = None, retry_config: RetryConfig | None = None, asynchronous: bool = False, + max_cache_size: int = 10, loop: Any = None, batch_size: int | None = None, ) -> None: @@ -129,6 +112,8 @@ def __init__( # noqa: PLR0913 asynchronous: Set to `True` if this instance is meant to be be called using the fsspec async API. This should only be set to true when running within a coroutine. + max_cache_size (int, optional): The maximum number of items the cache should + store. Defaults to 10. loop: since both fsspec/python and tokio/rust may be using loops, this should be kept `None` for now, and will not be used. batch_size: some operations on many files will batch their requests; if you @@ -151,6 +136,8 @@ def __init__( # noqa: PLR0913 self.client_options = client_options self.retry_config = retry_config + self._construct_store = lru_cache(maxsize=max_cache_size)(self._construct_store) + super().__init__( *args, asynchronous=asynchronous, @@ -195,7 +182,6 @@ def _split_path(self, path: str) -> tuple[str, str]: file_path = "/".join(path_li[1:]) return (bucket, file_path) - @weak_lru(maxsize=10) def _construct_store(self, bucket: str) -> ObjectStore: return from_url( url=f"{self.protocol}://{bucket}", From c9378b8ef72c36360d68af1ac3473b3286783eb6 Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 16 Feb 2025 13:35:39 +0800 Subject: [PATCH 29/51] fix: make AsyncFsspecStore cachable --- obstore/python/obstore/fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 0718bd3a..3f91c919 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -76,7 +76,7 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): fsspec-style object. 
""" - cachable = False + cachable = True def __init__( # noqa: PLR0913 self, From 549a4aca6144604a50fc7e141096c4e370ed8cf2 Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 16 Feb 2025 13:36:25 +0800 Subject: [PATCH 30/51] test: for cache constructed store and filesystem obj --- tests/test_fsspec.py | 85 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index d46d332f..e5ee3185 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -1,7 +1,9 @@ from __future__ import annotations +import gc import os from typing import TYPE_CHECKING +from unittest.mock import patch import fsspec import pyarrow.parquet as pq @@ -81,6 +83,89 @@ def fs(s3_store_config: S3Config): return fsspec.filesystem("s3", config=s3_store_config) +def test_construct_store_cache_diff_bucket_name(s3_store_config: S3Config): + register("s3") + fs: AsyncFsspecStore = fsspec.filesystem( + "s3", + config=s3_store_config, + asynchronous=True, + max_cache_size=5, + ) + + bucket_names = [f"bucket{i}" for i in range(20)] # 20 unique buckets + + with patch.object( + fs, + "_construct_store", + wraps=fs._construct_store, + ) as mock_construct: + for bucket in bucket_names: + fs._construct_store(bucket) + + # Since the cache is set to 16, only the first 16 unique calls should be cached + assert mock_construct.cache_info().currsize == 5, ( + "Cache should only store 5 cache" + ) + assert mock_construct.cache_info().hits == 0, "Cache should hits 0 times" + assert mock_construct.cache_info().misses == 20, "Cache should miss 20 times" + + # test garbage collector + fs = None + assert gc.collect() > 0 + + +def test_construct_store_cache_same_bucket_name(s3_store_config: S3Config): + register("s3") + fs = fsspec.filesystem( + "s3", + config=s3_store_config, + asynchronous=True, + max_cache_size=5, + ) + + bucket_names = ["bucket" for _ in range(20)] + + with patch.object( + fs, + "_construct_store", + wraps=fs._construct_store, + ) as mock_construct: + for bucket in bucket_names: + fs._construct_store(bucket) + + assert mock_construct.cache_info().currsize == 1, ( + "Cache should only store 1 cache" + ) + assert mock_construct.cache_info().hits == 20 - 1, ( + "Cache should hits 20-1 times" + ) + assert mock_construct.cache_info().misses == 1, "Cache should only miss once" + + # test garbage collector + fs = None + assert gc.collect() > 0 + + +def test_fsspec_filesystem_cache(s3_store_config: S3Config): + """Test caching behavior of fsspec.filesystem with the _Cached metaclass.""" + register("s3") + + # call fsspec.filesystem() multiple times with the same parameters + fs1 = fsspec.filesystem("s3", config=s3_store_config) + fs2 = fsspec.filesystem("s3", config=s3_store_config) + + # Same parameters should return the same instance + assert fs1 is fs2, ( + "fsspec.filesystem() with the same parameters should return the cached instance" + ) + + # Changing parameters should create a new instance + fs3 = fsspec.filesystem("s3", config=s3_store_config, asynchronous=True) + assert fs1 is not fs3, ( + "fsspec.filesystem() with different parameters should return a new instance" + ) + + def test_list(fs: AsyncFsspecStore): out = fs.ls(f"{TEST_BUCKET_NAME}", detail=False) assert out == [f"{TEST_BUCKET_NAME}/afile"] From a93fe2efbc00e77e517dc0e350f4cfea124a1958 Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 16 Feb 2025 13:36:51 +0800 Subject: [PATCH 31/51] build: remove dependencies --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/pyproject.toml b/pyproject.toml index d06f5b52..2f12586e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" -dependencies = ["cachetools>=5.5.1"] +dependencies = [] [tool.uv] dev-dependencies = [ From a54a3fe25319493652e74846bfa9edbec6c4c1c6 Mon Sep 17 00:00:00 2001 From: machichima Date: Wed, 19 Feb 2025 21:50:44 +0800 Subject: [PATCH 32/51] fix: prevent send folder path to cat_file --- obstore/python/obstore/fsspec.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 3f91c919..df080128 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -247,6 +247,32 @@ async def _cat_file( range_bytes = await obs.get_range_async(store, path, start=start, end=end) return range_bytes.to_bytes() + async def _cat( + self, + path: str, + recursive: bool = False, + on_error: str = "raise", + batch_size: int | None = None, + **_kwargs: Any, + ) -> bytes | dict[str, bytes]: + paths = await self._expand_path(path, recursive=recursive) + + # Filter out directories + files = [p for p in paths if not await self._isdir(p)] + + if not files: + err_msg = f"No valid files found in {path}" + raise FileNotFoundError(err_msg) + + # Call the original _cat only on files + return await super()._cat( + files, + recursive=False, + on_error=on_error, + batch_size=batch_size, + **_kwargs, + ) + async def _cat_ranges( # noqa: PLR0913 self, paths: list[str], From c804a18865a5f26c476acef99c9700d416df709c Mon Sep 17 00:00:00 2001 From: machichima Date: Wed, 19 Feb 2025 21:51:07 +0800 Subject: [PATCH 33/51] fix: enable cp folders --- obstore/python/obstore/fsspec.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index df080128..31ed9f2c 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -66,9 +66,6 @@ ) -F = TypeVar("F", bound=Callable[..., Any]) - - class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): """An fsspec implementation based on a obstore Store. 
@@ -208,10 +205,6 @@ async def _cp_file(self, path1: str, path2: str, **_kwargs: Any) -> None: store = self._construct_store(bucket1) - is_dir1, is_dir2 = await asyncio.gather(self._isdir(path1), self._isdir(path2)) - if is_dir1 or is_dir2: - raise NotImplementedError("Copying directories is not supported") - return await obs.copy_async(store, path1_no_bucket, path2_no_bucket) async def _pipe_file( From 6c2c513ce1e8f35345e3a72c4a3f220af1ea5eeb Mon Sep 17 00:00:00 2001 From: machichima Date: Wed, 19 Feb 2025 21:53:34 +0800 Subject: [PATCH 34/51] lint --- obstore/python/obstore/fsspec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 31ed9f2c..074fdb02 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -36,10 +36,10 @@ import asyncio from collections import defaultdict -from collections.abc import Callable, Coroutine +from collections.abc import Coroutine from functools import lru_cache from pathlib import Path -from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload +from typing import TYPE_CHECKING, Any, Literal, overload from urllib.parse import urlparse import fsspec.asyn From c6392f233b1ba13606e057825c48774cf32d6552 Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 23 Feb 2025 14:14:52 +0800 Subject: [PATCH 35/51] fix: clobber=False to prevent re-register and cause memory leak If register multiple time, and each of them have their instance, the cache does not work and will end up with multiple instances with same config --- obstore/python/obstore/fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 074fdb02..2888af3c 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -529,5 +529,5 @@ def register(protocol: str | list[str], asynchronous: bool = False) -> None: "asynchronous": asynchronous, }, # Assign protocol dynamically ), - clobber=True, + clobber=False, ) From 347e63efb1cd6cbf5cb01c507aa1cc607c1e83b0 Mon Sep 17 00:00:00 2001 From: machichima Date: Sun, 23 Feb 2025 14:15:07 +0800 Subject: [PATCH 36/51] test: clean up after each test to prevent memory leak --- tests/test_fsspec.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index e5ee3185..e7a19b66 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -8,6 +8,7 @@ import fsspec import pyarrow.parquet as pq import pytest +from fsspec.registry import _registry from obstore.fsspec import AsyncFsspecStore, register from tests.conftest import TEST_BUCKET_NAME @@ -16,6 +17,23 @@ from obstore.store import S3Config +@pytest.fixture +def fs(s3_store_config: S3Config): + register("s3") + return fsspec.filesystem("s3", config=s3_store_config) + + +@pytest.fixture(autouse=True) +def cleanup_after_test(): + """Cleanup function to run after each test.""" + yield # Runs the test first + + # clear the registered implementations after each test + _registry.clear() + + gc.collect() + + def test_register(): """Test if register() creates and registers a subclass for a given protocol.""" register("s3") # Register the "s3" protocol dynamically @@ -56,13 +74,13 @@ def test_register_invalid_types(): register(123) # Not a string or list with pytest.raises(TypeError, match="All protocols in the list must be strings"): - register(["s3", 42]) # List contains a non-string + 
register(["test", 42]) # List contains a non-string with pytest.raises( ValueError, match="Protocol names in the list must be non-empty strings", ): - register(["s3", ""]) # List contains a non-string + register(["test1", ""]) # List contains a non-string with pytest.raises( TypeError, @@ -77,12 +95,6 @@ def test_register_invalid_types(): register([]) # Empty list is invalid -@pytest.fixture -def fs(s3_store_config: S3Config): - register("s3") - return fsspec.filesystem("s3", config=s3_store_config) - - def test_construct_store_cache_diff_bucket_name(s3_store_config: S3Config): register("s3") fs: AsyncFsspecStore = fsspec.filesystem( From 9e423f5a58031b5713129f754397fc8b61057c76 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 24 Feb 2025 11:13:50 -0500 Subject: [PATCH 37/51] Simplify protocol registration --- obstore/python/obstore/fsspec.py | 33 ++++++++------------------------ 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 2888af3c..46e3617d 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -36,7 +36,6 @@ import asyncio from collections import defaultdict -from collections.abc import Coroutine from functools import lru_cache from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, overload @@ -50,7 +49,7 @@ from obstore.store import from_url if TYPE_CHECKING: - from collections.abc import Coroutine + from collections.abc import Coroutine, Iterable from obstore import Bytes from obstore.store import ( @@ -470,7 +469,7 @@ def read(self, length: int = -1) -> Any: return data -def register(protocol: str | list[str], asynchronous: bool = False) -> None: +def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> None: """Dynamically register a subclass of AsyncFsspecStore for the given protocol(s). This function creates a new subclass of AsyncFsspecStore with the specified @@ -494,31 +493,15 @@ def register(protocol: str | list[str], asynchronous: bool = False) -> None: - This avoids modifying the original AsyncFsspecStore class. 
""" - # Ensure protocol is of type str or list - if not isinstance(protocol, str | list): - err_msg = ( - f"Protocol must be a string or a list of strings, got {type(protocol)}" - ) - raise TypeError(err_msg) - - # Ensure protocol is not None or empty - if not protocol: - raise ValueError( - "Protocol must be a non-empty string or a list of non-empty strings.", - ) + if isinstance(protocol, str): + _register(protocol, asynchronous=asynchronous) + return - if isinstance(protocol, list): - # Ensure all elements are strings - if not all(isinstance(p, str) for p in protocol): - raise TypeError("All protocols in the list must be strings.") - # Ensure no empty strings in the list - if not all(p for p in protocol): - raise ValueError("Protocol names in the list must be non-empty strings.") + for p in protocol: + _register(p, asynchronous=asynchronous) - for p in protocol: - register(p) - return +def _register(protocol: str, *, asynchronous: bool) -> None: fsspec.register_implementation( protocol, type( From 096845c78e2f0198564cd8bcc30cd726e0fa5305 Mon Sep 17 00:00:00 2001 From: machichima Date: Tue, 25 Feb 2025 22:03:12 +0800 Subject: [PATCH 38/51] fix+test: register check types --- obstore/python/obstore/fsspec.py | 12 ++++++++++++ tests/test_fsspec.py | 14 +++++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 46e3617d..a93d51fa 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -493,6 +493,10 @@ def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> No - This avoids modifying the original AsyncFsspecStore class. """ + if not protocol: + raise ValueError( + "Protocol must be a non-empty string or list", + ) if isinstance(protocol, str): _register(protocol, asynchronous=asynchronous) return @@ -502,6 +506,14 @@ def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> No def _register(protocol: str, *, asynchronous: bool) -> None: + if not protocol: + raise ValueError( + "Protocol must be a non-empty string", + ) + if not isinstance(protocol, str): + err_msg = f"Protocol must be a string, got {type(protocol).__name__}" + raise TypeError(err_msg) + fsspec.register_implementation( protocol, type( diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index e7a19b66..5b7ecc25 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -69,28 +69,28 @@ def test_register_invalid_types(): """Test that register rejects invalid input types.""" with pytest.raises( TypeError, - match="Protocol must be a string or a list of strings", + match="'int' object is not iterable", ): - register(123) # Not a string or list + register(123) - with pytest.raises(TypeError, match="All protocols in the list must be strings"): + with pytest.raises(TypeError, match="Protocol must be a string, got int"): register(["test", 42]) # List contains a non-string with pytest.raises( ValueError, - match="Protocol names in the list must be non-empty strings", + match="Protocol must be a non-empty string", ): register(["test1", ""]) # List contains a non-string with pytest.raises( - TypeError, - match="Protocol must be a string or a list of strings", + ValueError, + match="Protocol must be a non-empty string or list", ): register(None) # None is invalid with pytest.raises( ValueError, - match="Protocol must be a non-empty string or a list of non-empty strings", + match="Protocol must be a non-empty string or list", ): register([]) # Empty list is invalid 
From 5ea2ba81e5df25977f9de3ed0b295fd67ed2aefc Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 25 Feb 2025 23:33:19 -0500 Subject: [PATCH 39/51] small edits --- obstore/python/obstore/fsspec.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index a93d51fa..950b3f26 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -72,6 +72,7 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): fsspec-style object. """ + # https://github.com/fsspec/filesystem_spec/blob/56054c0a30ceedab4c0e6a0f7e429666773baf6d/docs/source/features.rst#instance-caching cachable = True def __init__( # noqa: PLR0913 @@ -98,7 +99,9 @@ def __init__( # noqa: PLR0913 Args: config: Configuration for the cloud storage provider, which can be one of S3Config, S3ConfigInput, GCSConfig, GCSConfigInput, AzureConfig, - or AzureConfigInput. If None, no cloud storage configuration is applied. + or AzureConfigInput. Any of these values will be applied after checking + for environment variables. If `None`, no cloud storage configuration is + applied beyond what is found in environment variables. client_options: Additional options for configuring the client. retry_config: Configuration for handling request errors. args: positional arguments passed on to the `fsspec.asyn.AsyncFileSystem` @@ -108,8 +111,9 @@ def __init__( # noqa: PLR0913 asynchronous: Set to `True` if this instance is meant to be be called using the fsspec async API. This should only be set to true when running within a coroutine. - max_cache_size (int, optional): The maximum number of items the cache should - store. Defaults to 10. + max_cache_size (int, optional): The maximum number of stores the cache + should keep. A cached store is kept internally for each bucket name. + Defaults to 10. loop: since both fsspec/python and tokio/rust may be using loops, this should be kept `None` for now, and will not be used. batch_size: some operations on many files will batch their requests; if you @@ -132,6 +136,7 @@ def __init__( # noqa: PLR0913 self.client_options = client_options self.retry_config = retry_config + # https://stackoverflow.com/a/68550238 self._construct_store = lru_cache(maxsize=max_cache_size)(self._construct_store) super().__init__( @@ -145,11 +150,10 @@ def _split_path(self, path: str) -> tuple[str, str]: """Split bucket and file path. 
Args: - path (str): Input path, like `s3://mybucket/path/to/file` + path: Input path, like `s3://mybucket/path/to/file` Returns: - tuple[str, str]: with the first element as bucket name and second be - the file path inside the bucket + (bucket name, file path inside the bucket) Examples: >>> split_path("s3://mybucket/path/to/file") @@ -484,7 +488,7 @@ def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> No Example: >>> register("s3") - >>> register("s3", asynchronous=True) # Registers an async-store for "s3" + >>> register("s3", asynchronous=True) # Registers an async store for "s3" >>> register(["gcs", "abfs"]) # Registers both "gcs" and "abfs" Notes: From 12560f4a8cbf93a160d323c192763c65e0a82ef6 Mon Sep 17 00:00:00 2001 From: machichima Date: Wed, 26 Feb 2025 22:28:39 +0800 Subject: [PATCH 40/51] fix+test: update conftest --- tests/conftest.py | 9 ++++++--- tests/test_fsspec.py | 12 +++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index bc536307..633c3254 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import boto3 import pytest +from typing import TYPE_CHECKING import urllib3 from botocore import UNSIGNED from botocore.client import Config @@ -7,6 +8,9 @@ from obstore.store import S3Store +if TYPE_CHECKING: + from obstore.store import S3ConfigInput, ClientConfig + TEST_BUCKET_NAME = "test" @@ -52,10 +56,9 @@ def s3_store(s3: str): @pytest.fixture -def s3_store_config(s3: str): +def s3_store_config(s3: str) -> "S3ConfigInput": return { "AWS_ENDPOINT_URL": s3, "AWS_REGION": "us-east-1", - "AWS_SKIP_SIGNATURE": "True", - "AWS_ALLOW_HTTP": "true", + "AWS_SKIP_SIGNATURE": True, } diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index 5b7ecc25..92c6b6cc 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -20,7 +20,7 @@ @pytest.fixture def fs(s3_store_config: S3Config): register("s3") - return fsspec.filesystem("s3", config=s3_store_config) + return fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}) @pytest.fixture(autouse=True) @@ -100,6 +100,7 @@ def test_construct_store_cache_diff_bucket_name(s3_store_config: S3Config): fs: AsyncFsspecStore = fsspec.filesystem( "s3", config=s3_store_config, + client_options={"allow_http": True}, asynchronous=True, max_cache_size=5, ) @@ -131,6 +132,7 @@ def test_construct_store_cache_same_bucket_name(s3_store_config: S3Config): fs = fsspec.filesystem( "s3", config=s3_store_config, + client_options={"allow_http": True}, asynchronous=True, max_cache_size=5, ) @@ -163,8 +165,8 @@ def test_fsspec_filesystem_cache(s3_store_config: S3Config): register("s3") # call fsspec.filesystem() multiple times with the same parameters - fs1 = fsspec.filesystem("s3", config=s3_store_config) - fs2 = fsspec.filesystem("s3", config=s3_store_config) + fs1 = fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}) + fs2 = fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}) # Same parameters should return the same instance assert fs1 is fs2, ( @@ -172,7 +174,7 @@ def test_fsspec_filesystem_cache(s3_store_config: S3Config): ) # Changing parameters should create a new instance - fs3 = fsspec.filesystem("s3", config=s3_store_config, asynchronous=True) + fs3 = fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}, asynchronous=True) assert fs1 is not fs3, ( "fsspec.filesystem() with different parameters should return a new instance" 
) @@ -192,7 +194,7 @@ def test_list(fs: AsyncFsspecStore): @pytest.mark.asyncio async def test_list_async(s3_store_config: S3Config): register("s3") - fs = fsspec.filesystem("s3", config=s3_store_config, asynchronous=True) + fs = fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}, asynchronous=True) out = await fs._ls(f"{TEST_BUCKET_NAME}", detail=False) assert out == [f"{TEST_BUCKET_NAME}/afile"] From 6e200e96bfc8869ec4c90df64f83078e205e9d3f Mon Sep 17 00:00:00 2001 From: machichima Date: Wed, 26 Feb 2025 22:44:25 +0800 Subject: [PATCH 41/51] style: format --- tests/conftest.py | 5 +++-- tests/test_fsspec.py | 32 +++++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 633c3254..c8929196 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ +from typing import TYPE_CHECKING + import boto3 import pytest -from typing import TYPE_CHECKING import urllib3 from botocore import UNSIGNED from botocore.client import Config @@ -9,7 +10,7 @@ from obstore.store import S3Store if TYPE_CHECKING: - from obstore.store import S3ConfigInput, ClientConfig + from obstore.store import S3ConfigInput TEST_BUCKET_NAME = "test" diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index 92c6b6cc..26726b55 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -20,7 +20,11 @@ @pytest.fixture def fs(s3_store_config: S3Config): register("s3") - return fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}) + return fsspec.filesystem( + "s3", + config=s3_store_config, + client_options={"allow_http": True}, + ) @pytest.fixture(autouse=True) @@ -165,8 +169,16 @@ def test_fsspec_filesystem_cache(s3_store_config: S3Config): register("s3") # call fsspec.filesystem() multiple times with the same parameters - fs1 = fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}) - fs2 = fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}) + fs1 = fsspec.filesystem( + "s3", + config=s3_store_config, + client_options={"allow_http": True}, + ) + fs2 = fsspec.filesystem( + "s3", + config=s3_store_config, + client_options={"allow_http": True}, + ) # Same parameters should return the same instance assert fs1 is fs2, ( @@ -174,7 +186,12 @@ def test_fsspec_filesystem_cache(s3_store_config: S3Config): ) # Changing parameters should create a new instance - fs3 = fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}, asynchronous=True) + fs3 = fsspec.filesystem( + "s3", + config=s3_store_config, + client_options={"allow_http": True}, + asynchronous=True, + ) assert fs1 is not fs3, ( "fsspec.filesystem() with different parameters should return a new instance" ) @@ -194,7 +211,12 @@ def test_list(fs: AsyncFsspecStore): @pytest.mark.asyncio async def test_list_async(s3_store_config: S3Config): register("s3") - fs = fsspec.filesystem("s3", config=s3_store_config, client_options={"allow_http": True}, asynchronous=True) + fs = fsspec.filesystem( + "s3", + config=s3_store_config, + client_options={"allow_http": True}, + asynchronous=True, + ) out = await fs._ls(f"{TEST_BUCKET_NAME}", detail=False) assert out == [f"{TEST_BUCKET_NAME}/afile"] From 4a653b3060de6c348cc3cb0ab3c26cefe9b228a2 Mon Sep 17 00:00:00 2001 From: machichima Date: Wed, 26 Feb 2025 22:45:47 +0800 Subject: [PATCH 42/51] feat: enable setting protocol in __init__ --- obstore/python/obstore/fsspec.py | 16 ++++++++++++---- 1 
file changed, 12 insertions(+), 4 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 950b3f26..4d4b06e7 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -78,6 +78,7 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
     def __init__(  # noqa: PLR0913
         self,
         *args: Any,
+        protocol: str | None = None,
         config: (
             S3Config
             | S3ConfigInput
@@ -97,6 +98,9 @@ def __init__(  # noqa: PLR0913
         """Construct a new AsyncFsspecStore.
 
         Args:
+            protocol: The storage protocol to use, such as "s3",
+                "gcs", or "abfs". If `None`, the default class-level protocol
+                is used. Defaults to None.
             config: Configuration for the cloud storage provider, which can be one of
                 S3Config, S3ConfigInput, GCSConfig, GCSConfigInput, AzureConfig,
                 or AzureConfigInput. Any of these values will be applied after checking
@@ -132,6 +136,10 @@ def __init__(  # noqa: PLR0913
         ```
 
         """
+        if protocol is None:
+            self._protocol = self.protocol
+        else:
+            self._protocol = protocol
         self.config = config
         self.client_options = client_options
         self.retry_config = retry_config
@@ -162,15 +170,15 @@ def _split_path(self, path: str) -> tuple[str, str]:
         """
         protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs", "https", "http"]
 
-        if self.protocol not in protocol_with_bucket:
+        if self._protocol not in protocol_with_bucket:
             # no bucket name in path
             return "", path
 
         res = urlparse(path)
         if res.scheme:
             # path is in url format
-            if res.scheme != self.protocol:
-                err_msg = f"Expect protocol to be {self.protocol}. Got {res.scheme}"
+            if res.scheme != self._protocol:
+                err_msg = f"Expect protocol to be {self._protocol}. Got {res.scheme}"
                 raise ValueError(err_msg)
             return (res.netloc, res.path)
@@ -184,7 +192,7 @@ def _split_path(self, path: str) -> tuple[str, str]:
 
     def _construct_store(self, bucket: str) -> ObjectStore:
         return from_url(
-            url=f"{self.protocol}://{bucket}",
+            url=f"{self._protocol}://{bucket}",
             config=self.config,
             client_options=self.client_options,
             retry_config=self.retry_config or None,
         )

From f5147bcc75646d180f57eb1112f76290c20328e9 Mon Sep 17 00:00:00 2001
From: machichima
Date: Wed, 26 Feb 2025 22:54:01 +0800
Subject: [PATCH 43/51] docs: update example in docstring

---
 obstore/python/obstore/fsspec.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 4d4b06e7..8f35361e 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -129,9 +129,10 @@ def __init__(  # noqa: PLR0913
         from obstore.fsspec import AsyncFsspecStore
         from obstore.store import HTTPStore
 
-        store = HTTPStore.from_url("https://example.com")
-        fsspec_store = AsyncFsspecStore(store)
-        resp = fsspec_store.cat("/")
+        store = AsyncFsspecStore(
+            protocol="https",
+        )
+        resp = store.cat("https://example.com")
         assert resp.startswith(b"<!doctype html>")
         ```

From 45c402036ed5f3a51df77cd4b7bad1e3dd138d84 Mon Sep 17 00:00:00 2001
From: machichima
Date: Wed, 26 Feb 2025 23:09:47 +0800
Subject: [PATCH 44/51] fix: better split path way

---
 obstore/python/obstore/fsspec.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 8f35361e..bc7a2610 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -184,12 +184,10 @@ def _split_path(self, path: str) -> tuple[str, str]:
             return (res.netloc, res.path)
 
         # path not in url format
-        if "/" not in path:
+        path_li = path.split("/", 1)
+        if len(path_li) == 1:
             return path, ""
-        path_li = path.split("/")
-        bucket = path_li[0]
-        file_path = "/".join(path_li[1:])
-        return (bucket, file_path)
+        return (path_li[0], path_li[1])

From 4e1f4e7d0222f2c2c6ca076b1ede47b52e73a5f2 Mon Sep 17 00:00:00 2001
From: machichima
Date: Thu, 27 Feb 2025 22:39:11 +0800
Subject: [PATCH 45/51] fix: split path for protocol with no bucket properly

---
 obstore/python/obstore/fsspec.py | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index bc7a2610..a15f56f2 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -171,17 +171,21 @@ def _split_path(self, path: str) -> tuple[str, str]:
         """
         protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs", "https", "http"]
 
+        # Parse the path as a URL
+        parsed = urlparse(path)
+
+        # If the protocol doesn't support buckets, return empty bucket and full path
         if self._protocol not in protocol_with_bucket:
-            # no bucket name in path
-            return "", path
-
-        res = urlparse(path)
-        if res.scheme:
-            # path is in url format
-            if res.scheme != self._protocol:
-                err_msg = f"Expect protocol to be {self._protocol}. Got {res.scheme}"
+            return (
+                "",
+                f"{parsed.netloc}/{parsed.path.lstrip('/')}" if parsed.scheme else path,
+            )
+
+        if parsed.scheme:
+            if parsed.scheme != self._protocol:
+                err_msg = f"Expect protocol to be {self._protocol}. Got {parsed.scheme}"
                 raise ValueError(err_msg)
-            return (res.netloc, res.path)
+            return (parsed.netloc, parsed.path.lstrip("/"))
 
         # path not in url format
         path_li = path.split("/", 1)

From a212996420d7ff7f1aca92bb1286aa0af72dd259 Mon Sep 17 00:00:00 2001
From: machichima
Date: Thu, 27 Feb 2025 22:39:24 +0800
Subject: [PATCH 46/51] test: for split path

---
 tests/test_fsspec.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py
index 26726b55..6250783b 100644
--- a/tests/test_fsspec.py
+++ b/tests/test_fsspec.py
@@ -197,6 +197,32 @@ def test_fsspec_filesystem_cache(s3_store_config: S3Config):
     )
 
 
+def test_split_path(fs: AsyncFsspecStore):
+    # in url format, with bucket
+    assert fs._split_path("s3://mybucket/path/to/file") == ("mybucket", "path/to/file")
+    assert fs._split_path("s3://data-bucket/") == ("data-bucket", "")
+
+    # path format, with bucket
+    assert fs._split_path("mybucket/path/to/file") == ("mybucket", "path/to/file")
+    assert fs._split_path("data-bucket/") == ("data-bucket", "")
+
+    # url format, wrong protocol
+    with pytest.raises(ValueError, match="Expect protocol to be s3. 
Got gs"): + fs._split_path("gs://data-bucket/") + + # in url format, without bucket + fs._protocol = "file" + assert fs._split_path("file:///mybucket/path/to/file") == ( + "", + "/mybucket/path/to/file", + ) + assert fs._split_path("file:///data-bucket/") == ("", "/data-bucket/") + + # path format, without bucket + assert fs._split_path("/mybucket/path/to/file") == ("", "/mybucket/path/to/file") + assert fs._split_path("/data-bucket/") == ("", "/data-bucket/") + + def test_list(fs: AsyncFsspecStore): out = fs.ls(f"{TEST_BUCKET_NAME}", detail=False) assert out == [f"{TEST_BUCKET_NAME}/afile"] From 278cc38a5c9ce4b5167974664332ed0bd37a03ab Mon Sep 17 00:00:00 2001 From: machichima Date: Thu, 27 Feb 2025 22:54:00 +0800 Subject: [PATCH 47/51] refactor: take out _fill_bucket_name function --- obstore/python/obstore/fsspec.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index a15f56f2..4f9bfd46 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -388,10 +388,6 @@ async def _info(self, path: str, **_kwargs: Any) -> dict[str, Any]: loop = asyncio.get_running_loop() return await loop.run_in_executor(None, super().info, path, **_kwargs) - @staticmethod - def _fill_bucket_name(path: str, bucket: str) -> str: - return f"{bucket}/{path}" - @overload async def _ls( self, @@ -420,7 +416,7 @@ async def _ls( prefs = result["common_prefixes"] files = [ { - "name": self._fill_bucket_name(obj["path"], bucket), + "name": f"{bucket}/{obj['path']}", "size": obj["size"], "type": "file", "e_tag": obj["e_tag"], @@ -428,7 +424,7 @@ async def _ls( for obj in objects ] + [ { - "name": self._fill_bucket_name(pref, bucket), + "name": f"{bucket}/{pref}", "size": 0, "type": "directory", } From 92a70f5966b0ee155508504cf112a2cbd7044c9f Mon Sep 17 00:00:00 2001 From: machichima Date: Thu, 27 Feb 2025 22:56:31 +0800 Subject: [PATCH 48/51] refactor: take out runtime type check for register --- obstore/python/obstore/fsspec.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 4f9bfd46..cc0b526d 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -504,10 +504,6 @@ def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> No - This avoids modifying the original AsyncFsspecStore class. 
""" - if not protocol: - raise ValueError( - "Protocol must be a non-empty string or list", - ) if isinstance(protocol, str): _register(protocol, asynchronous=asynchronous) return @@ -517,14 +513,6 @@ def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> No def _register(protocol: str, *, asynchronous: bool) -> None: - if not protocol: - raise ValueError( - "Protocol must be a non-empty string", - ) - if not isinstance(protocol, str): - err_msg = f"Protocol must be a string, got {type(protocol).__name__}" - raise TypeError(err_msg) - fsspec.register_implementation( protocol, type( From 969dbff8d69571869c25b4d70a1bb95cb2fd942a Mon Sep 17 00:00:00 2001 From: machichima Date: Thu, 27 Feb 2025 22:59:14 +0800 Subject: [PATCH 49/51] test: remove test register invalid type As the runtime type check is removed in register for simplification, this test is no longer needed --- tests/test_fsspec.py | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index 6250783b..23fc384b 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -69,36 +69,6 @@ def test_register(): assert issubclass(fsspec.get_filesystem_class("abfs"), AsyncFsspecStore) -def test_register_invalid_types(): - """Test that register rejects invalid input types.""" - with pytest.raises( - TypeError, - match="'int' object is not iterable", - ): - register(123) - - with pytest.raises(TypeError, match="Protocol must be a string, got int"): - register(["test", 42]) # List contains a non-string - - with pytest.raises( - ValueError, - match="Protocol must be a non-empty string", - ): - register(["test1", ""]) # List contains a non-string - - with pytest.raises( - ValueError, - match="Protocol must be a non-empty string or list", - ): - register(None) # None is invalid - - with pytest.raises( - ValueError, - match="Protocol must be a non-empty string or list", - ): - register([]) # Empty list is invalid - - def test_construct_store_cache_diff_bucket_name(s3_store_config: S3Config): register("s3") fs: AsyncFsspecStore = fsspec.filesystem( From e443668b060369790f42321d4c1d04cb4ea8f106 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 28 Feb 2025 17:14:43 -0500 Subject: [PATCH 50/51] Switch to checking if protocol does not require bucket --- obstore/python/obstore/fsspec.py | 9 +++++---- tests/test_fsspec.py | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index cc0b526d..a410a0cb 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -169,13 +169,13 @@ def _split_path(self, path: str) -> tuple[str, str]: ['mybucket', 'path/to/file'] """ - protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs", "https", "http"] + protocol_without_bucket = {"file", "memory"} # Parse the path as a URL parsed = urlparse(path) - # If the protocol doesn't support buckets, return empty bucket and full path - if self._protocol not in protocol_with_bucket: + # If the protocol doesn't require buckets, return empty bucket and full path + if self._protocol in protocol_without_bucket: return ( "", f"{parsed.netloc}/{parsed.path.lstrip('/')}" if parsed.scheme else path, @@ -191,6 +191,7 @@ def _split_path(self, path: str) -> tuple[str, str]: path_li = path.split("/", 1) if len(path_li) == 1: return path, "" + return (path_li[0], path_li[1]) def _construct_store(self, bucket: str) -> ObjectStore: @@ -198,7 +199,7 @@ def _construct_store(self, 
bucket: str) -> ObjectStore:
         return from_url(
             url=f"{self._protocol}://{bucket}",
             config=self.config,
             client_options=self.client_options,
-            retry_config=self.retry_config or None,
+            retry_config=self.retry_config,
         )
 
     async def _rm_file(self, path: str, **_kwargs: Any) -> None:

diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py
index 23fc384b..629f04bc 100644
--- a/tests/test_fsspec.py
+++ b/tests/test_fsspec.py
@@ -97,7 +97,7 @@ def test_construct_store_cache_diff_bucket_name(s3_store_config: S3Config):
     assert mock_construct.cache_info().misses == 20, "Cache should miss 20 times"
 
     # test garbage collector
-    fs = None
+    del fs
     assert gc.collect() > 0
 
 
@@ -130,7 +130,7 @@ def test_construct_store_cache_same_bucket_name(s3_store_config: S3Config):
     assert mock_construct.cache_info().misses == 1, "Cache should only miss once"
 
     # test garbage collector
-    fs = None
+    del fs
     assert gc.collect() > 0

From 49493484521c452cf97ed5a6e429b54a728d3047 Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Fri, 28 Feb 2025 17:20:15 -0500
Subject: [PATCH 51/51] Warn on unknown protocol

---
 obstore/python/obstore/fsspec.py | 28 +++++++++++++++++++++++-----
 1 file changed, 23 insertions(+), 5 deletions(-)

diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index a410a0cb..d704860a 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -35,6 +35,7 @@
 from __future__ import annotations
 
 import asyncio
+import warnings
 from collections import defaultdict
 from functools import lru_cache
 from pathlib import Path
@@ -64,6 +65,18 @@
         S3ConfigInput,
     )
 
+SUPPORTED_PROTOCOLS = {
+    "s3",
+    "s3a",
+    "gcs",
+    "gs",
+    "abfs",
+    "https",
+    "http",
+    "file",
+    "memory",
+}
+
 
 class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
     """An fsspec implementation based on a obstore Store.
@@ -124,14 +137,12 @@ def __init__(  # noqa: PLR0913
             are seeing timeouts, you may want to set this number smaller than the
             defaults, which are determined in `fsspec.asyn._get_batch_size`.
 
-        Example:
+        **Examples:**
+
         ```py
         from obstore.fsspec import AsyncFsspecStore
-        from obstore.store import HTTPStore
 
-        store = AsyncFsspecStore(
-            protocol="https",
-        )
+        store = AsyncFsspecStore(protocol="https")
         resp = store.cat("https://example.com")
         assert resp.startswith(b"<!doctype html>")
         ```
@@ -141,6 +152,13 @@ def __init__(  # noqa: PLR0913
         if protocol is None:
             self._protocol = self.protocol
         else:
             self._protocol = protocol
+
+        if self._protocol not in SUPPORTED_PROTOCOLS:
+            warnings.warn(
+                f"Unknown protocol: {self._protocol}; requests may fail.",
+                stacklevel=2,
+            )
+
         self.config = config
         self.client_options = client_options
         self.retry_config = retry_config
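
To round out the series, here is a sketch of a test that could exercise the new unknown-protocol warning. It is not part of the patches; it assumes pytest, as the existing suite does, and relies on `warnings.warn` defaulting to `UserWarning`.

```py
import pytest

from obstore.fsspec import AsyncFsspecStore


def test_warns_on_unknown_protocol():
    # warnings.warn() without an explicit category emits a UserWarning, and
    # the message matches the f-string added in this patch
    with pytest.warns(UserWarning, match="Unknown protocol"):
        AsyncFsspecStore(protocol="foo")  # "foo" is not in SUPPORTED_PROTOCOLS
```

Warning at construction time means a mistyped protocol surfaces immediately, rather than as an opaque `from_url` failure on the first request.
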