-
Notifications
You must be signed in to change notification settings - Fork 14
[FEAT] Create obstore store in fsspec on demand #198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
909b5b0
29464a7
a0d9e1d
6614906
75c738e
2209839
46c6b59
9ab35e1
cb80495
b6a3d3a
68cdff9
fa5b539
b704779
61deac4
4bc1599
4dc9143
fb607d0
b74948a
f6ba27c
30250cf
4a8e6fc
27a0ac7
d2d0235
1f97703
b002afb
9088104
897beb0
0726999
5b87c46
79a03f7
dc4215d
f59152b
4896ba3
c9378b8
549a4ac
a93fe2e
a54a3fe
c804a18
6c2c513
c6392f2
347e63e
69bbed6
9e423f5
096845c
5ea2ba8
12560f4
6e200e9
4a653b3
f5147bc
45c4020
4e1f4e7
a212996
278cc38
92a70f5
969dbff
6c409c8
e443668
4949348
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -24,13 +24,14 @@ | |||||||||||
|
||||||||||||
import asyncio | ||||||||||||
from collections import defaultdict | ||||||||||||
from functools import lru_cache | ||||||||||||
from typing import Any, Coroutine, Dict, List, Tuple | ||||||||||||
|
||||||||||||
import fsspec.asyn | ||||||||||||
import fsspec.spec | ||||||||||||
|
||||||||||||
import obstore as obs | ||||||||||||
|
||||||||||||
from obstore.store import from_url | ||||||||||||
|
||||||||||||
class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): | ||||||||||||
"""An fsspec implementation based on a obstore Store. | ||||||||||||
|
@@ -43,8 +44,10 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): | |||||||||||
|
||||||||||||
def __init__( | ||||||||||||
self, | ||||||||||||
store: obs.store.ObjectStore, | ||||||||||||
*args, | ||||||||||||
config: dict[str, Any] = {}, | ||||||||||||
client_options: dict[str, Any] = {}, | ||||||||||||
retry_config: dict[str, Any] = {}, | ||||||||||||
asynchronous: bool = False, | ||||||||||||
loop=None, | ||||||||||||
batch_size: int | None = None, | ||||||||||||
|
@@ -75,26 +78,79 @@ def __init__( | |||||||||||
``` | ||||||||||||
""" | ||||||||||||
|
||||||||||||
self.store = store | ||||||||||||
self.config = config | ||||||||||||
self.client_options = client_options | ||||||||||||
self.retry_config = retry_config | ||||||||||||
|
||||||||||||
super().__init__( | ||||||||||||
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size | ||||||||||||
) | ||||||||||||
|
||||||||||||
def _split_path(self, path: str) -> Tuple[str, str]: | ||||||||||||
""" | ||||||||||||
Split bucket and file path | ||||||||||||
|
||||||||||||
Args: | ||||||||||||
path (str): Input path, like `s3://mybucket/path/to/file` | ||||||||||||
|
||||||||||||
Examples: | ||||||||||||
>>> split_path("s3://mybucket/path/to/file") | ||||||||||||
['mybucket', 'path/to/file'] | ||||||||||||
""" | ||||||||||||
|
||||||||||||
protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs"] | ||||||||||||
|
||||||||||||
if not self.protocol in protocol_with_bucket: | ||||||||||||
# no bucket name in path | ||||||||||||
return "", path | ||||||||||||
|
||||||||||||
if path.startswith(self.protocol + "://"): | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assuming that this function will always receive something a URL like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will not always be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I use urlparse like this here, which works for both obstore/obstore/python/obstore/fsspec.py Lines 108 to 112 in 75c738e
|
||||||||||||
path = path[len(self.protocol) + 3 :] | ||||||||||||
elif path.startswith(self.protocol + "::"): | ||||||||||||
kylebarron marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
path = path[len(self.protocol) + 2 :] | ||||||||||||
path = path.rstrip("/") | ||||||||||||
|
||||||||||||
if "/" not in path: | ||||||||||||
return path, "" | ||||||||||||
kylebarron marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
else: | ||||||||||||
path_li = path.split("/") | ||||||||||||
bucket = path_li[0] | ||||||||||||
file_path = "/".join(path_li[1:]) | ||||||||||||
return (bucket, file_path) | ||||||||||||
|
||||||||||||
def _construct_store(self, bucket: str):
    """Build (and memoize per instance) the obstore store for ``bucket``.

    A per-instance dict replaces the previous ``@lru_cache(maxsize=10)``:
    caching on a bound method keys the cache on ``self`` and keeps every
    filesystem instance alive for the cache's lifetime (flake8-bugbear
    B019). The number of distinct buckets per filesystem is expected to
    be small, so an unbounded per-instance cache is safe.

    Args:
        bucket: Bucket name as produced by ``_split_path``.

    Returns:
        A store created by ``obstore.store.from_url`` for
        ``{protocol}://{bucket}``, configured from this instance's
        ``config``, ``client_options`` and ``retry_config``.
    """
    try:
        cache = self._store_cache
    except AttributeError:
        # Lazily created so __init__ does not need changing.
        cache = self._store_cache = {}
    if bucket not in cache:
        cache[bucket] = from_url(
            url=f"{self.protocol}://{bucket}",
            config=self.config,
            client_options=self.client_options,
            # from_url expects None, not an empty dict, for "no retries".
            retry_config=self.retry_config if self.retry_config else None,
        )
    return cache[bucket]
