Skip to content

Commit e166926

Browse files
authored
Update universal-pathlib to >=0.3.8 and use upath.extensions.ProxyUPath (#60519)
* pyproject.toml: update universal-pathlib version boundaries * task_sdk.io.path: fix test regarding relative_to * tests: adjust the lazy_load test to reflect caching is done via STORE_CACHE * tests: update tests to register fake remote filesystem in fsspec * airflow.sdk.io.path: implement ObjectStoragePath via ProxyUPath * airflow.sdk.io.path: provide a basic implementation for copy_into and move_into * airflow.sdk.io.path: fix __str__ method * airflow.sdk.io.path: docstring fixes * update spelling_wordlist.txt
1 parent 122146f commit e166926

File tree

4 files changed

+105
-56
lines changed

4 files changed

+105
-56
lines changed

airflow-core/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ dependencies = [
144144
"termcolor>=3.0.0",
145145
"typing-extensions>=4.14.1",
146146
# https://github.com/apache/airflow/issues/56369 , rework universal-pathlib usage
147-
"universal-pathlib>=0.2.6,<0.3.0",
147+
"universal-pathlib>=0.3.8",
148148
"uuid6>=2024.7.10",
149149
"apache-airflow-task-sdk<1.3.0,>=1.2.0",
150150
# pre-installed providers

docs/spelling_wordlist.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
aarch
22
abc
3+
AbstractFileSystem
34
accessor
45
AccessSecretVersionResponse
56
accountmaking
@@ -736,6 +737,7 @@ fqdn
736737
frontend
737738
fs
738739
fsGroup
740+
fsspec
739741
fullname
740742
func
741743
Fundera

task-sdk/src/airflow/sdk/io/path.py

Lines changed: 76 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,21 @@
1717

1818
from __future__ import annotations
1919

20-
import contextlib
21-
import os
2220
import shutil
23-
from collections.abc import Mapping
2421
from typing import TYPE_CHECKING, Any, ClassVar
2522
from urllib.parse import urlsplit
2623

2724
from fsspec.utils import stringify_path
28-
from upath.implementations.cloud import CloudPath
29-
from upath.registry import get_upath_class
25+
from upath import UPath
26+
from upath.extensions import ProxyUPath
3027

3128
from airflow.sdk.io.stat import stat_result
3229
from airflow.sdk.io.store import attach
3330

3431
if TYPE_CHECKING:
3532
from fsspec import AbstractFileSystem
33+
from typing_extensions import Self
34+
from upath.types import JoinablePathLike
3635

3736

3837
class _TrackingFileWrapper:
@@ -77,42 +76,48 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7776
self._obj.__exit__(exc_type, exc_val, exc_tb)
7877

7978

80-
class ObjectStoragePath(CloudPath):
79+
class ObjectStoragePath(ProxyUPath):
8180
"""A path-like object for object storage."""
8281

8382
__version__: ClassVar[int] = 1
8483

85-
_protocol_dispatch = False
86-
8784
sep: ClassVar[str] = "/"
8885
root_marker: ClassVar[str] = "/"
8986

9087
__slots__ = ("_hash_cached",)
9188

92-
@classmethod
93-
def _transform_init_args(
94-
cls,
95-
args: tuple[str | os.PathLike, ...],
96-
protocol: str,
97-
storage_options: dict[str, Any],
98-
) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]:
99-
"""Extract conn_id from the URL and set it as a storage option."""
89+
def __init__(
90+
self,
91+
*args: JoinablePathLike,
92+
protocol: str | None = None,
93+
conn_id: str | None = None,
94+
**storage_options: Any,
95+
) -> None:
96+
# ensure conn_id is always set in storage_options
97+
storage_options.setdefault("conn_id", None)
98+
# parse conn_id from args if provided
10099
if args:
101100
arg0 = args[0]
102-
parsed_url = urlsplit(stringify_path(arg0))
103-
userinfo, have_info, hostinfo = parsed_url.netloc.rpartition("@")
104-
if have_info:
105-
storage_options.setdefault("conn_id", userinfo or None)
106-
parsed_url = parsed_url._replace(netloc=hostinfo)
107-
args = (parsed_url.geturl(),) + args[1:]
108-
protocol = protocol or parsed_url.scheme
109-
return args, protocol, storage_options
101+
if isinstance(arg0, type(self)):
102+
storage_options["conn_id"] = arg0.storage_options.get("conn_id")
103+
else:
104+
parsed_url = urlsplit(stringify_path(arg0))
105+
userinfo, have_info, hostinfo = parsed_url.netloc.rpartition("@")
106+
if have_info:
107+
conn_id = storage_options["conn_id"] = userinfo or None
108+
parsed_url = parsed_url._replace(netloc=hostinfo)
109+
args = (parsed_url.geturl(),) + args[1:]
110+
protocol = protocol or parsed_url.scheme
111+
# override conn_id if explicitly provided
112+
if conn_id is not None:
113+
storage_options["conn_id"] = conn_id
114+
super().__init__(*args, protocol=protocol, **storage_options)
110115

111-
@classmethod
112-
def _fs_factory(
113-
cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
114-
) -> AbstractFileSystem:
115-
return attach(protocol or "file", storage_options.get("conn_id")).fs
116+
@property
117+
def fs(self) -> AbstractFileSystem:
118+
"""Return the filesystem for this path, using airflow's attach mechanism."""
119+
conn_id = self.storage_options.get("conn_id")
120+
return attach(self.protocol or "file", conn_id).fs
116121

117122
def __hash__(self) -> int:
118123
self._hash_cached: int
@@ -181,12 +186,7 @@ def samefile(self, other_path: Any) -> bool:
181186
and st["ino"] == other_st["ino"]
182187
)
183188

