
Apply obstore as storage backend #3033


Open · wants to merge 40 commits into master

Changes from all commits (40 commits)
58ba73c
feat: enable obstore for minio in local
machichima Dec 28, 2024
79ea46d
feat: enable obstore write to remote minio s3
machichima Dec 29, 2024
caaa657
feat: use obstore for gcs
machichima Dec 30, 2024
7ba66e2
feat: use obstore for azure blob storage (abfs)
machichima Jan 1, 2025
0ef7c05
fix: wrong file path for get_filesystem_for_path
machichima Jan 4, 2025
17bde4a
build(Dockerfile.dev): add obstore
machichima Jan 4, 2025
353f000
build(pyproject): add obstore
machichima Jan 5, 2025
7f0782a
fix: add storage specific obstore class
machichima Jan 5, 2025
04bdf20
fix: path error for some file source
machichima Jan 5, 2025
0189419
feat: enable setting retries for s3
machichima Jan 5, 2025
deb9f3d
test: modify test for obstore s3
machichima Jan 5, 2025
0ca6dbc
test: use mock patch for obstore test
machichima Jan 8, 2025
42cc75f
fix: use correct anon key for s3 and azure
machichima Jan 8, 2025
a1c99ec
fix: use defined DEFAULT_BLOCK_SIZE
machichima Jan 8, 2025
9c7e8db
build: update obstore to 0.3.0b10
machichima Jan 10, 2025
6fba41b
feat: lru_cache & enable read public bucket
machichima Jan 11, 2025
99f9ede
test: update test
machichima Jan 11, 2025
f317ff7
feat: override storage options for s3 and azxure
machichima Jan 12, 2025
749a7fe
test: update test
machichima Jan 12, 2025
31d8880
fix: update storage config
machichima Jan 15, 2025
9645dc7
fix: update obstore client_options
machichima Jan 23, 2025
5ac0766
refactor: comments + config format
machichima Jan 28, 2025
8a0f262
fix: add bucket to get_fsspec_storage_options
machichima Jan 28, 2025
4009f80
test: update test
machichima Jan 28, 2025
89cdc64
Merge branch 'master' of github.com:flyteorg/flytekit into apply-obstore
machichima Mar 4, 2025
222df10
feat: adapt usage of newest obstore version
machichima Mar 4, 2025
e40fedf
fix: update storage config
machichima Mar 6, 2025
4dcb8a3
style: lint
machichima Mar 6, 2025
eb27d76
test: update test
machichima Mar 6, 2025
aad1b5e
fix: set async to true & update syntax
machichima Mar 18, 2025
d79c61d
build: set obstore version and remove s3fs, gcsfs, adlfs
machichima Mar 18, 2025
2a67bac
fix: _ANON to _SKIP_SIGNATURE
machichima Mar 18, 2025
3244d82
fix: remove redundant gs_setup_args function
machichima Mar 18, 2025
83f4db5
feat: add abfss
machichima Mar 18, 2025
6346fc3
fix: also add abfss in get_filesystem
machichima Mar 18, 2025
dfbf854
build: udpate dependencies
machichima Mar 18, 2025
54c57ed
build: add botocore
machichima Mar 19, 2025
4beb285
fix: mock listdir
machichima Mar 20, 2025
568aa90
build: add dependence s3fs in async-fsspec plugin
machichima Mar 20, 2025
c90a2ff
build: upgrade obstore to 0.6.0
machichima Apr 8, 2025
1 change: 1 addition & 0 deletions Dockerfile.dev
@@ -40,6 +40,7 @@ RUN SETUPTOOLS_SCM_PRETEND_VERSION_FOR_FLYTEKIT=$PSEUDO_VERSION \
-e /flytekit \
-e /flytekit/plugins/flytekit-deck-standard \
-e /flytekit/plugins/flytekit-flyteinteractive \
obstore==0.6.0 \
markdown \
pandas \
pillow \
170 changes: 124 additions & 46 deletions flytekit/core/data_persistence.py
@@ -24,6 +24,7 @@
import pathlib
import tempfile
import typing
from datetime import timedelta
from time import sleep
from typing import Any, Dict, Optional, Union, cast
from uuid import UUID
@@ -32,23 +33,28 @@
from decorator import decorator
from fsspec.asyn import AsyncFileSystem
from fsspec.utils import get_protocol
from obstore.exceptions import GenericError
from obstore.fsspec import register
from typing_extensions import Unpack

from flytekit import configuration
from flytekit.configuration import DataConfig
from flytekit.core.local_fsspec import FlyteLocalFileSystem
from flytekit.core.utils import timeit
from flytekit.exceptions.system import FlyteDownloadDataException, FlyteUploadDataException
from flytekit.exceptions.system import (
FlyteDownloadDataException,
FlyteUploadDataException,
)
from flytekit.exceptions.user import FlyteAssertion, FlyteDataNotFoundException
from flytekit.interfaces.random import random
from flytekit.loggers import logger
from flytekit.utils.asyn import loop_manager

# Refer to https://github.com/fsspec/s3fs/blob/50bafe4d8766c3b2a4e1fc09669cf02fb2d71454/s3fs/core.py#L198
# Refer to https://github.com/developmentseed/obstore/blob/33654fc37f19a657689eb93327b621e9f9e01494/obstore/python/obstore/store/_aws.pyi#L11
# for key and secret
_FSSPEC_S3_KEY_ID = "key"
_FSSPEC_S3_SECRET = "secret"
_ANON = "anon"
_FSSPEC_S3_KEY_ID = "access_key_id"
_FSSPEC_S3_SECRET = "secret_access_key"
_SKIP_SIGNATURE = "skip_signature"

Uploadable = typing.Union[str, os.PathLike, pathlib.Path, bytes, io.BufferedReader, io.BytesIO, io.StringIO]

@@ -57,58 +63,102 @@
_WRITE_SIZE_CHUNK_BYTES = int(os.environ.get("_F_P_WRITE_CHUNK_SIZE", "26214400")) # 25 * 2**20


def s3_setup_args(s3_cfg: configuration.S3Config, anonymous: bool = False) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {
"cache_regions": True,
}
if s3_cfg.access_key_id:
kwargs[_FSSPEC_S3_KEY_ID] = s3_cfg.access_key_id
def s3_setup_args(s3_cfg: configuration.S3Config, anonymous: bool = False, **kwargs) -> Dict[str, Any]:
"""
Setup s3 storage, bucket is needed to create obstore store object
"""

config: Dict[str, Any] = {}

if s3_cfg.secret_access_key:
kwargs[_FSSPEC_S3_SECRET] = s3_cfg.secret_access_key
if _FSSPEC_S3_KEY_ID in kwargs or s3_cfg.access_key_id:
config[_FSSPEC_S3_KEY_ID] = kwargs.pop(_FSSPEC_S3_KEY_ID, s3_cfg.access_key_id)
if _FSSPEC_S3_SECRET in kwargs or s3_cfg.secret_access_key:
config[_FSSPEC_S3_SECRET] = kwargs.pop(_FSSPEC_S3_SECRET, s3_cfg.secret_access_key)
if "endpoint_url" in kwargs or s3_cfg.endpoint:
config["endpoint_url"] = kwargs.pop("endpoint_url", s3_cfg.endpoint)

# S3fs takes this as a special arg
if s3_cfg.endpoint is not None:
kwargs["client_kwargs"] = {"endpoint_url": s3_cfg.endpoint}
retries = kwargs.pop("retries", s3_cfg.retries)
backoff = kwargs.pop("backoff", s3_cfg.backoff)

if anonymous:
kwargs[_ANON] = True
config[_SKIP_SIGNATURE] = True

retry_config = {
"max_retries": retries,
"backoff": {
"base": 2,
"init_backoff": backoff,
"max_backoff": timedelta(seconds=16),
},
"retry_timeout": timedelta(minutes=3),
}

client_options = {"timeout": "99999s", "allow_http": True}

if config:
kwargs["config"] = config
kwargs["client_options"] = client_options or None
kwargs["retry_config"] = retry_config or None

return kwargs


def azure_setup_args(azure_cfg: configuration.AzureBlobStorageConfig, anonymous: bool = False) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}

if azure_cfg.account_name:
kwargs["account_name"] = azure_cfg.account_name
if azure_cfg.account_key:
kwargs["account_key"] = azure_cfg.account_key
if azure_cfg.client_id:
kwargs["client_id"] = azure_cfg.client_id
if azure_cfg.client_secret:
kwargs["client_secret"] = azure_cfg.client_secret
if azure_cfg.tenant_id:
kwargs["tenant_id"] = azure_cfg.tenant_id
kwargs[_ANON] = anonymous
def azure_setup_args(
azure_cfg: configuration.AzureBlobStorageConfig,
anonymous: bool = False,
**kwargs,
) -> Dict[str, Any]:
"""
Setup azure blob storage, bucket is needed to create obstore store object
"""

config: Dict[str, Any] = {}

if "account_name" in kwargs or azure_cfg.account_name:
config["account_name"] = kwargs.get("account_name", azure_cfg.account_name)
if "account_key" in kwargs or azure_cfg.account_key:
config["account_key"] = kwargs.get("account_key", azure_cfg.account_key)
if "client_id" in kwargs or azure_cfg.client_id:
config["client_id"] = kwargs.get("client_id", azure_cfg.client_id)
if "client_secret" in kwargs or azure_cfg.client_secret:
config["client_secret"] = kwargs.get("client_secret", azure_cfg.client_secret)
if "tenant_id" in kwargs or azure_cfg.tenant_id:
config["tenant_id"] = kwargs.get("tenant_id", azure_cfg.tenant_id)

if anonymous:
config[_SKIP_SIGNATURE] = True

client_options = {"timeout": "99999s", "allow_http": "true"}

if config:
kwargs["config"] = config
kwargs["client_options"] = client_options

return kwargs


def get_fsspec_storage_options(
protocol: str, data_config: typing.Optional[DataConfig] = None, anonymous: bool = False, **kwargs
protocol: str,
data_config: typing.Optional[DataConfig] = None,
anonymous: bool = False,
**kwargs,
) -> Dict[str, Any]:
data_config = data_config or DataConfig.auto()

if protocol == "file":
return {"auto_mkdir": True, **kwargs}
if protocol == "s3":
return {**s3_setup_args(data_config.s3, anonymous=anonymous), **kwargs}
return {
**s3_setup_args(data_config.s3, anonymous=anonymous, **kwargs),
**kwargs,
}
if protocol == "gs":
if anonymous:
kwargs["token"] = _ANON
return kwargs
if protocol in ("abfs", "abfss"):
return {**azure_setup_args(data_config.azure, anonymous=anonymous), **kwargs}
return {
**azure_setup_args(data_config.azure, anonymous=anonymous, **kwargs),
**kwargs,
}
return {}


@@ -222,19 +272,24 @@ def get_filesystem(
kwargs["auto_mkdir"] = True
return FlyteLocalFileSystem(**kwargs)
elif protocol == "s3":
s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous, **kwargs)
s3kwargs.update(kwargs)
return fsspec.filesystem(protocol, **s3kwargs) # type: ignore
elif protocol == "gs":
if anonymous:
kwargs["token"] = _ANON
return fsspec.filesystem(protocol, **kwargs) # type: ignore
elif protocol in ("abfs", "abfss"):
azkwargs = azure_setup_args(self._data_config.azure, anonymous=anonymous, **kwargs)
azkwargs.update(kwargs)
return fsspec.filesystem(protocol, **azkwargs) # type: ignore
elif protocol == "ftp":
kwargs.update(fsspec.implementations.ftp.FTPFileSystem._get_kwargs_from_urls(path))
return fsspec.filesystem(protocol, **kwargs)

storage_options = get_fsspec_storage_options(
protocol=protocol, anonymous=anonymous, data_config=self._data_config, **kwargs
protocol=protocol,
anonymous=anonymous,
data_config=self._data_config,
**kwargs,
)
kwargs.update(storage_options)

@@ -246,7 +301,14 @@ async def get_async_filesystem_for_path(
protocol = get_protocol(path)
loop = asyncio.get_running_loop()

return self.get_filesystem(protocol, anonymous=anonymous, path=path, asynchronous=True, loop=loop, **kwargs)
return self.get_filesystem(
protocol,
anonymous=anonymous,
path=path,
asynchronous=True,
loop=loop,
**kwargs,
)

def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem:
protocol = get_protocol(path)
@@ -328,7 +390,9 @@ async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwa
import shutil

return shutil.copytree(
self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True
self.strip_file_header(from_path),
self.strip_file_header(to_path),
dirs_exist_ok=True,
)
logger.info(f"Getting {from_path} to {to_path}")
if isinstance(file_system, AsyncFileSystem):
@@ -338,10 +402,15 @@ async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwa
if isinstance(dst, (str, pathlib.Path)):
return dst
return to_path
except OSError as oe:
except (OSError, GenericError) as oe:

Review comment (Contributor):

Consider separate handling for different exceptions

Consider handling GenericError separately from OSError since they may require different error handling approaches. The current implementation treats them the same way which could mask important error details.

Suggested change (AI-generated fix; check before applying):
except (OSError, GenericError) as oe:
except OSError as oe:
logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}")
await self._handle_get_error(file_system, from_path, to_path, recursive, oe, **kwargs)
except GenericError as ge:
logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {ge}")
exists = True # Force anonymous filesystem retry for GenericError

Code Review Run #ab65d8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}")
if isinstance(file_system, AsyncFileSystem):
exists = await file_system._exists(from_path) # pylint: disable=W0212
try:
exists = await file_system._exists(from_path) # pylint: disable=W0212
except GenericError:
# for obstore, as it does not raise FileNotFoundError in fsspec but GenericError
# force it to try get_filesystem(anonymous=True)
exists = True
else:
exists = file_system.exists(from_path)
if not exists:
@@ -371,7 +440,9 @@ async def _put(self, from_path: str, to_path: str, recursive: bool = False, **kw
import shutil

return shutil.copytree(
self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True
self.strip_file_header(from_path),
self.strip_file_header(to_path),
dirs_exist_ok=True,
)
from_path, to_path = self.recursive_paths(from_path, to_path)
if self._execution_metadata:
@@ -633,7 +704,11 @@ async def async_get_data(self, remote_path: str, local_path: str, is_multipart:
get_data = loop_manager.synced(async_get_data)

async def async_put_data(
self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart: bool = False, **kwargs
self,
local_path: Union[str, os.PathLike],
remote_path: str,
is_multipart: bool = False,
**kwargs,
) -> str:
"""
The implication here is that we're always going to put data to the remote location, so we .remote to ensure
@@ -664,6 +739,9 @@ async def async_put_data(
put_data = loop_manager.synced(async_put_data)


register(["s3", "gs", "abfs", "abfss"], asynchronous=True)


flyte_tmp_dir = tempfile.mkdtemp(prefix="flyte-")
default_local_file_access_provider = FileAccessProvider(
local_sandbox_dir=os.path.join(flyte_tmp_dir, "sandbox"),
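For reviewers less familiar with obstore's fsspec integration, here is a minimal sketch (not part of this diff) of how the pieces above fit together: register() swaps in obstore-backed fsspec implementations for the cloud protocols, and the *_setup_args helpers translate Flyte's DataConfig into the config, client_options, and retry_config keyword arguments shown in the new s3_setup_args. The MinIO-style endpoint, credentials, and bucket below are hypothetical; anonymous access would instead set "skip_signature": True in config.

```python
# Sketch only — assumes obstore==0.6.0 as pinned in this PR; the endpoint,
# credentials, and bucket name are made up for illustration.
from datetime import timedelta

import fsspec
from obstore.fsspec import register

# Same registration the PR performs at import time in data_persistence.py.
register(["s3", "gs", "abfs", "abfss"], asynchronous=True)

fs = fsspec.filesystem(
    "s3",
    config={
        "access_key_id": "minio",             # replaces s3fs's "key"
        "secret_access_key": "miniostorage",  # replaces s3fs's "secret"
        "endpoint_url": "http://localhost:30002",
    },
    client_options={"timeout": "99999s", "allow_http": True},
    retry_config={
        "max_retries": 3,
        "backoff": {
            "base": 2,
            "init_backoff": timedelta(seconds=5),
            "max_backoff": timedelta(seconds=16),
        },
        "retry_timeout": timedelta(minutes=3),
    },
)

# The registered store exposes the usual fsspec API (ls, get, put, ...).
print(fs.ls("my-bucket"))  # hypothetical bucket
```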
6 changes: 5 additions & 1 deletion flytekit/types/structured/basic_dfs.py
@@ -37,7 +37,11 @@ def get_pandas_storage_options(
from pandas.io.common import is_fsspec_url

if is_fsspec_url(uri):
return get_fsspec_storage_options(protocol=get_protocol(uri), data_config=data_config, anonymous=anonymous)
return get_fsspec_storage_options(
protocol=get_protocol(uri),
data_config=data_config,
anonymous=anonymous,
)

# Pandas does not allow storage_options for non-fsspec paths e.g. local.
return None
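To make the effect of this helper concrete, a small usage sketch follows. The URI is hypothetical and the signature (uri, data_config, anonymous) is inferred from the diff; pandas forwards storage_options to fsspec, so the obstore-backed "s3" implementation receives the same options dict built above.

```python
# Hypothetical usage of get_pandas_storage_options; uri and bucket are made up.
import pandas as pd

from flytekit.configuration import DataConfig
from flytekit.types.structured.basic_dfs import get_pandas_storage_options

uri = "s3://my-bucket/frames/df.parquet"
storage_options = get_pandas_storage_options(uri=uri, data_config=DataConfig.auto())

# pandas hands storage_options to fsspec, which now resolves "s3" to the
# obstore-backed filesystem registered in flytekit.core.data_persistence.
df = pd.read_parquet(uri, storage_options=storage_options)
print(df.head())
```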
2 changes: 1 addition & 1 deletion plugins/flytekit-async-fsspec/setup.py
@@ -4,7 +4,7 @@

microlib_name = "flytekitplugins-async-fsspec"

plugin_requires = ["flytekit"]
plugin_requires = ["flytekit", "s3fs>=2023.3.0,!=2024.3.1"]

__version__ = "0.0.0+develop"

@@ -91,7 +91,6 @@ def decode(
current_task_metadata: StructuredDatasetMetadata,
) -> pl.DataFrame:
uri = flyte_value.uri

kwargs = get_fsspec_storage_options(
protocol=fsspec_utils.get_protocol(uri),
data_config=ctx.file_access.data_config,
@@ -153,7 +152,6 @@ def decode(
current_task_metadata: StructuredDatasetMetadata,
) -> pl.LazyFrame:
uri = flyte_value.uri

kwargs = get_fsspec_storage_options(
protocol=fsspec_utils.get_protocol(uri),
data_config=ctx.file_access.data_config,
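The two decode paths above (eager DataFrame and LazyFrame) both build fsspec storage options from the structured-dataset URI. A rough sketch of the same flow outside the plugin is shown below; the URI is hypothetical, and whether polars routes storage_options through fsspec or its native object store depends on the polars version, so treat this as illustrative rather than as the plugin's exact read call.

```python
# Illustrative only — mirrors how the decoders above obtain storage options.
import polars as pl
from fsspec.utils import get_protocol

from flytekit.configuration import DataConfig
from flytekit.core.data_persistence import get_fsspec_storage_options

uri = "s3://my-bucket/structured/output.parquet"  # hypothetical
kwargs = get_fsspec_storage_options(
    protocol=get_protocol(uri),
    data_config=DataConfig.auto(),
)

# Eager and lazy reads, passing the options the same way the plugin does.
df = pl.read_parquet(uri, storage_options=kwargs)
lf = pl.scan_parquet(uri, storage_options=kwargs)
```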
6 changes: 3 additions & 3 deletions pyproject.toml
@@ -12,7 +12,8 @@ readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.9,<3.13"
dependencies = [
# Please maintain an alphabetical order in the following list
"adlfs>=2023.3.0",
"aiohttp>=3.11.13",
"botocore>=1.37.15",
"click>=6.6",
"cloudpickle>=2.0.0",
"croniter>=0.3.20",
@@ -22,7 +23,6 @@ dependencies = [
"docstring-parser>=0.9.0",
"flyteidl>=1.15.1",
"fsspec>=2023.3.0",
"gcsfs>=2023.3.0",
"googleapis-common-protos>=1.57",
# Skipping those versions to account for the unwanted output coming from grpcio and grpcio-status.
# Issue being tracked in https://github.com/flyteorg/flyte/issues/6082.
@@ -38,6 +38,7 @@ dependencies = [
"marshmallow-jsonschema>=0.12.0",
"mashumaro>=3.15",
"msgpack>=1.1.0",
"obstore==0.6.0",
"protobuf!=4.25.0",
"pygments",
"python-json-logger>=2.0.0",
@@ -46,7 +47,6 @@
"requests>=2.18.4",
"rich",
"rich_click",
"s3fs>=2023.3.0,!=2024.3.1",
"statsd>=3.0.0",
"typing_extensions",
"urllib3>=1.22",
2 changes: 1 addition & 1 deletion tests/flytekit/unit/bin/test_python_entrypoint.py
@@ -428,7 +428,7 @@ def test_setup_for_fast_register():
@mock.patch("google.auth.compute_engine._metadata")
def test_setup_cloud_prefix(mock_gcs):
with setup_execution("s3://", checkpoint_path=None, prev_checkpoint=None) as ctx:
assert ctx.file_access._default_remote.protocol[0] == "s3"
assert ctx.file_access._default_remote.protocol == "s3"

with setup_execution("gs://", checkpoint_path=None, prev_checkpoint=None) as ctx:
assert "gs" in ctx.file_access._default_remote.protocol