|
||||||||||||
async def _rm_file(self, path, **kwargs):
    """Delete the single object at ``path``."""
    bucket, key = self._split_path(path)
    store = self._construct_store(bucket)
    return await obs.delete_async(store, key)
|
||||||||||||
async def _cp_file(self, path1, path2, **kwargs):
    """Copy the object at ``path1`` to ``path2`` (same bucket only).

    Bug fix: the previous body unpacked ``self._split_path(path)``, but
    this method has no ``path`` parameter (it takes ``path1``/``path2``),
    so every call raised ``NameError``. Both endpoints are now split and
    the copy goes through a single store handle, which requires the
    source and destination to share a bucket.

    Raises:
        ValueError: If ``path2`` names a different bucket than ``path1``.
    """
    bucket1, key1 = self._split_path(path1)
    bucket2, key2 = self._split_path(path2)
    if bucket2 and bucket2 != bucket1:
        raise ValueError(
            f"Cross-bucket copy not supported: {bucket1!r} -> {bucket2!r}"
        )
    store = self._construct_store(bucket1)
    return await obs.copy_async(store, key1, key2)
|
||||||||||||
async def _pipe_file(self, path, value, **kwargs):
    """Write the bytes-like ``value`` to the object at ``path``."""
    bucket, key = self._split_path(path)
    store = self._construct_store(bucket)
    return await obs.put_async(store, key, value)
|
||||||||||||
async def _cat_file(self, path, start=None, end=None, **kwargs):
    """Return the contents of ``path`` as bytes.

    When both ``start`` and ``end`` are ``None`` the whole object is
    fetched; otherwise the byte range ``[start, end)`` is requested.
    """
    bucket, key = self._split_path(path)
    store = self._construct_store(bucket)

    if start is None and end is None:
        resp = await obs.get_async(store, key)
        buffer = await resp.bytes_async()
        return buffer.to_bytes()

    ranged = await obs.get_range_async(store, key, start=start, end=end)
    return ranged.to_bytes()