184-
def _scandir(self):
185-
# Emulate os.scandir(), which returns an object that can be used as a
186-
# context manager.
187-
return contextlib.nullcontext(self.iterdir())
188-
189-
def replace(self, target) -> ObjectStoragePath:
189+
def replace(self, target) -> Self:
190190
"""
191191
Rename this path to the target path, overwriting if that path exists.
192192
@@ -199,16 +199,12 @@ def replace(self, target) -> ObjectStoragePath:
199199
return self.rename(target)
200200

201201
@classmethod
202-
def cwd(cls):
203-
if cls is ObjectStoragePath:
204-
return get_upath_class("").cwd()
205-
raise NotImplementedError
202+
def cwd(cls) -> Self:
203+
return cls._from_upath(UPath.cwd())
206204

207205
@classmethod
208-
def home(cls):
209-
if cls is ObjectStoragePath:
210-
return get_upath_class("").home()
211-
raise NotImplementedError
206+
def home(cls) -> Self:
207+
return cls._from_upath(UPath.home())
212208

213209
# EXTENDED OPERATIONS
214210

@@ -299,7 +295,7 @@ def _cp_file(self, dst: ObjectStoragePath, **kwargs):
299295
# make use of system dependent buffer size
300296
shutil.copyfileobj(f1, f2, **kwargs)
301297

302-
def copy(self, dst: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None:
298+
def copy(self, dst: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None: # type: ignore[override]
303299
"""
304300
Copy file(s) from this path to another location.
305301
@@ -370,7 +366,23 @@ def copy(self, dst: str | ObjectStoragePath, recursive: bool = False, **kwargs)
370366
# remote file -> remote dir
371367
self._cp_file(dst, **kwargs)
372368

