Skip to content

Commit f8f33bf

Browse files
committed
add fsspec implementation
1 parent 6622f52 commit f8f33bf

File tree

1 file changed

+89
-0
lines changed

1 file changed

+89
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
from functools import singledispatch
2+
import fsspec
3+
from fsspec.spec import AbstractFileSystem
4+
from typing import Union
5+
from mara_storage import storages
6+
import pathlib
7+
8+
9+
@singledispatch
10+
def filesystem(storage: Union[str, storages.Storage], **storage_options) -> AbstractFileSystem:
11+
"""
12+
Returns a fsspec compatible filesystem
13+
14+
Args:
15+
db: the storage as alias or class
16+
**kargs: additional arguments to be passed to the
17+
"""
18+
raise NotImplementedError(f'Please implement filesystem for type "{storage.__class__.__name__}"')
19+
20+
21+
@filesystem.register(str)
22+
def __(storage_alias: str, **kargs):
23+
return filesystem(storages.storage(storage_alias), **kargs)
24+
25+
26+
@filesystem.register(storages.LocalStorage)
27+
def __(storage: storages.LocalStorage, **kargs):
28+
return fsspec.filesystem('file',
29+
root_marker=kargs.pop('root_marker', str(pathlib.Path(storage.base_path).absolute())),
30+
**kargs)
31+
32+
33+
@filesystem.register(storages.SftpStorage)
34+
def __(storage: storages.SftpStorage, **kargs):
35+
return fsspec.filesystem('sftp',
36+
host=kargs.pop('host', storage.host),
37+
hostname=kargs.pop('hostname', storage.host),
38+
port=kargs.pop('port', storage.port or 22),
39+
username=kargs.pop('username', storage.user),
40+
password=kargs.pop('password', storage.password),
41+
key_filename=kargs.pop('key_filename', storage.identity_file),
42+
**kargs)
43+
44+
45+
@filesystem.register(storages.GoogleCloudStorage)
46+
def __(storage: storages.GoogleCloudStorage, **kargs):
47+
return fsspec.filesystem('gcs',
48+
project=kargs.pop('project', storage.project_id),
49+
default_location=kargs.pop('default_location', storage.location),
50+
**kargs)
51+
52+
53+
54+
@singledispatch
55+
def build_path(storage: Union[str, storages.Storage], path: str) -> str:
56+
"""
57+
Builds a path for fsspec including storage specific information e.g. the
58+
storage container or bucked which is defined in mara in the Storage class.
59+
60+
Args:
61+
storage: the storage as alias or class
62+
path: the path inside the storage. E.g. `my_table/*.parquet`
63+
64+
Returns:
65+
A absolute URI or path to be used inside fsspec. E.g. `my_container/path`
66+
"""
67+
raise NotImplementedError(f'Please implement build_uri for type "{storage.__class__.__name__}"')
68+
69+
70+
@build_path.register(str)
71+
def __(storage_alias: str, path: str):
72+
return build_path(storages.storage(storage_alias), path=path)
73+
74+
75+
@build_path.register(storages.LocalStorage)
76+
def __(storage: storages.LocalStorage, path: str):
77+
return str((pathlib.Path(storage.base_path) / path).absolute())
78+
79+
80+
@build_path.register(storages.GoogleCloudStorage)
81+
def __(storage: storages.GoogleCloudStorage, path: str):
82+
if not storage.bucket_name:
83+
raise ValueError(f"Storage {storage.__class__.__name__} must have a buckt name")
84+
return f'{storage.bucket_name}/{path}'
85+
86+
87+
@build_path.register(storages.SftpStorage)
88+
def __(storage: storages.SftpStorage, path: str):
89+
return path

0 commit comments

Comments
 (0)