[FEAT] Create obstore store in fsspec on demand #198
Merged: kylebarron merged 58 commits into developmentseed:main from machichima:obstore-instance-in-fsspec on Feb 28, 2025.

Changes from 5 commits
Commits (58)
909b5b0  feat: split bucket from path + construct store (machichima)
29464a7  feat: remove store + add protocol + apply to all methods (machichima)
a0d9e1d  feat: inherit from AsyncFsspecStore to specify protocol (machichima)
6614906  fix: correctly split protocol if exists in path (machichima)
75c738e  feat: use urlparse to extract protocol (machichima)
2209839  Merge branch 'main' into obstore-instance-in-fsspec (kylebarron)
46c6b59  update typing (kylebarron)
9ab35e1  fix: unbounded error (machichima)
cb80495  fix: remove redundant import (machichima)
b6a3d3a  feat: add register() to register AsyncFsspecStore for provided protocol (machichima)
68cdff9  feat: add validation for protocol in register() (machichima)
fa5b539  test: for register() (machichima)
b704779  feat: add async parameter for register() (machichima)
61deac4  test: test async store created by register() (machichima)
4bc1599  feat: add http(s) into protocol_with_bucket list (machichima)
4dc9143  feat: ls return path with bucket name (machichima)
fb607d0  feat: enable re-register same protocol (machichima)
b74948a  test: update pytest fixture to use register() (machichima)
f6ba27c  test: update test with new path format (machichima)
30250cf  fix: mkdocs build error (machichima)
4a8e6fc  Merge branch 'main' into obstore-instance-in-fsspec (machichima)
27a0ac7  fix: error when merging (machichima)
d2d0235  build: add some ruff ignore (machichima)
1f97703  fix: ruff error (machichima)
b002afb  build: add cachetools dependencies (machichima)
9088104  Merge branch 'main' into obstore-instance-in-fsspec (machichima)
897beb0  better scoping of lints (kylebarron)
0726999  lint (kylebarron)
5b87c46  fix: update lru_cache + clean class attribute (machichima)
79a03f7  Merge branch 'main' into obstore-instance-in-fsspec (machichima)
dc4215d  fix some bugs when using get/put/cp/info/ls (machichima)
f59152b  Merge branch 'main' into obstore-instance-in-fsspec (machichima)
4896ba3  fix: declare lru_cache in __init__ (machichima)
c9378b8  fix: make AsyncFsspecStore cachable (machichima)
549a4ac  test: for cache constructed store and filesystem obj (machichima)
a93fe2e  build: remove dependencies (machichima)
a54a3fe  fix: prevent send folder path to cat_file (machichima)
c804a18  fix: enable cp folders (machichima)
6c2c513  lint (machichima)
c6392f2  fix: clobber=False to prevent re-register and cause memory leak (machichima)
347e63e  test: clean up after each test to prevent memory leak (machichima)
69bbed6  Merge branch 'main' into obstore-instance-in-fsspec (machichima)
9e423f5  Simplify protocol registration (kylebarron)
096845c  fix+test: register check types (machichima)
5ea2ba8  small edits (kylebarron)
12560f4  fix+test: update conftest (machichima)
6e200e9  style: format (machichima)
4a653b3  feat: enable setting protocol in __init__ (machichima)
f5147bc  docs: update example in docstring (machichima)
45c4020  fix: better split path way (machichima)
4e1f4e7  fix: split path for protocol with no bucket properly (machichima)
a212996  test: for split path (machichima)
278cc38  refactor: take out _fill_bucket_name function (machichima)
92a70f5  refactor: take out runtime type check for register (machichima)
969dbff  test: remove test register invalid type (machichima)
6c409c8  Merge branch 'main' into obstore-instance-in-fsspec (kylebarron)
e443668  Switch to checking if protocol does not require bucket (kylebarron)
4949348  Warn on unknown protocol (kylebarron)
@@ -22,15 +22,17 @@
 from __future__ import annotations

+from urllib.parse import urlparse
 import asyncio
 from collections import defaultdict
 from functools import lru_cache
 from typing import Any, Coroutine, Dict, List, Tuple

 import fsspec.asyn
 import fsspec.spec

 import obstore as obs
+from obstore.store import from_url


 class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
     """An fsspec implementation based on an obstore Store.
@@ -43,8 +45,10 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):

     def __init__(
         self,
         store: obs.store.ObjectStore,
         *args,
+        config: dict[str, Any] = {},
+        client_options: dict[str, Any] = {},
+        retry_config: dict[str, Any] = {},
         asynchronous: bool = False,
         loop=None,
         batch_size: int | None = None,
@@ -75,26 +79,79 @@ def __init__(
         ```
         """
         self.store = store
+        self.config = config
+        self.client_options = client_options
+        self.retry_config = retry_config

         super().__init__(
             *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
         )

+    def _split_path(self, path: str) -> Tuple[str, str]:
+        """
+        Split bucket and file path.
+
+        Args:
+            path (str): Input path, like `s3://mybucket/path/to/file`
+
+        Examples:
+            >>> split_path("s3://mybucket/path/to/file")
+            ('mybucket', 'path/to/file')
+        """
+        protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs"]
+
+        if self.protocol not in protocol_with_bucket:
+            # no bucket name in path
+            return "", path
+
+        res = urlparse(path)
+        if res.scheme:
+            if res.scheme != self.protocol:
+                raise ValueError(
+                    f"Expect protocol to be {self.protocol}. Got {res.scheme}"
+                )
+            path = res.netloc + res.path
+
+        if "/" not in path:
+            return path, ""
+        else:
+            path_li = path.split("/")
+            bucket = path_li[0]
+            file_path = "/".join(path_li[1:])
+            return (bucket, file_path)
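The splitting logic of `_split_path` can be sketched as a standalone function to make its behavior concrete (a minimal sketch mirroring the diff above, not a public obstore API; `split_path` and the hard-coded protocol set are local illustrations):

```python
from urllib.parse import urlparse

# Protocols whose paths begin with a bucket, per the diff above
PROTOCOLS_WITH_BUCKET = {"s3", "s3a", "gcs", "gs", "abfs"}


def split_path(path: str, protocol: str) -> tuple[str, str]:
    """Split an object-store path into (bucket, key)."""
    if protocol not in PROTOCOLS_WITH_BUCKET:
        # Protocols such as "memory" carry no bucket component
        return "", path

    parsed = urlparse(path)
    if parsed.scheme:
        if parsed.scheme != protocol:
            raise ValueError(f"Expect protocol to be {protocol}. Got {parsed.scheme}")
        # Drop the scheme: netloc is the bucket, path is the key
        path = parsed.netloc + parsed.path

    # First segment is the bucket; the remainder is the object key
    bucket, _, key = path.partition("/")
    return bucket, key
```

Note that both `s3://mybucket/path/to/file` and the bare `mybucket/path/to/file` split to the same pair, which is the property the rest of the methods in this diff rely on.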
+    @lru_cache(maxsize=10)
+    def _construct_store(self, bucket: str):
+        return from_url(
+            url=f"{self.protocol}://{bucket}",
+            config=self.config,
+            client_options=self.client_options,
+            retry_config=self.retry_config if self.retry_config else None,
+        )

[Review comment on the lru_cache line: It would be nice if this cache size could be user specified but we can come back to it]
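The `@lru_cache(maxsize=10)` decorator means at most one store is constructed per bucket and reused across calls. A minimal sketch of that caching behavior (generic Python, standing in for `from_url`; `build_store` and the call counter are hypothetical):

```python
from functools import lru_cache

calls = 0  # count real constructions to observe cache hits


@lru_cache(maxsize=10)
def build_store(protocol: str, bucket: str) -> str:
    """Stand-in for from_url: pretend to construct a store client."""
    global calls
    calls += 1
    return f"{protocol}://{bucket}"


build_store("s3", "bucket-a")
build_store("s3", "bucket-a")  # cache hit: no new construction
build_store("s3", "bucket-b")  # different bucket: new construction
```

One caveat worth noting: decorating an instance method with `lru_cache` keeps `self` alive inside the module-level cache, which is presumably why later commits in this PR (4896ba3 "fix: declare lru_cache in __init__") move the cache into the instance.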

     async def _rm_file(self, path, **kwargs):
-        return await obs.delete_async(self.store, path)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+        return await obs.delete_async(store, path)

     async def _cp_file(self, path1, path2, **kwargs):
-        return await obs.copy_async(self.store, path1, path2)
+        bucket, path1 = self._split_path(path1)
+        _, path2 = self._split_path(path2)
+        store = self._construct_store(bucket)
+        return await obs.copy_async(store, path1, path2)

     async def _pipe_file(self, path, value, **kwargs):
-        return await obs.put_async(self.store, path, value)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+        return await obs.put_async(store, path, value)

     async def _cat_file(self, path, start=None, end=None, **kwargs):
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
         if start is None and end is None:
-            resp = await obs.get_async(self.store, path)
-            return await resp.bytes_async()
+            resp = await obs.get_async(store, path)
+            return (await resp.bytes_async()).to_bytes()

-        range_bytes = await obs.get_range_async(self.store, path, start=start, end=end)
+        range_bytes = await obs.get_range_async(store, path, start=start, end=end)
         return range_bytes.to_bytes()
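`_cat_file` branches between a full-object read and a byte-range read. The range contract (byte offsets, end-exclusive, `None` meaning the whole object) can be sketched against an in-memory blob (a generic illustration of the branching, not obstore's actual API):

```python
def cat_file(blob: bytes, start=None, end=None) -> bytes:
    """Mimic _cat_file's branching: full read vs. byte-range read."""
    if start is None and end is None:
        return blob        # full-object fast path
    return blob[start:end]  # ranged path, end-exclusive like a slice
```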

     async def _cat_ranges(

@@ -107,6 +164,9 @@ async def _cat_ranges(
         on_error="return",
         **kwargs,
     ):
         if isinstance(starts, int):
             starts = [starts] * len(paths)
         if isinstance(ends, int):

@@ -122,7 +182,7 @@ async def _cat_ranges(
         for path, ranges in per_file_requests.items():
             offsets = [r[0] for r in ranges]
             ends = [r[1] for r in ranges]
+            bucket, path = self._split_path(path)
+            store = self._construct_store(bucket)
-            fut = obs.get_ranges_async(self.store, path, starts=offsets, ends=ends)
+            fut = obs.get_ranges_async(store, path, starts=offsets, ends=ends)
             futs.append(fut)

         result = await asyncio.gather(*futs)

@@ -137,17 +197,24 @@ async def _cat_ranges(
         return output_buffers
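`_cat_ranges` groups the requested ranges by file so each file needs only one ranged request, then awaits all requests concurrently. That grouping-plus-gather pattern can be sketched self-contained (plain asyncio over an in-memory dict standing in for the object store; the PR's real version additionally scatters results back to the original request order):

```python
import asyncio
from collections import defaultdict

FILES = {"a": b"abcdef", "b": b"uvwxyz"}  # stand-in object store


async def get_ranges(path, starts, ends):
    """Stand-in for obs.get_ranges_async: several ranges of one file."""
    return [FILES[path][s:e] for s, e in zip(starts, ends)]


async def cat_ranges(paths, starts, ends):
    # Group (start, end) pairs per file, as _cat_ranges does
    per_file = defaultdict(list)
    for path, start, end in zip(paths, starts, ends):
        per_file[path].append((start, end))

    # One concurrent request per file
    futs = [
        get_ranges(path, [r[0] for r in ranges], [r[1] for r in ranges])
        for path, ranges in per_file.items()
    ]
    results = await asyncio.gather(*futs)
    # Flatten back to one buffer per requested range (grouped order)
    return [buf for group in results for buf in group]


out = asyncio.run(cat_ranges(["a", "b", "a"], [0, 1, 3], [2, 3, 6]))
```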

     async def _put_file(self, lpath, rpath, **kwargs):
+        bucket, rpath = self._split_path(rpath)
+        store = self._construct_store(bucket)
         with open(lpath, "rb") as f:
-            await obs.put_async(self.store, rpath, f)
+            await obs.put_async(store, rpath, f)

     async def _get_file(self, rpath, lpath, **kwargs):
+        bucket, rpath = self._split_path(rpath)
+        store = self._construct_store(bucket)
         with open(lpath, "wb") as f:
-            resp = await obs.get_async(self.store, rpath)
+            resp = await obs.get_async(store, rpath)
             async for buffer in resp.stream():
                 f.write(buffer)

     async def _info(self, path, **kwargs):
-        head = await obs.head_async(self.store, path)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
+        head = await obs.head_async(store, path)
         return {
             # Required of `info`: (?)
             "name": head["path"],

@@ -160,7 +227,10 @@ async def _info(self, path, **kwargs):
         }

     async def _ls(self, path, detail=True, **kwargs):
-        result = await obs.list_with_delimiter_async(self.store, path)
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
+        result = await obs.list_with_delimiter_async(store, path)
         objects = result["objects"]
         prefs = result["common_prefixes"]
         if detail:
@@ -178,6 +248,9 @@ async def _ls(self, path, detail=True, **kwargs):

     def _open(self, path, mode="rb", **kwargs):
         """Return raw bytes-mode file-like from the file-system"""
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
         return BufferedFileSimple(self, path, mode, **kwargs)


@@ -201,3 +274,13 @@ def read(self, length: int = -1):
         data = self.fs.cat_file(self.path, self.loc, self.loc + length)
         self.loc += length
         return data
+
+
+class S3FsspecStore(AsyncFsspecStore):
+    protocol = "s3"
+
+
+class GCSFsspecStore(AsyncFsspecStore):
+    protocol = "gs"
+
+
+class AzureFsspecStore(AsyncFsspecStore):
+    protocol = "abfs"
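The trailing subclasses exist only to pin a `protocol` class attribute, which is how a filesystem registry decides which class handles a given URL scheme. A minimal sketch of the pattern (a generic registry standing in for fsspec's internal one; `BaseFS`, `filesystem`, and `registry` are illustrative names):

```python
class BaseFS:
    protocol: str = "abstract"


class S3FS(BaseFS):
    protocol = "s3"


class GCSFS(BaseFS):
    protocol = "gs"


# Registry keyed by protocol, analogous to what fsspec keeps internally
registry = {cls.protocol: cls for cls in (S3FS, GCSFS)}


def filesystem(protocol: str) -> BaseFS:
    """Look up and instantiate the filesystem class for a URL scheme."""
    return registry[protocol]()
```

Pinning the protocol on a subclass (rather than per instance) is what lets `_split_path` and `_construct_store` above derive the scheme without inspecting each path.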
Review discussion:

- If we allow these, store should be optional? And before merge we should enable typing overloads for better typing. You can see how from_url is implemented.
- I use store here for deciding the store interface (whether it is S3Store, GCSStore, ...), so that in AsyncFsspecStore we don't need to decide the interface based on the protocol. Maybe there's a better way of deciding the store interface?
- I'll have a look at the typing later on.
- Oh, that's confusing, because store is the type of the class and not an instance. We should be able to use the from_url top-level function directly here.