373-
def move(self, path: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None:
369+
def copy_into(self, target_dir: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None: # type: ignore[override]
370+
"""
371+
Copy file(s) from this path into another directory.
372+
373+
:param target_dir: Destination directory
374+
:param recursive: If True, copy directories recursively.
375+
376+
kwargs: Additional keyword arguments to be passed to the underlying implementation.
377+
"""
378+
if isinstance(target_dir, str):
379+
target_dir = ObjectStoragePath(target_dir)
380+
if not target_dir.is_dir():
381+
raise NotADirectoryError(f"Destination {target_dir} is not a directory.")
382+
dst_path = target_dir / self.name
383+
self.copy(dst_path, recursive=recursive, **kwargs)
384+
385+
def move(self, path: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None: # type: ignore[override]
374386
"""
375387
Move file(s) from this path to another location.
376388
@@ -394,6 +406,23 @@ def move(self, path: str | ObjectStoragePath, recursive: bool = False, **kwargs)
394406
self.copy(path, recursive=recursive, **kwargs)
395407
self.unlink()
396408

409+
def move_into(self, target_dir: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None: # type: ignore[override]
410+
"""
411+
Move file(s) from this path into another directory.
412+
413+
:param target_dir: Destination directory
414+
:param recursive: bool
415+
If True, move directories recursively.
416+
417+
kwargs: Additional keyword arguments to be passed to the underlying implementation.
418+
"""
419+
if isinstance(target_dir, str):
420+
target_dir = ObjectStoragePath(target_dir)
421+
if not target_dir.is_dir():
422+
raise NotADirectoryError(f"Destination {target_dir} is not a directory.")
423+
dst_path = target_dir / self.name
424+
self.move(dst_path, recursive=recursive, **kwargs)
425+
397426
def serialize(self) -> dict[str, Any]:
398427
_kwargs = {**self.storage_options}
399428
conn_id = _kwargs.pop("conn_id", None)
@@ -417,6 +446,6 @@ def deserialize(cls, data: dict, version: int) -> ObjectStoragePath:
417446

418447
def __str__(self):
419448
conn_id = self.storage_options.get("conn_id")
420-
if self._protocol and conn_id:
421-
return f"{self._protocol}://{conn_id}@{self.path}"
449+
if self.protocol and conn_id:
450+
return f"{self.protocol}://{conn_id}@{self.path}"
422451
return super().__str__()

task-sdk/tests/task_sdk/io/test_path.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,19 @@ def test_home():
7070
def test_lazy_load():
7171
o = ObjectStoragePath("file:///tmp/foo")
7272
with pytest.raises(AttributeError):
73-
assert o._fs_cached
73+
assert o.__wrapped__._fs_cached
7474

75+
# ObjectStoragePath overrides .fs and provides cached filesystems via the STORE_CACHE
7576
assert o.fs is not None
76-
assert o._fs_cached
77+
78+
with pytest.raises(AttributeError):
79+
assert o.__wrapped__._fs_cached
7780
# Clear the cache to avoid side effects in other tests below
7881
_STORE_CACHE.clear()
7982

8083

8184
class _FakeRemoteFileSystem(MemoryFileSystem):
82-
protocol = ("s3", "fakefs", "ffs", "ffs2")
85+
protocol = ("s3", "fake", "fakefs", "ffs", "ffs2")
8386
root_marker = ""
8487
store: ClassVar[dict[str, Any]] = {}
8588
pseudo_dirs = [""]
@@ -99,6 +102,21 @@ def _strip_protocol(cls, path):
99102
return path
100103

101104

105+
@pytest.fixture(scope="module", autouse=True)
106+
def register_fake_remote_filesystem():
107+
# Register the fake filesystem with fsspec so UPath can discover it
108+
from fsspec.registry import _registry as fsspec_implementation_registry, register_implementation
109+
110+
old_registry = fsspec_implementation_registry.copy()
111+
try:
112+
for proto in _FakeRemoteFileSystem.protocol:
113+
register_implementation(proto, _FakeRemoteFileSystem, clobber=True)
114+
yield
115+
finally:
116+
fsspec_implementation_registry.clear()
117+
fsspec_implementation_registry.update(old_registry)
118+
119+
102120
class TestAttach:
103121
FAKE = "ffs:///fake"
104122
MNT = "ffs:///mnt/warehouse"
@@ -168,10 +186,6 @@ def test_standard_extended_api(self, fake_files, fn, args, fn2, path, expected_a
168186

169187

170188
class TestRemotePath:
171-
@pytest.fixture(autouse=True)
172-
def fake_fs(self, monkeypatch):
173-
monkeypatch.setattr(ObjectStoragePath, "_fs_factory", lambda *a, **k: _FakeRemoteFileSystem())
174-
175189
def test_bucket_key_protocol(self):
176190
bucket = "bkt"
177191
key = "yek"
@@ -262,7 +276,11 @@ def test_relative_to(self, tmp_path, target):
262276
o1 = ObjectStoragePath(f"file://{target}")
263277
o2 = ObjectStoragePath(f"file://{tmp_path.as_posix()}")
264278
o3 = ObjectStoragePath(f"file:///{uuid.uuid4()}")
265-
assert o1.relative_to(o2) == o1
279+
# relative_to returns the relative path from o2 to o1
280+
relative = o1.relative_to(o2)
281+
# The relative path should be the basename (uuid) of the target
282+
expected_relative = target.split("/")[-1]
283+
assert str(relative) == expected_relative
266284
with pytest.raises(ValueError, match="is not in the subpath of"):
267285
o1.relative_to(o3)
268286

0 commit comments

Comments
 (0)