|
||||||||||||
async def _cat_ranges( | ||||||||||||
|
@@ -107,6 +163,9 @@ async def _cat_ranges( | |||||||||||
on_error="return", | ||||||||||||
**kwargs, | ||||||||||||
): | ||||||||||||
bucket, path = self._split_path(path) | ||||||||||||
store = self._construct_store(bucket) | ||||||||||||
|
||||||||||||
if isinstance(starts, int): | ||||||||||||
starts = [starts] * len(paths) | ||||||||||||
if isinstance(ends, int): | ||||||||||||
|
@@ -122,7 +181,7 @@ async def _cat_ranges( | |||||||||||
for path, ranges in per_file_requests.items(): | ||||||||||||
offsets = [r[0] for r in ranges] | ||||||||||||
ends = [r[1] for r in ranges] | ||||||||||||
fut = obs.get_ranges_async(self.store, path, starts=offsets, ends=ends) | ||||||||||||
fut = obs.get_ranges_async(store, path, starts=offsets, ends=ends) | ||||||||||||
futs.append(fut) | ||||||||||||
|
||||||||||||
result = await asyncio.gather(*futs) | ||||||||||||
|
@@ -137,17 +196,24 @@ async def _cat_ranges( | |||||||||||
return output_buffers | ||||||||||||
|
||||||||||||
async def _put_file(self, lpath, rpath, **kwargs):
    """Upload the local file ``lpath`` to the remote path ``rpath``.

    Bug fix: the previous body split a nonexistent ``path`` variable
    (``NameError``) and then uploaded to the unsplit ``rpath``. It is
    the *remote* path that must be split into bucket and key.
    """
    bucket, key = self._split_path(rpath)
    store = self._construct_store(bucket)
    with open(lpath, "rb") as f:
        await obs.put_async(store, key, f)
|
||||||||||||
async def _get_file(self, rpath, lpath, **kwargs):
    """Download the remote object ``rpath`` into the local file ``lpath``.

    Bug fix: the previous body split a nonexistent ``path`` variable
    (``NameError``) and then fetched the unsplit ``rpath``. The remote
    path is now split and the resulting key is fetched; the response is
    streamed to disk chunk by chunk rather than buffered in memory.
    """
    bucket, key = self._split_path(rpath)
    store = self._construct_store(bucket)
    with open(lpath, "wb") as f:
        resp = await obs.get_async(store, key)
        async for buffer in resp.stream():
            f.write(buffer)
|
||||||||||||
async def _info(self, path, **kwargs): | ||||||||||||
head = await obs.head_async(self.store, path) | ||||||||||||
bucket, path = self._split_path(path) | ||||||||||||
store = self._construct_store(bucket) | ||||||||||||
|
||||||||||||
head = await obs.head_async(store, path) | ||||||||||||
return { | ||||||||||||
# Required of `info`: (?) | ||||||||||||
"name": head["path"], | ||||||||||||
|
@@ -160,7 +226,10 @@ async def _info(self, path, **kwargs): | |||||||||||
} | ||||||||||||
|
||||||||||||
async def _ls(self, path, detail=True, **kwargs): | ||||||||||||
result = await obs.list_with_delimiter_async(self.store, path) | ||||||||||||
bucket, path = self._split_path(path) | ||||||||||||
store = self._construct_store(bucket) | ||||||||||||
|
||||||||||||
result = await obs.list_with_delimiter_async(store, path) | ||||||||||||
objects = result["objects"] | ||||||||||||
prefs = result["common_prefixes"] | ||||||||||||
if detail: | ||||||||||||
|
@@ -178,6 +247,9 @@ async def _ls(self, path, detail=True, **kwargs): | |||||||||||
|
||||||||||||
def _open(self, path, mode="rb", **kwargs):
    """Return a raw bytes-mode file-like object for ``path``.

    Bug fix: the previous body rebound ``path`` to its bucket-stripped
    key (and built an unused ``store``) before handing it to
    ``BufferedFileSimple``. The file object later calls
    ``fs.cat_file(self.path, ...)``, which splits the path again — so
    the bucket would be lost. Pass the caller's full path through
    unchanged; splitting happens inside the read path.
    """
    return BufferedFileSimple(self, path, mode, **kwargs)
|
||||||||||||
|
||||||||||||
|
@@ -201,3 +273,13 @@ def read(self, length: int = -1): | |||||||||||
data = self.fs.cat_file(self.path, self.loc, self.loc + length) | ||||||||||||
self.loc += length | ||||||||||||
return data | ||||||||||||
|
||||||||||||
|
||||||||||||
class S3FsspecStore(AsyncFsspecStore):
    """Obstore-backed fsspec filesystem registered for the ``s3`` protocol."""

    protocol = "s3"
||||||||||||
class GCSFsspecStore(AsyncFsspecStore):
    """Obstore-backed fsspec filesystem registered for the ``gs`` protocol."""

    protocol = "gs"
||||||||||||
class AzureFsspecStore(AsyncFsspecStore):
    """Obstore-backed fsspec filesystem registered for the ``abfs`` protocol."""

    protocol = "abfs"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we allow these, store should be optional?
And before merge we should enable typing overloads for better typing. You can see how from_url is implemented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use store here to decide the store interface (whether it is S3Store, GCSStore, ...), so that in AsyncFsspecStore we don't need to decide the interface based on the protocol.
Maybe there's a better way of deciding the store interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll have a look at the typing later on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh that's confusing because store is the type of the class and not an instance.
We should be able to use the from_url top level function directly here?