diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 3e5bc765..d704860a 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -30,26 +30,52 @@ integration. """ -# ruff: noqa: ANN401 -# Dynamically typed expressions (typing.Any) are disallowed -# ruff: noqa: PTH123 -# `open()` should be replaced by `Path.open()` +# ruff: noqa: ANN401, PTH123, FBT001, FBT002 from __future__ import annotations import asyncio +import warnings from collections import defaultdict +from functools import lru_cache +from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, overload +from urllib.parse import urlparse import fsspec.asyn import fsspec.spec import obstore as obs +from obstore import Bytes +from obstore.store import from_url if TYPE_CHECKING: - from collections.abc import Coroutine + from collections.abc import Coroutine, Iterable from obstore import Bytes + from obstore.store import ( + AzureConfig, + AzureConfigInput, + ClientConfig, + GCSConfig, + GCSConfigInput, + ObjectStore, + RetryConfig, + S3Config, + S3ConfigInput, + ) + +SUPPORTED_PROTOCOLS = { + "s3", + "s3a", + "gcs", + "gs", + "abfs", + "https", + "http", + "file", + "memory", +} class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): @@ -59,20 +85,42 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): fsspec-style object. """ - cachable = False + # https://github.com/fsspec/filesystem_spec/blob/56054c0a30ceedab4c0e6a0f7e429666773baf6d/docs/source/features.rst#instance-caching + cachable = True - def __init__( + def __init__( # noqa: PLR0913 self, - store: obs.store.ObjectStore, *args: Any, + protocol: str | None = None, + config: ( + S3Config + | S3ConfigInput + | GCSConfig + | GCSConfigInput + | AzureConfig + | AzureConfigInput + | None + ) = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, asynchronous: bool = False, + max_cache_size: int = 10, loop: Any = None, batch_size: int | None = None, ) -> None: """Construct a new AsyncFsspecStore. Args: - store: a configured instance of one of the store classes in `obstore.store`. + protocol: The storage protocol to use, such as "s3", + "gcs", or "abfs". If `None`, the default class-level protocol + is used. Default to None. + config: Configuration for the cloud storage provider, which can be one of + S3Config, S3ConfigInput, GCSConfig, GCSConfigInput, AzureConfig, + or AzureConfigInput. Any of these values will be applied after checking + for environment variables. If `None`, no cloud storage configuration is + applied beyond what is found in environment variables. + client_options: Additional options for configuring the client. + retry_config: Configuration for handling request errors. args: positional arguments passed on to the `fsspec.asyn.AsyncFileSystem` constructor. @@ -80,25 +128,44 @@ def __init__( asynchronous: Set to `True` if this instance is meant to be be called using the fsspec async API. This should only be set to true when running within a coroutine. + max_cache_size (int, optional): The maximum number of stores the cache + should keep. A cached store is kept internally for each bucket name. + Defaults to 10. loop: since both fsspec/python and tokio/rust may be using loops, this should be kept `None` for now, and will not be used. batch_size: some operations on many files will batch their requests; if you are seeing timeouts, you may want to set this number smaller than the defaults, which are determined in `fsspec.asyn._get_batch_size`. 
-        Example:
+        **Examples:**
+
         ```py
         from obstore.fsspec import AsyncFsspecStore
-        from obstore.store import HTTPStore
 
-        store = HTTPStore.from_url("https://example.com")
-        fsspec_store = AsyncFsspecStore(store)
-        resp = fsspec_store.cat("/")
+        store = AsyncFsspecStore(protocol="https")
+        resp = store.cat("https://example.com")
         assert resp.startswith(b"")
         ```
 
         """
-        self.store = store
+        if protocol is None:
+            self._protocol = self.protocol
+        else:
+            self._protocol = protocol
+
+        if self._protocol not in SUPPORTED_PROTOCOLS:
+            warnings.warn(
+                f"Unknown protocol: {self._protocol}; requests may fail.",
+                stacklevel=2,
+            )
+
+        self.config = config
+        self.client_options = client_options
+        self.retry_config = retry_config
+
+        # https://stackoverflow.com/a/68550238
+        self._construct_store = lru_cache(maxsize=max_cache_size)(self._construct_store)
+
         super().__init__(
             *args,
             asynchronous=asynchronous,
@@ -106,11 +173,72 @@ def __init__(
             batch_size=batch_size,
         )
 
+    def _split_path(self, path: str) -> tuple[str, str]:
+        """Split bucket and file path.
+
+        Args:
+            path: Input path, like `s3://mybucket/path/to/file`
+
+        Returns:
+            (bucket name, file path inside the bucket)
+
+        Examples:
+            >>> self._split_path("s3://mybucket/path/to/file")
+            ('mybucket', 'path/to/file')
+
+        """
+        protocol_without_bucket = {"file", "memory"}
+
+        # Parse the path as a URL
+        parsed = urlparse(path)
+
+        # If the protocol doesn't require buckets, return empty bucket and full path
+        if self._protocol in protocol_without_bucket:
+            return (
+                "",
+                f"{parsed.netloc}/{parsed.path.lstrip('/')}" if parsed.scheme else path,
+            )
+
+        if parsed.scheme:
+            if parsed.scheme != self._protocol:
+                err_msg = f"Expected protocol {self._protocol}, got {parsed.scheme}"
+                raise ValueError(err_msg)
+            return (parsed.netloc, parsed.path.lstrip("/"))
+
+        # path not in url format
+        path_li = path.split("/", 1)
+        if len(path_li) == 1:
+            return path, ""
+
+        return (path_li[0], path_li[1])
+
+    def _construct_store(self, bucket: str) -> ObjectStore:
+        return from_url(
+            url=f"{self._protocol}://{bucket}",
+            config=self.config,
+            client_options=self.client_options,
+            retry_config=self.retry_config,
+        )
+
     async def _rm_file(self, path: str, **_kwargs: Any) -> None:
-        return await obs.delete_async(self.store, path)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+        return await obs.delete_async(store, path)
 
     async def _cp_file(self, path1: str, path2: str, **_kwargs: Any) -> None:
-        return await obs.copy_async(self.store, path1, path2)
+        bucket1, path1_no_bucket = self._split_path(path1)
+        bucket2, path2_no_bucket = self._split_path(path2)
+
+        if bucket1 != bucket2:
+            err_msg = (
+                f"Bucket mismatch: Source bucket '{bucket1}' and "
+                f"destination bucket '{bucket2}' must be the same."
+ ) + raise ValueError(err_msg) + + store = self._construct_store(bucket1) + + return await obs.copy_async(store, path1_no_bucket, path2_no_bucket) async def _pipe_file( self, @@ -119,7 +247,9 @@ async def _pipe_file( mode: str = "overwrite", # noqa: ARG002 **_kwargs: Any, ) -> Any: - return await obs.put_async(self.store, path, value) + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + return await obs.put_async(store, path, value) async def _cat_file( self, @@ -128,8 +258,11 @@ async def _cat_file( end: int | None = None, **_kwargs: Any, ) -> bytes: + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + if start is None and end is None: - resp = await obs.get_async(self.store, path) + resp = await obs.get_async(store, path) return (await resp.bytes_async()).to_bytes() if start is None or end is None: @@ -137,9 +270,35 @@ async def _cat_file( "cat_file not implemented for start=None xor end=None", ) - range_bytes = await obs.get_range_async(self.store, path, start=start, end=end) + range_bytes = await obs.get_range_async(store, path, start=start, end=end) return range_bytes.to_bytes() + async def _cat( + self, + path: str, + recursive: bool = False, + on_error: str = "raise", + batch_size: int | None = None, + **_kwargs: Any, + ) -> bytes | dict[str, bytes]: + paths = await self._expand_path(path, recursive=recursive) + + # Filter out directories + files = [p for p in paths if not await self._isdir(p)] + + if not files: + err_msg = f"No valid files found in {path}" + raise FileNotFoundError(err_msg) + + # Call the original _cat only on files + return await super()._cat( + files, + recursive=False, + on_error=on_error, + batch_size=batch_size, + **_kwargs, + ) + async def _cat_ranges( # noqa: PLR0913 self, paths: list[str], @@ -165,9 +324,12 @@ async def _cat_ranges( # noqa: PLR0913 futs: list[Coroutine[Any, Any, list[Bytes]]] = [] for path, ranges in per_file_requests.items(): + bucket, path_no_bucket = self._split_path(path) + store = self._construct_store(bucket) + offsets = [r[0] for r in ranges] ends = [r[1] for r in ranges] - fut = obs.get_ranges_async(self.store, path, starts=offsets, ends=ends) + fut = obs.get_ranges_async(store, path_no_bucket, starts=offsets, ends=ends) futs.append(fut) result = await asyncio.gather(*futs) @@ -192,31 +354,58 @@ async def _put_file( mode: str = "overwrite", # noqa: ARG002 **_kwargs: Any, ) -> None: + if not Path(lpath).is_file(): + err_msg = f"File {lpath} not found in local" + raise FileNotFoundError(err_msg) + # TODO: convert to use async file system methods using LocalStore # Async functions should not open files with blocking methods like `open` + rbucket, rpath = self._split_path(rpath) + + # Should construct the store instance by rbucket, which is the target path + store = self._construct_store(rbucket) + with open(lpath, "rb") as f: # noqa: ASYNC230 - await obs.put_async(self.store, rpath, f) + await obs.put_async(store, rpath, f) async def _get_file(self, rpath: str, lpath: str, **_kwargs: Any) -> None: + res = urlparse(lpath) + if res.scheme or Path(lpath).is_dir(): + # lpath need to be local file and cannot contain scheme + return + # TODO: convert to use async file system methods using LocalStore # Async functions should not open files with blocking methods like `open` + rbucket, rpath = self._split_path(rpath) + + # Should construct the store instance by rbucket, which is the target path + store = self._construct_store(rbucket) + with open(lpath, "wb") as f: # noqa: ASYNC230 - 
resp = await obs.get_async(self.store, rpath) + resp = await obs.get_async(store, rpath) async for buffer in resp.stream(): f.write(buffer) async def _info(self, path: str, **_kwargs: Any) -> dict[str, Any]: - head = await obs.head_async(self.store, path) - return { - # Required of `info`: (?) - "name": head["path"], - "size": head["size"], - "type": "directory" if head["path"].endswith("/") else "file", - # Implementation-specific keys - "e_tag": head["e_tag"], - "last_modified": head["last_modified"], - "version": head["version"], - } + bucket, path_no_bucket = self._split_path(path) + store = self._construct_store(bucket) + + try: + head = await obs.head_async(store, path_no_bucket) + return { + # Required of `info`: (?) + "name": head["path"], + "size": head["size"], + "type": "directory" if head["path"].endswith("/") else "file", + # Implementation-specific keys + "e_tag": head["e_tag"], + "last_modified": head["last_modified"], + "version": head["version"], + } + except FileNotFoundError: + # use info in fsspec.AbstractFileSystem + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, super().info, path, **_kwargs) @overload async def _ls( @@ -229,36 +418,48 @@ async def _ls( async def _ls( self, path: str, - detail: Literal[True] = True, # noqa: FBT002 + detail: Literal[True] = True, **_kwargs: Any, ) -> list[dict[str, Any]]: ... async def _ls( self, path: str, - detail: bool = True, # noqa: FBT001, FBT002 + detail: bool = True, **_kwargs: Any, ) -> list[dict[str, Any]] | list[str]: - result = await obs.list_with_delimiter_async(self.store, path) + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + + result = await obs.list_with_delimiter_async(store, path) objects = result["objects"] prefs = result["common_prefixes"] - if detail: - return [ - { - "name": obj["path"], - "size": obj["size"], - "type": "file", - "e_tag": obj["e_tag"], - } - for obj in objects - ] + [{"name": obj, "size": 0, "type": "directory"} for obj in prefs] - return sorted([obj["path"] for obj in objects] + prefs) + files = [ + { + "name": f"{bucket}/{obj['path']}", + "size": obj["size"], + "type": "file", + "e_tag": obj["e_tag"], + } + for obj in objects + ] + [ + { + "name": f"{bucket}/{pref}", + "size": 0, + "type": "directory", + } + for pref in prefs + ] + if not files: + raise FileNotFoundError(path) + + return files if detail else sorted(o["name"] for o in files) def _open( self, path: str, mode: str = "rb", block_size: Any = None, # noqa: ARG002 - autocommit: Any = True, # noqa: ARG002, FBT002 + autocommit: Any = True, # noqa: ARG002 cache_options: Any = None, # noqa: ARG002 **kwargs: Any, ) -> BufferedFileSimple: @@ -296,3 +497,50 @@ def read(self, length: int = -1) -> Any: data = self.fs.cat_file(self.path, self.loc, self.loc + length) self.loc += length return data + + +def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> None: + """Dynamically register a subclass of AsyncFsspecStore for the given protocol(s). + + This function creates a new subclass of AsyncFsspecStore with the specified + protocol and registers it with fsspec. If multiple protocols are provided, + the function registers each one individually. + + Args: + protocol (str | list[str]): A single protocol (e.g., "s3", "gcs", "abfs") or + a list of protocols to register AsyncFsspecStore for. + asynchronous (bool, optional): If True, the registered store will support + asynchronous operations. Defaults to False. 
+
+    Example:
+        >>> register("s3")
+        >>> register("s3", asynchronous=True)  # Registers an async store for "s3"
+        >>> register(["gcs", "abfs"])  # Registers both "gcs" and "abfs"
+
+    Notes:
+        - Each protocol gets a dynamically generated subclass named
+          `AsyncFsspecStore_{protocol}`.
+        - This avoids modifying the original AsyncFsspecStore class.
+
+    """
+    if isinstance(protocol, str):
+        _register(protocol, asynchronous=asynchronous)
+        return
+
+    for p in protocol:
+        _register(p, asynchronous=asynchronous)
+
+
+def _register(protocol: str, *, asynchronous: bool) -> None:
+    fsspec.register_implementation(
+        protocol,
+        type(
+            f"AsyncFsspecStore_{protocol}",  # Unique class name
+            (AsyncFsspecStore,),  # Base class
+            {
+                "protocol": protocol,
+                "asynchronous": asynchronous,
+            },  # Assign protocol dynamically
+        ),
+        clobber=False,
+    )
diff --git a/tests/conftest.py b/tests/conftest.py
index 4ba7efff..c8929196 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,3 +1,5 @@
+from typing import TYPE_CHECKING
+
 import boto3
 import pytest
 import urllib3
@@ -7,6 +9,9 @@
 from obstore.store import S3Store
 
+if TYPE_CHECKING:
+    from obstore.store import S3ConfigInput
+
 TEST_BUCKET_NAME = "test"
 
@@ -49,3 +54,12 @@ def s3_store(s3: str):
         aws_skip_signature=True,
         client_options={"allow_http": True},
     )
+
+
+@pytest.fixture
+def s3_store_config(s3: str) -> "S3ConfigInput":
+    return {
+        "AWS_ENDPOINT_URL": s3,
+        "AWS_REGION": "us-east-1",
+        "AWS_SKIP_SIGNATURE": True,
+    }
diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py
index b303d6b5..629f04bc 100644
--- a/tests/test_fsspec.py
+++ b/tests/test_fsspec.py
@@ -1,101 +1,321 @@
+from __future__ import annotations
+
+import gc
 import os
+from typing import TYPE_CHECKING
+from unittest.mock import patch
 
+import fsspec
 import pyarrow.parquet as pq
 import pytest
+from fsspec.registry import _registry
+
+from obstore.fsspec import AsyncFsspecStore, register
+from tests.conftest import TEST_BUCKET_NAME
 
-import obstore as obs
-from obstore.fsspec import AsyncFsspecStore
-from obstore.store import S3Store
+if TYPE_CHECKING:
+    from obstore.store import S3Config
 
 
 @pytest.fixture
-def fs(s3_store: S3Store):
-    return AsyncFsspecStore(s3_store)
+def fs(s3_store_config: S3Config):
+    register("s3")
+    return fsspec.filesystem(
+        "s3",
+        config=s3_store_config,
+        client_options={"allow_http": True},
+    )
+
+
+@pytest.fixture(autouse=True)
+def cleanup_after_test():
+    """Cleanup function to run after each test."""
+    yield  # Runs the test first
+
+    # clear the registered implementations after each test
+    _registry.clear()
+
+    gc.collect()
+
+
+def test_register():
+    """Test if register() creates and registers a subclass for a given protocol."""
+    register("s3")  # Register the "s3" protocol dynamically
+    fs_class = fsspec.get_filesystem_class("s3")
+
+    assert issubclass(
+        fs_class,
+        AsyncFsspecStore,
+    ), "Registered class should be a subclass of AsyncFsspecStore"
+    assert fs_class.protocol == "s3", (
+        "Registered class should have the correct protocol"
+    )
+
+    # Ensure a new instance of the registered store can be created
+    fs_instance = fs_class()
+    assert isinstance(
+        fs_instance,
+        AsyncFsspecStore,
+    ), "Registered class should be instantiable"
+
+    # test register asynchronous
+    register("gcs", asynchronous=True)  # Register "gcs" with async support
+    fs_class = fsspec.get_filesystem_class("gcs")
+    assert fs_class.asynchronous, "Registered class should be asynchronous"
+
+    # test multiple registrations
+    register(["file", "abfs"])
+    assert issubclass(fsspec.get_filesystem_class("file"), AsyncFsspecStore)
+    assert issubclass(fsspec.get_filesystem_class("abfs"), AsyncFsspecStore)
+
+
+def test_construct_store_cache_diff_bucket_name(s3_store_config: S3Config):
+    register("s3")
+    fs: AsyncFsspecStore = fsspec.filesystem(
+        "s3",
+        config=s3_store_config,
+        client_options={"allow_http": True},
+        asynchronous=True,
+        max_cache_size=5,
+    )
+
+    bucket_names = [f"bucket{i}" for i in range(20)]  # 20 unique buckets
+
+    with patch.object(
+        fs,
+        "_construct_store",
+        wraps=fs._construct_store,
+    ) as mock_construct:
+        for bucket in bucket_names:
+            fs._construct_store(bucket)
+
+    # Since the cache size is 5, only the 5 most recently used stores stay cached
+    assert mock_construct.cache_info().currsize == 5, (
+        "Cache should only hold 5 stores"
+    )
+    assert mock_construct.cache_info().hits == 0, "Cache should hit 0 times"
+    assert mock_construct.cache_info().misses == 20, "Cache should miss 20 times"
+
+    # test garbage collector
+    del fs
+    assert gc.collect() > 0
+
+
+def test_construct_store_cache_same_bucket_name(s3_store_config: S3Config):
+    register("s3")
+    fs = fsspec.filesystem(
+        "s3",
+        config=s3_store_config,
+        client_options={"allow_http": True},
+        asynchronous=True,
+        max_cache_size=5,
+    )
+
+    bucket_names = ["bucket" for _ in range(20)]
+
+    with patch.object(
+        fs,
+        "_construct_store",
+        wraps=fs._construct_store,
+    ) as mock_construct:
+        for bucket in bucket_names:
+            fs._construct_store(bucket)
+
+    assert mock_construct.cache_info().currsize == 1, (
+        "Cache should only hold 1 store"
+    )
+    assert mock_construct.cache_info().hits == 20 - 1, (
+        "Cache should hit 19 times"
+    )
+    assert mock_construct.cache_info().misses == 1, "Cache should only miss once"
+
+    # test garbage collector
+    del fs
+    assert gc.collect() > 0
+
+
+def test_fsspec_filesystem_cache(s3_store_config: S3Config):
+    """Test caching behavior of fsspec.filesystem with the _Cached metaclass."""
+    register("s3")
+
+    # call fsspec.filesystem() multiple times with the same parameters
+    fs1 = fsspec.filesystem(
+        "s3",
+        config=s3_store_config,
+        client_options={"allow_http": True},
+    )
+    fs2 = fsspec.filesystem(
+        "s3",
+        config=s3_store_config,
+        client_options={"allow_http": True},
+    )
+
+    # Same parameters should return the same instance
+    assert fs1 is fs2, (
+        "fsspec.filesystem() with the same parameters should return the cached instance"
+    )
+
+    # Changing parameters should create a new instance
+    fs3 = fsspec.filesystem(
+        "s3",
+        config=s3_store_config,
+        client_options={"allow_http": True},
+        asynchronous=True,
+    )
+    assert fs1 is not fs3, (
+        "fsspec.filesystem() with different parameters should return a new instance"
+    )
+
+
+def test_split_path(fs: AsyncFsspecStore):
+    # in url format, with bucket
+    assert fs._split_path("s3://mybucket/path/to/file") == ("mybucket", "path/to/file")
+    assert fs._split_path("s3://data-bucket/") == ("data-bucket", "")
+
+    # path format, with bucket
+    assert fs._split_path("mybucket/path/to/file") == ("mybucket", "path/to/file")
+    assert fs._split_path("data-bucket/") == ("data-bucket", "")
+
+    # url format, wrong protocol
+    with pytest.raises(ValueError, match="Expected protocol s3, got gs"):
+        fs._split_path("gs://data-bucket/")
+
+    # in url format, without bucket
+    fs._protocol = "file"
+    assert fs._split_path("file:///mybucket/path/to/file") == (
+        "",
+        "/mybucket/path/to/file",
+    )
+    assert fs._split_path("file:///data-bucket/") == ("", "/data-bucket/")
+
+    # path format, without bucket
+    assert fs._split_path("/mybucket/path/to/file") == ("", "/mybucket/path/to/file")
+    assert fs._split_path("/data-bucket/") == ("", "/data-bucket/")
 
 
 def test_list(fs: AsyncFsspecStore):
-    out = fs.ls("", detail=False)
-    assert out == ["afile"]
-    fs.pipe_file("dir/bfile", b"data")
-    out = fs.ls("", detail=False)
-    assert out == ["afile", "dir"]
-    out = fs.ls("", detail=True)
+    out = fs.ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [f"{TEST_BUCKET_NAME}/afile"]
+    fs.pipe_file(f"{TEST_BUCKET_NAME}/dir/bfile", b"data")
+    out = fs.ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [f"{TEST_BUCKET_NAME}/afile", f"{TEST_BUCKET_NAME}/dir"]
+    out = fs.ls(f"{TEST_BUCKET_NAME}", detail=True)
     assert out[0]["type"] == "file"
     assert out[1]["type"] == "directory"
 
 
 @pytest.mark.asyncio
-async def test_list_async(s3_store: S3Store):
-    fs = AsyncFsspecStore(s3_store, asynchronous=True)
-    out = await fs._ls("", detail=False)
-    assert out == ["afile"]
-    await fs._pipe_file("dir/bfile", b"data")
-    out = await fs._ls("", detail=False)
-    assert out == ["afile", "dir"]
-    out = await fs._ls("", detail=True)
+async def test_list_async(s3_store_config: S3Config):
+    register("s3")
+    fs = fsspec.filesystem(
+        "s3",
+        config=s3_store_config,
+        client_options={"allow_http": True},
+        asynchronous=True,
+    )
+
+    out = await fs._ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [f"{TEST_BUCKET_NAME}/afile"]
+    await fs._pipe_file(f"{TEST_BUCKET_NAME}/dir/bfile", b"data")
+    out = await fs._ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [f"{TEST_BUCKET_NAME}/afile", f"{TEST_BUCKET_NAME}/dir"]
+    out = await fs._ls(f"{TEST_BUCKET_NAME}", detail=True)
     assert out[0]["type"] == "file"
     assert out[1]["type"] == "directory"
 
 
 @pytest.mark.network
 def test_remote_parquet():
-    store = obs.store.HTTPStore.from_url("https://github.com")
-    fs = AsyncFsspecStore(store)
-    url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
+    register("https")
+    fs = fsspec.filesystem("https")
+    url = "github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"  # noqa: E501
+    pq.read_metadata(url, filesystem=fs)
+
+    # also test with full url
+    url = "https://github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
     pq.read_metadata(url, filesystem=fs)
 
 
 def test_multi_file_ops(fs: AsyncFsspecStore):
-    data = {"dir/test1": b"test data1", "dir/test2": b"test data2"}
+    data = {
+        f"{TEST_BUCKET_NAME}/dir/test1": b"test data1",
+        f"{TEST_BUCKET_NAME}/dir/test2": b"test data2",
+    }
     fs.pipe(data)
     out = fs.cat(list(data))
     assert out == data
-    out = fs.cat("dir", recursive=True)
+    out = fs.cat(f"{TEST_BUCKET_NAME}/dir", recursive=True)
     assert out == data
-    fs.cp("dir", "dir2", recursive=True)
-    out = fs.find("", detail=False)
-    assert out == ["afile", "dir/test1", "dir/test2", "dir2/test1", "dir2/test2"]
-    fs.rm(["dir", "dir2"], recursive=True)
-    out = fs.find("", detail=False)
-    assert out == ["afile"]
+    fs.cp(f"{TEST_BUCKET_NAME}/dir", f"{TEST_BUCKET_NAME}/dir2", recursive=True)
+    out = fs.find(f"{TEST_BUCKET_NAME}", detail=False)
+    assert out == [
+        f"{TEST_BUCKET_NAME}/afile",
+        f"{TEST_BUCKET_NAME}/dir/test1",
+        f"{TEST_BUCKET_NAME}/dir/test2",
f"{TEST_BUCKET_NAME}/dir2/test1", + f"{TEST_BUCKET_NAME}/dir2/test2", + ] + fs.rm([f"{TEST_BUCKET_NAME}/dir", f"{TEST_BUCKET_NAME}/dir2"], recursive=True) + out = fs.find(f"{TEST_BUCKET_NAME}", detail=False) + assert out == [f"{TEST_BUCKET_NAME}/afile"] def test_cat_ranges_one(fs: AsyncFsspecStore): data1 = os.urandom(10000) - fs.pipe_file("data1", data1) + fs.pipe_file(f"{TEST_BUCKET_NAME}/data1", data1) # single range - out = fs.cat_ranges(["data1"], [10], [20]) + out = fs.cat_ranges([f"{TEST_BUCKET_NAME}/data1"], [10], [20]) assert out == [data1[10:20]] # range oob - out = fs.cat_ranges(["data1"], [0], [11000]) + out = fs.cat_ranges([f"{TEST_BUCKET_NAME}/data1"], [0], [11000]) assert out == [data1] # two disjoint ranges, one file - out = fs.cat_ranges(["data1", "data1"], [10, 40], [20, 60]) + out = fs.cat_ranges( + [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], + [10, 40], + [20, 60], + ) assert out == [data1[10:20], data1[40:60]] # two adjoining ranges, one file - out = fs.cat_ranges(["data1", "data1"], [10, 30], [20, 60]) + out = fs.cat_ranges( + [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], + [10, 30], + [20, 60], + ) assert out == [data1[10:20], data1[30:60]] # two overlapping ranges, one file - out = fs.cat_ranges(["data1", "data1"], [10, 15], [20, 60]) + out = fs.cat_ranges( + [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], + [10, 15], + [20, 60], + ) assert out == [data1[10:20], data1[15:60]] # completely overlapping ranges, one file - out = fs.cat_ranges(["data1", "data1"], [10, 0], [20, 60]) + out = fs.cat_ranges( + [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data1"], + [10, 0], + [20, 60], + ) assert out == [data1[10:20], data1[0:60]] def test_cat_ranges_two(fs: AsyncFsspecStore): data1 = os.urandom(10000) data2 = os.urandom(10000) - fs.pipe({"data1": data1, "data2": data2}) + fs.pipe({f"{TEST_BUCKET_NAME}/data1": data1, f"{TEST_BUCKET_NAME}/data2": data2}) # single range in each file - out = fs.cat_ranges(["data1", "data2"], [10, 10], [20, 20]) + out = fs.cat_ranges( + [f"{TEST_BUCKET_NAME}/data1", f"{TEST_BUCKET_NAME}/data2"], + [10, 10], + [20, 20], + ) assert out == [data1[10:20], data2[10:20]] @@ -120,4 +340,4 @@ def test_atomic_write(fs: AsyncFsspecStore): def test_cat_ranges_error(fs: AsyncFsspecStore): with pytest.raises(ValueError): # noqa: PT011 - fs.cat_ranges(["path"], [], []) + fs.cat_ranges([f"{TEST_BUCKET_NAME}/path"], [], [])