🪣 Support for Blob Storage #955
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# Create a local bucket for testing access to BLOBS | ||
|
||
This example contains: | ||
- A `docker-compose.yml` file capable of instantiating and running a [Minio](https://min.io/) container. | ||
- A configuration YAML file, `s3_style_storage.yml`, which contains the information Tiled needs to authenticate with the bucket storage system and to write and read Binary Large Objects (BLOBS) through the Zarr adapter. | ||
|
||
## How to run this example: | ||
1. In one terminal window, navigate to the directory where the `docker-compose.yml` and `s3_style_storage.yml` are. | ||
2. Run `docker compose up` with adequate permissions. | ||
3. Create a `storage` directory in `/example_configs/bucket_storage` for the SQLite database. | ||
4. Open another terminal window in the same location and run `tiled serve config s3_style_storage.yml --api-key secret`. | ||
5. Create an `ipython` session and run the following commands to write array data as a BLOB in a bucket: | ||
```python | ||
from tiled.client import from_uri | ||
c = from_uri('http://localhost:8000', api_key='secret') | ||
c.write_array([1,2,3]) | ||
``` | ||
6. You can see the written data in the bucket by logging in to the Minio container, exposed on your machine at `http://localhost:9001/login`. Use the testing credentials `minioadmin` for both fields. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
version: "3.2" | ||
services: | ||
minio: | ||
image: minio/minio:latest | ||
ports: | ||
- 9000:9000 | ||
- 9001:9001 | ||
volumes: | ||
- minio-data:/data | ||
environment: | ||
MINIO_ROOT_USER: "minioadmin" | ||
MINIO_ROOT_PASSWORD: "minioadmin" | ||
command: server /data --console-address :9001 | ||
restart: unless-stopped | ||
|
||
create-bucket: | ||
image: minio/mc:latest | ||
environment: | ||
MC_HOST_minio: http://minioadmin:minioadmin@minio:9000 | ||
entrypoint: | ||
- sh | ||
- -c | ||
- | | ||
until mc ls minio > /dev/null 2>&1; do | ||
sleep 0.5 | ||
done | ||
|
||
mc mb --ignore-existing minio/buck | ||
|
||
volumes: | ||
minio-data: |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
authentication: | ||
allow_anonymous_access: false | ||
trees: | ||
- path: / | ||
tree: catalog | ||
args: | ||
uri: "sqlite:///storage/catalog.db" | ||
writable_storage: | ||
bucket: | ||
uri: "http://localhost:9000/buck" | ||
key: "minioadmin" | ||
secret: "minioadmin" | ||
init_if_not_exists: true |
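
Note on the `bucket.uri` value above: it carries both the endpoint (`http://localhost:9000`) and the bucket name (`buck`). The following is a minimal sketch of separating the two with the standard library. `split_bucket_uri` is a hypothetical helper introduced for illustration only; it is not part of Tiled or of this PR.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_bucket_uri(uri: str) -> Tuple[str, str]:
    """Split a bucket URI into (endpoint_url, bucket_name).

    Hypothetical helper for illustration; not part of Tiled.
    """
    parsed = urlparse(uri)
    # The bucket name is the path with surrounding slashes removed.
    bucket = parsed.path.strip("/")
    if not parsed.scheme or not parsed.netloc or not bucket:
        raise ValueError(f"Expected scheme://host[:port]/bucket, got {uri!r}")
    endpoint_url = f"{parsed.scheme}://{parsed.netloc}"
    return endpoint_url, bucket


endpoint_url, bucket = split_bucket_uri("http://localhost:9000/buck")
```

A URI with no bucket component, such as `http://localhost:9000/`, raises `ValueError` in this sketch, which mirrors the path check in `BucketStorage.__post_init__`.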
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -3,8 +3,9 @@ | |||||||||
import os | ||||||||||
from collections.abc import Mapping | ||||||||||
from typing import Any, Iterator, List, Optional, Tuple, Union, cast | ||||||||||
from urllib.parse import quote_plus | ||||||||||
from urllib.parse import quote_plus, urlparse | ||||||||||
|
||||||||||
import s3fs | ||||||||||
import zarr.core | ||||||||||
import zarr.hierarchy | ||||||||||
import zarr.storage | ||||||||||
|
@@ -49,16 +50,28 @@ def init_storage( | |||||||||
|
||||||||||
""" | ||||||||||
data_source = copy.deepcopy(data_source) # Do not mutate caller input. | ||||||||||
data_uri = storage.uri + "".join( | ||||||||||
f"/{quote_plus(segment)}" for segment in path_parts | ||||||||||
) | ||||||||||
# Zarr requires evenly-sized chunks within each dimension. | ||||||||||
# Use the first chunk along each dimension. | ||||||||||
zarr_chunks = tuple(dim[0] for dim in data_source.structure.chunks) | ||||||||||
shape = tuple(dim[0] * len(dim) for dim in data_source.structure.chunks) | ||||||||||
directory = path_from_uri(data_uri) | ||||||||||
directory.mkdir(parents=True, exist_ok=True) | ||||||||||
store = zarr.storage.DirectoryStore(str(directory)) | ||||||||||
if storage.bucket: | ||||||||||
data_uri = storage.bucket.uri | ||||||||||
s3 = s3fs.S3FileSystem( | ||||||||||
client_kwargs={"endpoint_url": data_uri}, | ||||||||||
key=storage.bucket.key, | ||||||||||
secret=storage.bucket.secret, | ||||||||||
use_ssl=False, | ||||||||||
) | ||||||||||
store = s3fs.S3Map( | ||||||||||
root="".join(f"/{quote_plus(segment)}" for segment in path_parts), s3=s3 | ||||||||||
) | ||||||||||
else: | ||||||||||
data_uri = storage.get("filesystem") + "".join( | ||||||||||
f"/{quote_plus(segment)}" for segment in path_parts | ||||||||||
) | ||||||||||
directory = path_from_uri(data_uri) | ||||||||||
directory.mkdir(parents=True, exist_ok=True) | ||||||||||
store = zarr.storage.DirectoryStore(str(directory)) | ||||||||||
zarr.storage.init_array( | ||||||||||
store, | ||||||||||
shape=shape, | ||||||||||
|
@@ -365,9 +378,12 @@ def from_catalog( | |||||||||
/, | ||||||||||
**kwargs: Optional[Any], | ||||||||||
) -> Union[ZarrGroupAdapter, ArrayAdapter]: | ||||||||||
zarr_obj = zarr.open( | ||||||||||
path_from_uri(data_source.assets[0].data_uri) | ||||||||||
) # Group or Array | ||||||||||
parsed = urlparse(data_source.assets[0].data_uri) | ||||||||||
Review comment: Take a look at what
Review comment: Lines 71 to 74 in f3331ef
|
||||||||||
if parsed.scheme in {"http", "https", "s3"}: | ||||||||||
uri = data_source.assets[0].data_uri | ||||||||||
else: | ||||||||||
uri = path_from_uri(data_source.assets[0].data_uri) | ||||||||||
zarr_obj = zarr.open(uri) # Group or Array | ||||||||||
if node.structure_family == StructureFamily.container: | ||||||||||
return ZarrGroupAdapter( | ||||||||||
zarr_obj, | ||||||||||
|
@@ -394,4 +410,4 @@ def from_uris( | |||||||||
return ZarrGroupAdapter(zarr_obj, **kwargs) | ||||||||||
else: | ||||||||||
structure = ArrayStructure.from_array(zarr_obj) | ||||||||||
return ZarrArrayAdapter(zarr_obj, structure=structure, **kwargs) | ||||||||||
return ZarrArrayAdapter(zarr_obj, structure=structure, **kwargs) |
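
The scheme check added to `from_catalog` above can be read in isolation as follows. This is a simplified sketch, not the PR's code: `resolve_zarr_target` and `REMOTE_SCHEMES` are hypothetical names, and the local branch handles only `file` URIs here, whereas the real code delegates to Tiled's `path_from_uri`.

```python
from pathlib import Path
from typing import Union
from urllib.parse import unquote, urlparse

# Schemes treated as remote (bucket) locations in the diff above.
REMOTE_SCHEMES = {"http", "https", "s3"}


def resolve_zarr_target(data_uri: str) -> Union[str, Path]:
    """Return the URI unchanged for remote schemes, else a local Path.

    Hypothetical stand-in for the branch in from_catalog.
    """
    parsed = urlparse(data_uri)
    if parsed.scheme in REMOTE_SCHEMES:
        # Remote stores are addressed by URI, not by filesystem path.
        return data_uri
    if parsed.scheme == "file":
        return Path(unquote(parsed.path))
    raise ValueError(f"Did not recognize scheme {parsed.scheme!r}")


remote = resolve_zarr_target("s3://buck/a")
local = resolve_zarr_target("file:///tmp/data/a")
```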
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
__all__ = [ | ||
"EmbeddedSQLStorage", | ||
"FileStorage", | ||
"BucketStorage", | ||
"SQLStorage", | ||
"Storage", | ||
"get_storage", | ||
|
@@ -19,6 +20,7 @@ | |
@dataclasses.dataclass(frozen=True) | ||
class Storage: | ||
"Base class for representing storage location" | ||
|
||
uri: str | ||
|
||
def __post_init__(self): | ||
|
@@ -34,6 +36,20 @@ def path(self): | |
return path_from_uri(self.uri) | ||
|
||
|
||
@dataclasses.dataclass(frozen=True) | ||
class BucketStorage: | ||
"Bucket storage location for BLOBS" | ||
uri: str | ||
key: Optional[str] | ||
secret: Optional[str] | ||
|
||
def __post_init__(self): | ||
object.__setattr__(self, "uri", ensure_uri(self.uri)) | ||
parsed_uri = urlparse(self.uri) | ||
if not parsed_uri.path or parsed_uri.path == "/": | ||
raise ValueError(f"URI must contain a path attribute: {self.uri}") | ||
|
||
|
||
@dataclasses.dataclass(frozen=True) | ||
class EmbeddedSQLStorage(Storage): | ||
"File-based SQL database storage location" | ||
|
@@ -42,6 +58,7 @@ class EmbeddedSQLStorage(Storage): | |
@dataclasses.dataclass(frozen=True) | ||
class SQLStorage(Storage): | ||
"File-based SQL database storage location" | ||
|
||
username: Optional[str] = None | ||
password: Optional[str] = None | ||
|
||
|
@@ -92,6 +109,8 @@ def parse_storage(item: Union[Path, str]) -> Storage: | |
result = FileStorage(item) | ||
elif scheme == "postgresql": | ||
result = SQLStorage(item) | ||
elif scheme == "bucket": | ||
Review comment: There won't be a bucket scheme. Instead, this function must be extended to accept
We currently accept SQL creds like this only:
- postgresql://username:password@host:port/database
but we could additionally accept a more structured input:
- uri: postgresql://host:port/database
  username: username
  password: password
And buckets would of course be similar. |
||
result = BucketStorage(item) | ||
elif scheme in {"sqlite", "duckdb"}: | ||
result = EmbeddedSQLStorage(item) | ||
else: | ||
|
@@ -112,4 +131,4 @@ def register_storage(storage: Storage) -> None: | |
|
||
def get_storage(uri: str) -> Storage: | ||
"Look up Storage by URI." | ||
return _STORAGE[uri] | ||
return _STORAGE[uri] |
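
The review comment on `parse_storage` suggests accepting structured input in addition to a bare URI string. One way that could look for buckets is sketched below. Everything here is an assumption for illustration: `BucketCredentials` and `parse_bucket_item` are hypothetical names, and the field names `uri`, `key`, and `secret` are taken from the `s3_style_storage.yml` example in this PR rather than from a settled design.

```python
import dataclasses
from typing import Mapping, Optional, Union


@dataclasses.dataclass(frozen=True)
class BucketCredentials:
    """Hypothetical stand-in for BucketStorage, for illustration only."""

    uri: str
    key: Optional[str] = None
    secret: Optional[str] = None


def parse_bucket_item(item: Union[str, Mapping[str, str]]) -> BucketCredentials:
    """Accept either a bare URI string or a mapping with uri/key/secret."""
    if isinstance(item, str):
        return BucketCredentials(uri=item)
    unknown = set(item) - {"uri", "key", "secret"}
    if unknown:
        raise ValueError(f"Unrecognized keys: {sorted(unknown)}")
    return BucketCredentials(
        uri=item["uri"], key=item.get("key"), secret=item.get("secret")
    )


structured = parse_bucket_item(
    {"uri": "http://localhost:9000/buck", "key": "minioadmin", "secret": "minioadmin"}
)
bare = parse_bucket_item("http://localhost:9000/buck")
```

Keeping credentials in separate fields avoids embedding them in the URI, which is the same motivation the reviewer gives for the SQL case.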
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -574,11 +574,13 @@ class UnsupportedQueryType(TypeError): | |
|
||
class Conflicts(Exception): | ||
"Prompts the server to send 409 Conflicts with message" | ||
|
||
pass | ||
|
||
|
||
class BrokenLink(Exception): | ||
"Prompts the server to send 410 Gone with message" | ||
|
||
pass | ||
|
||
|
||
|
@@ -733,7 +735,8 @@ def path_from_uri(uri) -> Path: | |
path = Path(parsed.path[1:]) | ||
else: | ||
raise ValueError( | ||
"Supported schemes are 'file', 'sqlite', and 'duckdb'. " | ||
Review comment: No change to this function. Now, unlike when we started this PR, |
||
"Supported schemes are 'file', 'sqlite', and 'duckdb'. " | |
"For bucket storage, 'http', 'https', and 's3' are supported. " | |
f"Did not recognize scheme {parsed.scheme!r}" | ||
) | ||
return path | ||
|
Review comment: In the old model, the adapter was handed multiple storage options and had to pick the one it wanted. Now, the caller in `tiled.catalog.adapter` picks one storage option and passes just that one in (tiled/tiled/catalog/adapter.py, lines 672 to 695 in f3331ef).

So, the task here is to check `isinstance(storage, BucketStorage)` versus `FileStorage`. Additionally, the `supported_storage` attribute on this class should be extended to include `BucketStorage`. This tells the caller to offer `BucketStorage` if that is the highest-priority item in `writable_storage`.
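
A rough sketch of the dispatch described in this comment is shown below. It is not the adapter's actual code: the two dataclasses are simplified stand-ins for Tiled's `FileStorage` and `BucketStorage`, and `ZarrAdapterSketch` and `describe_store` are hypothetical names. The real `init_storage` would build a `zarr` store in each branch; here each branch only returns a description, so the example stays self-contained.

```python
import dataclasses
from typing import Optional, Union


@dataclasses.dataclass(frozen=True)
class FileStorage:
    """Simplified stand-in for tiled's FileStorage."""

    uri: str


@dataclasses.dataclass(frozen=True)
class BucketStorage:
    """Simplified stand-in for the BucketStorage added in this PR."""

    uri: str
    key: Optional[str] = None
    secret: Optional[str] = None


class ZarrAdapterSketch:
    # Advertise both storage kinds so the caller may offer either one.
    supported_storage = {FileStorage, BucketStorage}

    @classmethod
    def describe_store(cls, storage: Union[FileStorage, BucketStorage]) -> str:
        """Branch on the single storage object handed in by the caller."""
        if isinstance(storage, BucketStorage):
            return f"bucket store at {storage.uri}"
        if isinstance(storage, FileStorage):
            return f"directory store at {storage.uri}"
        raise TypeError(f"Unsupported storage type: {type(storage).__name__}")


bucket_result = ZarrAdapterSketch.describe_store(
    BucketStorage("http://localhost:9000/buck", "minioadmin", "minioadmin")
)
file_result = ZarrAdapterSketch.describe_store(FileStorage("file:///tmp/data"))
```

Branching on the type replaces the `if storage.bucket:` and `storage.get("filesystem")` lookups in the diff, which assume the adapter still receives a collection of storage options.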