diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 82aa31db5..3d3ee50ea 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,6 +62,12 @@ jobs: shell: bash -l {0} run: source continuous_integration/scripts/start_redis.sh + - name: Start Minio service in container. + #TODO: This product is leaving open-source container distribution + # Find a new image or product to use + # https://github.com/minio/minio/issues/21647#issuecomment-3418675115 + shell: bash -l {0} + run: source continuous_integration/scripts/start_minio.sh - name: Ensure example data is migrated to current catalog database schema. # The example data is expected to be kept up to date to the latest Tiled @@ -84,6 +90,7 @@ jobs: # Provide test suite with PostgreSQL and Redis databases to use. TILED_TEST_POSTGRESQL_URI: postgresql://postgres:secret@localhost:5432 TILED_TEST_REDIS: redis://localhost:6379 + TILED_TEST_BUCKET: http://minioadmin:minioadmin@localhost:9000/buck # TODO Reinstate after finding a new image to use # https://github.com/bluesky/tiled/issues/1109 # # Opt in to LDAPAuthenticator tests. 
diff --git a/continuous_integration/docker-configs/minio-docker-compose.yml b/continuous_integration/docker-configs/minio-docker-compose.yml new file mode 100644 index 000000000..563835eed --- /dev/null +++ b/continuous_integration/docker-configs/minio-docker-compose.yml @@ -0,0 +1,29 @@ +version: "3.2" +services: + minio: + image: minio/minio:latest + ports: + - 9000:9000 + - 9001:9001 + volumes: + - minio-data:/data + environment: + MINIO_ROOT_USER: "minioadmin" + MINIO_ROOT_PASSWORD: "minioadmin" + command: server /data --console-address :9001 + restart: unless-stopped + create-bucket: + image: minio/mc:latest + environment: + MC_HOST_minio: http://minioadmin:minioadmin@minio:9000 + entrypoint: + - sh + - -c + - | + until mc ls minio > /dev/null 2>&1; do + sleep 0.5 + done + + mc mb --ignore-existing minio/buck +volumes: + minio-data: diff --git a/continuous_integration/scripts/start_minio.sh b/continuous_integration/scripts/start_minio.sh new file mode 100644 index 000000000..567addf6e --- /dev/null +++ b/continuous_integration/scripts/start_minio.sh @@ -0,0 +1,7 @@ +#!/bin/bash +set -e + +# Start MinIO server in docker container +docker pull minio/minio:latest +docker compose -f continuous_integration/docker-configs/minio-docker-compose.yml up -d +docker ps diff --git a/example_configs/bucket_storage/bucket_storage.yml b/example_configs/bucket_storage/bucket_storage.yml new file mode 100644 index 000000000..8dd88a503 --- /dev/null +++ b/example_configs/bucket_storage/bucket_storage.yml @@ -0,0 +1,17 @@ +authentication: + allow_anonymous_access: false +trees: + - path: / + tree: catalog + args: + uri: "sqlite:///storage/catalog.db" + writable_storage: + - provider: s3 + uri: "http://localhost:9000" + config: + access_key_id: "minioadmin" + secret_access_key: "minioadmin" + bucket: "buck" + virtual_hosted_style_request: False + client_options: {"allow_http": True} + init_if_not_exists: true diff --git a/example_configs/bucket_storage/docker-compose.yml 
b/example_configs/bucket_storage/docker-compose.yml new file mode 100644 index 000000000..563835eed --- /dev/null +++ b/example_configs/bucket_storage/docker-compose.yml @@ -0,0 +1,29 @@ +version: "3.2" +services: + minio: + image: minio/minio:latest + ports: + - 9000:9000 + - 9001:9001 + volumes: + - minio-data:/data + environment: + MINIO_ROOT_USER: "minioadmin" + MINIO_ROOT_PASSWORD: "minioadmin" + command: server /data --console-address :9001 + restart: unless-stopped + create-bucket: + image: minio/mc:latest + environment: + MC_HOST_minio: http://minioadmin:minioadmin@minio:9000 + entrypoint: + - sh + - -c + - | + until mc ls minio > /dev/null 2>&1; do + sleep 0.5 + done + + mc mb --ignore-existing minio/buck +volumes: + minio-data: diff --git a/example_configs/bucket_storage/readme.md b/example_configs/bucket_storage/readme.md new file mode 100644 index 000000000..03977b990 --- /dev/null +++ b/example_configs/bucket_storage/readme.md @@ -0,0 +1,18 @@ +# Create a local bucket for testing access to BLOBS + +In this example there exists: +- A `docker-compose.yml` file capable of instantiating and running a [Minio](https://min.io/) container. +- A configuration yaml file `bucket_storage.yml` which contains information Tiled needs to authenticate with the bucket storage system and write / read Binary Large Objects (BLOBS) through the Zarr adapter. + +## How to run this example: +1. In one terminal window, navigate to the directory where the `docker-compose.yml` and `bucket_storage.yml` are. +2. Run `docker compose up` with adequate permissions. +3. Open another terminal window in the same location and run `tiled serve config bucket_storage.yml --api-key secret` +4. You will need to create a `storage` directory in `/example_configs/bucket_storage` for the sqlite database (create it before starting the server in step 3). +5.
Create an `ipython` session and run the following commands to write array data as a BLOB in a bucket: ```python from tiled.client import from_uri c = from_uri('http://localhost:8000', api_key='secret') c.write_array([1,2,3]) ``` 6. You will be able to see the written data in the bucket if you log in to the minio container, exposed on your machine at `http://localhost:9001/login`.
Use testing credentials `minioadmin` for both fields. diff --git a/pixi.toml b/pixi.toml index e2c60d2da..018a03b06 100644 --- a/pixi.toml +++ b/pixi.toml @@ -54,6 +54,7 @@ flake8 = "*" ipython = "*" ldap3 = "*" matplotlib = "*" +minio = "*" mistune = "*" myst-parser = "*" numpydoc = "*" diff --git a/pyproject.toml b/pyproject.toml index 9ced32718..98cb9fffc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,10 +80,12 @@ all = [ "jinja2", "jmespath", "lz4", + "minio", "ndindex", "numcodecs", "numpy", "numba >=0.59.0", # indirect, pinned to assist uv solve + "obstore", "openpyxl", "packaging", "pandas", @@ -156,6 +158,7 @@ dev = [ "ldap3", "locust", "matplotlib", + "minio", "mistune", "myst-parser", "numpydoc", @@ -242,10 +245,12 @@ server = [ "jinja2", "jmespath", "lz4", + "minio", "ndindex", "numba >=0.59.0", # indirect, pinned to assist uv solve "numcodecs", "numpy", + "obstore", "openpyxl", "packaging", "pandas", diff --git a/tiled/_tests/conftest.py b/tiled/_tests/conftest.py index a45459916..2112bd1e5 100644 --- a/tiled/_tests/conftest.py +++ b/tiled/_tests/conftest.py @@ -4,6 +4,7 @@ import tempfile from pathlib import Path from typing import Any +from urllib.parse import urlparse import asyncpg import pytest @@ -313,6 +314,40 @@ def redis_uri(): raise pytest.skip("No TILED_TEST_REDIS configured") +@pytest.fixture +def minio_uri(): + if uri := os.getenv("TILED_TEST_BUCKET"): + from minio import Minio + from minio.deleteobjects import DeleteObject + + # For convenience, we split the bucket from a string + url = urlparse(uri) + bucket = url.path.lstrip("/") + uri = url._replace(netloc="{}:{}".format(url.hostname, url.port), path="") + + client = Minio( + uri.geturl(), + access_key=url.username, + secret_key=url.password, + secure=False, + ) + + # Reset the state of the bucket after each test. 
+ if client.bucket_exists(bucket): + delete_object_list = map( + lambda x: DeleteObject(x.object_name), + client.list_objects(bucket, recursive=True), + ) + errors = client.remove_objects(bucket, delete_object_list) + for error in errors: + print("error occurred when deleting object", error) + else: + client.make_bucket(bucket) + + else: + raise pytest.skip("No TILED_TEST_BUCKET configured") + + @pytest.fixture(scope="function") def tiled_websocket_context(tmpdir, redis_uri): """Fixture that provides a Tiled context with websocket support.""" diff --git a/tiled/_tests/test_writing.py b/tiled/_tests/test_writing.py index 1643e0dac..d022867ce 100644 --- a/tiled/_tests/test_writing.py +++ b/tiled/_tests/test_writing.py @@ -5,9 +5,11 @@ """ import base64 +import os import threading import uuid from datetime import datetime +from urllib.parse import urljoin, urlparse import awkward import dask.dataframe @@ -17,6 +19,8 @@ import pyarrow import pytest import sparse +from minio import Minio +from minio.error import S3Error from pandas.testing import assert_frame_equal from starlette.status import ( HTTP_404_NOT_FOUND, @@ -38,7 +42,7 @@ from ..structures.data_source import DataSource from ..structures.sparse import COOStructure from ..structures.table import TableStructure -from ..utils import APACHE_ARROW_FILE_MIME_TYPE, patch_mimetypes +from ..utils import APACHE_ARROW_FILE_MIME_TYPE, patch_mimetypes, sanitize_uri from ..validation_registration import ValidationRegistry from .utils import fail_with_status_code @@ -47,13 +51,55 @@ @pytest.fixture -def tree(tmpdir): - return in_memory( - writable_storage=[ - f"file://localhost{str(tmpdir / 'data')}", - f"duckdb:///{tmpdir / 'data.duckdb'}", - ] - ) +def tmp_minio_bucket(): + """Create a temporary MinIO bucket and clean it up after tests.""" + if uri := os.getenv("TILED_TEST_BUCKET"): + clean_uri, username, password = sanitize_uri(uri) + minio_client = Minio( + urlparse(clean_uri).netloc, # e.g. 
only "localhost:9000" + access_key=username or "minioadmin", + secret_key=password or "minioadmin", + secure=False, + ) + + bucket_name = f"test-{uuid.uuid4().hex}" + minio_client.make_bucket(bucket_name) + + try: + yield urljoin(uri, "/" + bucket_name) # full URI with credentials + finally: + # Cleanup: remove all objects and delete the bucket + try: + objects = minio_client.list_objects(bucket_name, recursive=True) + for obj in objects: + minio_client.remove_object(bucket_name, obj.object_name) + minio_client.remove_bucket(bucket_name) + except S3Error as e: + print(f"Warning: failed to delete test bucket {bucket_name}: {e}") + + else: + yield None + + +@pytest.fixture +def tree(tmpdir, tmp_minio_bucket): + writable_storage = [f"duckdb:///{tmpdir / 'data.duckdb'}"] + + if tmp_minio_bucket: + writable_storage.append( + { + "provider": "s3", + "uri": tmp_minio_bucket, + "config": { + "virtual_hosted_style_request": False, + "client_options": {"allow_http": True}, + }, + } + ) + + writable_storage.append(f"file://localhost{str(tmpdir / 'data')}") + + return in_memory(writable_storage=writable_storage) def test_write_array_full(tree): diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py index 58125d2dc..d3b2d682e 100644 --- a/tiled/adapters/zarr.py +++ b/tiled/adapters/zarr.py @@ -1,9 +1,10 @@ +# mypy: ignore-errors import builtins import copy import os from importlib.metadata import version from typing import Any, Iterator, List, Optional, Set, Tuple, Union, cast -from urllib.parse import quote_plus +from urllib.parse import quote_plus, urljoin, urlparse import zarr from numpy._typing import NDArray @@ -17,7 +18,13 @@ from ..catalog.orm import Node from ..iterviews import ItemsView, KeysView, ValuesView from ..ndslice import NDSlice -from ..storage import FileStorage, Storage +from ..storage import ( + SUPPORTED_OBJECT_URI_SCHEMES, + FileStorage, + ObjectStorage, + Storage, + get_storage, +) from ..structures.array import ArrayStructure from 
..structures.core import Spec, StructureFamily from ..structures.data_source import Asset, DataSource @@ -31,7 +38,8 @@ from zarr.storage import init_array as create_array else: from zarr import create_array - from zarr.storage import LocalStore + from zarr.storage import LocalStore, ObjectStore + INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7")) @@ -60,22 +68,24 @@ def init_storage( path_parts: List[str], ) -> DataSource[ArrayStructure]: data_source = copy.deepcopy(data_source) # Do not mutate caller input. - data_uri = storage.uri + "".join( - f"/{quote_plus(segment)}" for segment in path_parts - ) + # Zarr requires evenly-sized chunks within each dimension. # Use the first chunk along each dimension. zarr_chunks = tuple(dim[0] for dim in data_source.structure.chunks) shape = tuple(dim[0] * len(dim) for dim in data_source.structure.chunks) - directory = path_from_uri(data_uri) - directory.mkdir(parents=True, exist_ok=True) - store = LocalStore(str(directory)) - create_array( - store, - shape=shape, - chunks=zarr_chunks, - dtype=data_source.structure.data_type.to_numpy_dtype(), + data_type = data_source.structure.data_type.to_numpy_dtype() + data_uri = urljoin( + storage.uri + "/", "/".join(quote_plus(segment) for segment in path_parts) ) + + if ZARR_LIB_V2: + zarr_store = LocalStore(str(path_from_uri(data_uri))) + else: + zarr_store = ObjectStore(store=storage.get_obstore_location(data_uri)) + + create_array(zarr_store, shape=shape, chunks=zarr_chunks, dtype=data_type) + + # Update data source to include the new asset data_source.assets.append( Asset( data_uri=data_uri, @@ -83,6 +93,7 @@ def init_storage( parameter="data_uri", ) ) + return data_source @property @@ -205,7 +216,7 @@ async def patch( @classmethod def supported_storage(cls) -> Set[type[Storage]]: - return {FileStorage} + return {FileStorage} if ZARR_LIB_V2 else {FileStorage, ObjectStorage} class ZarrGroupAdapter( @@ -298,10 +309,29 @@ def from_catalog( /, **kwargs: 
Optional[Any], ) -> Union[ZarrGroupAdapter, ZarrArrayAdapter]: - zarr_obj = zarr.open( - path_from_uri(data_source.assets[0].data_uri) - ) # Group or Array - if node.structure_family == StructureFamily.container: + is_container_type = node.structure_family == StructureFamily.container + uri = data_source.assets[0].data_uri + + if urlparse(uri).scheme == "file": + # This is a file-based Zarr storage + zarr_obj = zarr.open(path_from_uri(uri)) + + elif urlparse(uri).scheme in SUPPORTED_OBJECT_URI_SCHEMES: + # This is an object-store-based Zarr storage + storage = cast(ObjectStorage, get_storage(uri)) + _, _, prefix = storage.parse_blob_uri(uri) + zarr_store = ObjectStore(store=storage.get_obstore_location()) + # zarr_obj = zarr.open(store=zarr_store) + + if is_container_type: + zarr_obj = zarr.open_group(store=zarr_store, path=prefix) + else: + zarr_obj = zarr.open_array(store=zarr_store, path=prefix) + + else: + raise TypeError(f"Unsupported URI scheme in {uri}") + + if is_container_type: return ZarrGroupAdapter( zarr_obj, metadata=node.metadata_, diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index b8411eef4..c89f41d6f 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -74,7 +74,9 @@ from ..server.settings import DatabaseSettings from ..server.streaming import StreamingCache from ..storage import ( + SUPPORTED_OBJECT_URI_SCHEMES, FileStorage, + ObjectStorage, SQLStorage, get_storage, parse_storage, @@ -172,8 +174,8 @@ def __init__( ): self.engine = get_database_engine(database_settings) self.database_settings = database_settings - self.writable_storage = [] - self.readable_storage = set() + self.writable_storage = {} + self.readable_storage = {} # Back-compat: `writable_storage` used to be a dict: we want its values. 
if isinstance(writable_storage, dict): @@ -182,21 +184,23 @@ def __init__( if isinstance(writable_storage, (str, Path)): writable_storage = [writable_storage] if isinstance(readable_storage, str): - raise ValueError("readable_storage should be a list of URIs or paths") + raise ValueError( + "readable_storage should be a list of URIs, paths, or dicts" + ) for item in writable_storage or []: - self.writable_storage.append( - parse_storage( - item, pool_size=storage_pool_size, max_overflow=storage_max_overflow - ) + storage = parse_storage( + item, pool_size=storage_pool_size, max_overflow=storage_max_overflow ) + self.writable_storage[storage.uri] = storage for item in readable_storage or []: - self.readable_storage.add(parse_storage(item)) + storage = parse_storage(item) + self.readable_storage[storage.uri] = storage # Writable storage should also be readable. self.readable_storage.update(self.writable_storage) # Register all storage in a registry that enables Adapters to access # credentials (if applicable). - for item in self.readable_storage: + for item in self.readable_storage.values(): register_storage(item) self.key_maker = key_maker @@ -508,7 +512,7 @@ async def get_adapter(self): asset_path = path_from_uri(asset.data_uri) for readable_storage in { item - for item in self.context.readable_storage + for item in self.context.readable_storage.values() if isinstance(item, FileStorage) }: if ( @@ -692,13 +696,13 @@ async def create_node( ) adapter_cls = STORAGE_ADAPTERS_BY_MIMETYPE[data_source.mimetype] # Choose writable storage. Use the first writable storage item - # with a scheme that is supported by this adapter. # For - # back-compat, if an adapter does not declare `supported_storage` + # with a scheme that is supported by this adapter. + # For back-compat, if an adapter does not declare `supported_storage` # assume it supports file-based storage only. 
supported_storage = getattr( - adapter_cls, "supported_storage", {FileStorage} + adapter_cls, "supported_storage", lambda: {FileStorage} )() - for storage in self.context.writable_storage: + for storage in self.context.writable_storage.values(): if isinstance(storage, tuple(supported_storage)): break else: @@ -706,7 +710,7 @@ async def create_node( f"The adapter {adapter_cls} supports storage types " f"{[cls.__name__ for cls in supported_storage]} " "but the only available storage types " - f"are {self.context.writable_storage}." + f"are {self.context.writable_storage.values()}." ) data_source = await ensure_awaitable( adapter_cls.init_storage, @@ -1300,6 +1304,17 @@ def delete_asset(data_uri, is_directory, parameters=None): if cursor.fetchone()[0] == 0: cursor.execute(f'DROP TABLE IF EXISTS "{table_name}";') conn.commit() + + elif url.scheme in SUPPORTED_OBJECT_URI_SCHEMES: + storage = cast(ObjectStorage, get_storage(data_uri)) + store = storage.get_obstore_location() + + if prefix := data_uri.split(f"{storage.bucket}/", 1)[1]: + for batch in store.list(prefix=prefix): + store.delete([obj["path"] for obj in batch]) + else: + raise ValueError(f"Cannot delete the entire bucket: {storage.bucket!r}") + else: raise NotImplementedError( f"Cannot delete asset at {data_uri!r} because the scheme {url.scheme!r} is not supported." 
diff --git a/tiled/storage.py b/tiled/storage.py index f5d702e71..c9b1ed423 100644 --- a/tiled/storage.py +++ b/tiled/storage.py @@ -3,30 +3,36 @@ import os from abc import abstractmethod from pathlib import Path -from typing import TYPE_CHECKING, Dict, Optional, Union +from typing import TYPE_CHECKING, Dict, Literal, Optional, Union from urllib.parse import urlparse, urlunparse import sqlalchemy.pool +from .utils import ensure_uri, path_from_uri, sanitize_uri + if TYPE_CHECKING: import adbc_driver_manager.dbapi - -from .utils import ensure_uri, path_from_uri, sanitize_uri + from obstore.store import AzureStore, GCSStore, LocalStore, S3Store __all__ = [ "EmbeddedSQLStorage", "RemoteSQLStorage", "FileStorage", "SQLStorage", + "ObjectStorage", "Storage", "get_storage", "parse_storage", ] +SUPPORTED_OBJECT_URI_SCHEMES = {"http", "https"} # TODO: Add "s3", "gs", "azure", "az" + + @dataclasses.dataclass(frozen=True) class Storage: "Base class for representing storage location" + uri: str def __post_init__(self): @@ -41,6 +47,26 @@ class FileStorage(Storage): def path(self): return path_from_uri(self.uri) + def get_obstore_location(self, uri=None) -> "LocalStore": + """Get an obstore.store.LocalStore instance rooted at specified URI. + + Parameters + ---------- + uri: str, optional + """ + + if (uri is not None) and (not uri.startswith(self.uri)): + raise ValueError( + f"Requested URI {uri} is not within the base FileStorage URI {self.uri}" + ) + + from obstore.store import LocalStore + + directory = path_from_uri(uri) + directory.mkdir(parents=True, exist_ok=True) + + return LocalStore(directory) + def _ensure_writable_location(uri: str) -> Path: "Ensure path is writable to avoid a confusing error message from driver." 
@@ -58,6 +84,125 @@ def _ensure_writable_location(uri: str) -> Path: return filepath +@dataclasses.dataclass(frozen=True) +class ObjectStorage(Storage): + """Bucket storage location for BLOBS + + The uri should include the bucket, but not the prefix within the bucket. This + allows multiple ObjectStorage to point to different buckets with different + credentials. + + Parameters + ---------- + uri: str + Base URI, including bucket, but without prefix + provider: Literal["s3", "azure", "google"] + bucket: Optional[str] + Only required for s3 and google + username: Optional[str] + password: Optional[str] + config: dict + Additional configuration options passed to obstore store classes. + """ + + uri: str + provider: Literal["s3", "azure", "google"] + bucket: Optional[str] = None + username: Optional[str] = None + password: Optional[str] = None + config: dict = dataclasses.field(default_factory=dict) + + def __post_init__(self): + base_uri, bucket_name, _ = self.parse_blob_uri(ensure_uri(self.uri)) + base_uri, username, password = sanitize_uri(base_uri) + + if (username is not None) or (password is not None): + if ( + (self.username is not None) + or (self.password is not None) + or ("username" in self.config) + or ("password" in self.config) + ): + raise ValueError( + "Credentials passed in URI and in username, password, or config fields." + ) + object.__setattr__(self, "username", username) + object.__setattr__(self, "password", password) + + object.__setattr__(self, "uri", base_uri) + object.__setattr__(self, "bucket", bucket_name) + + @classmethod + def parse_blob_uri(cls, uri: str) -> tuple[str, str]: + """Split a blob URI into base URI, bucket name (optionally), and the prefix. + + For example, given 'http://example.com/bucket_name/path/to/blob', + return ('http://example.com', 'bucket_name', 'path/to/blob'). 
+ """ + + # TODO: THIS NEEDS MORE WORK TO HANDLE S3, GCS, AZURE DIFFERENCES PROPERLY + # CURRENTLY ONLY HANDLES HTTP(S) STYLE URIS + + parsed_uri = urlparse(uri) + full_path = parsed_uri.path # includes bucket and the rest + bucket_name, *blob_path = full_path.strip("/").split("/", 1) + base_uri = f"{parsed_uri.scheme}://{parsed_uri.netloc}/{bucket_name}" + + return base_uri, bucket_name or None, "/".join(blob_path) + + def get_obstore_location( + self, uri=None + ) -> Union["S3Store", "AzureStore", "GCSStore"]: + """Get an obstore.store instance rooted at specified URI. + + Parameters + ---------- + uri: str, optional + The URI to use as the root for the obstore location. If not specified, use the base URI of + this ObjectStorage. + + Returns + ------- + An instance of obstore.store.S3Store, obstore.store.AzureStore, or obstore.store.GCSStore, + depending on the provider. + """ + + if (uri is not None) and (not uri.startswith(self.uri)): + raise ValueError( + f"Requested URI {uri} is not within the base ObjectStorage URI {self.uri}" + ) + + from obstore.store import AzureStore, GCSStore, S3Store + + # Build kwargs for the specific store class based on provider + if self.provider == "s3": + kwargs = { + "endpoint": self.uri.split(self.bucket, -1)[0], + "bucket": self.bucket, + } + if self.username is not None: + kwargs["access_key_id"] = self.username + if self.password is not None: + kwargs["secret_access_key"] = self.password + + elif self.provider == "azure": + kwargs = {"endpoint": self.uri} + if self.username is not None: + kwargs["client_id"] = self.username + if self.password is not None: + kwargs["client_secret"] = self.password + + elif self.provider == "google": + kwargs = {"url": self.uri.split(self.bucket, -1)[0], "bucket": self.bucket} + if self.password is not None: + kwargs["service_account_key"] = self.password + + _class = {"s3": S3Store, "azure": AzureStore, "google": GCSStore}[self.provider] + prefix = uri[len(self.uri) :].lstrip("/") if uri else 
None # noqa: E203 + + return _class(**kwargs, **self.config, prefix=prefix) + + @dataclasses.dataclass(frozen=True) class SQLStorage(Storage): "General purpose SQL database storage with connection pooling" @@ -146,6 +291,7 @@ def create_adbc_connection(self) -> "adbc_driver_manager.dbapi.Connection": @dataclasses.dataclass(frozen=True) class RemoteSQLStorage(SQLStorage): "Authenticated server-based SQL database storage location" + username: Optional[str] = None password: Optional[str] = None @@ -171,6 +317,9 @@ def __repr__(self): @functools.cached_property def authenticated_uri(self): + if self.username is None and self.password is None: + return self.uri + parsed_uri = urlparse(self.uri) components = ( parsed_uri.scheme, @@ -190,24 +339,33 @@ def create_adbc_connection(self) -> "adbc_driver_manager.dbapi.Connection": def parse_storage( - item: Union[Path, str], + item: Union[Path, str, dict], *, pool_size: int = 5, max_overflow: int = 10, ) -> Storage: "Create a Storage object from a URI or Path." 
- item = ensure_uri(item) - scheme = urlparse(item).scheme - if scheme == "file": - result = FileStorage(item) - elif scheme == "postgresql": - result = RemoteSQLStorage(item, pool_size=pool_size, max_overflow=max_overflow) - elif scheme in {"sqlite", "duckdb"}: - result = EmbeddedSQLStorage( - item, pool_size=pool_size, max_overflow=max_overflow + if isinstance(item, dict): + result = ObjectStorage( + uri=item["uri"], + provider=item["provider"], + config=item.get("config", {}), ) else: - raise ValueError(f"writable_storage item {item} has unrecognized scheme") + item = ensure_uri(item) + scheme = urlparse(item).scheme + if scheme == "file": + result = FileStorage(item) + elif scheme == "postgresql": + result = RemoteSQLStorage( + item, pool_size=pool_size, max_overflow=max_overflow + ) + elif scheme in {"sqlite", "duckdb"}: + result = EmbeddedSQLStorage( + item, pool_size=pool_size, max_overflow=max_overflow + ) + else: + raise ValueError(f"writable_storage item {item} has unrecognized scheme") return result @@ -224,4 +382,8 @@ def register_storage(storage: Storage) -> None: def get_storage(uri: str) -> Storage: "Look up Storage by URI." + + if urlparse(uri).scheme in SUPPORTED_OBJECT_URI_SCHEMES: + uri, _, _ = ObjectStorage.parse_blob_uri(uri) + return _STORAGE[uri]