Skip to content

Commit ae1198b

Browse files
authored
feat(to_storage): cleanup, add tests, add filepath strategy (#1461)
1 parent ebff1aa commit ae1198b

File tree

5 files changed

+121
-11
lines changed

5 files changed

+121
-11
lines changed

docs/guide/remotes.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,29 @@ DataChain uses [fsspec](https://filesystem-spec.readthedocs.io/en/latest/) to in
1919
- Azure Blob Storage: `az://container-name/path/to/data`
2020
- Hugging Face: `hf://dataset-name`
2121

22+
## Export placement strategies
23+
24+
Both `File.export` and `DataChain.to_storage` expose a `placement`
25+
parameter that controls how the target path is built for each file. Choose the
26+
strategy that fits your destination layout:
27+
28+
- `filename`: keep only the original file name (no directories). Simple, but
29+
colliding names overwrite each other.
30+
- `filepath`: preserve the relative directory structure inside the dataset.
31+
This is the right choice when you want to mirror the source folder layout at
32+
the destination.
33+
- `fullpath`: for remote sources, prefix paths with the bucket or host name (for
34+
example `s3://bucket/data/a.jpg` exports to `<output>/bucket/data/a.jpg`).
35+
Local sources behave the same as `filepath`.
36+
- `etag`: use the file ETag with the original extension to guarantee unique
37+
names when your storage exposes object digests.
38+
- `checksum`: reserved for future use. Selecting it currently raises
39+
`NotImplementedError`.
40+
41+
Relative output directories such as `"."` or `".."`, as well as absolute
42+
paths, are supported. The placement strategy simply determines the appended
43+
sub-path.
44+
2245
## Extra configuration
2346
To configure the filesystem, you can pass key-value pairs as a `client_config` dictionary, which will be passed to the respective filesystem.
2447

src/datachain/lib/dc/datachain.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2695,7 +2695,19 @@ def to_storage(
26952695
output: Path to the target directory for exporting files.
26962696
signal: Name of the signal to export files from.
26972697
placement: The method to use for naming exported files.
2698-
The possible values are: "filename", "etag", "fullpath", and "checksum".
2698+
The possible values are: "filename", "etag", "fullpath",
2699+
"filepath", and "checksum".
2700+
Example path translations for an object located at
2701+
``s3://bucket/data/img.jpg`` and exported to ``./out``:
2702+
2703+
- "filename" -> ``./out/img.jpg`` (no directories)
2704+
- "filepath" -> ``./out/data/img.jpg`` (relative path kept)
2705+
- "fullpath" -> ``./out/bucket/data/img.jpg`` (remote host kept)
2706+
- "etag" -> ``./out/<etag>.jpg`` (unique name via object digest)
2707+
2708+
For local sources, "fullpath" behaves the same as "filepath".
2709+
Relative destinations such as "." or ".." and absolute paths
2710+
are supported for every strategy.
26992711
link_type: Method to use for exporting files.
27002712
Falls back to `'copy'` if symlinking fails.
27012713
num_threads: number of threads to use for exporting files.

src/datachain/lib/file.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
logger = logging.getLogger("datachain")
4343

4444
# how to create file path when exporting
45-
ExportPlacement = Literal["filename", "etag", "fullpath", "checksum"]
45+
ExportPlacement = Literal["filename", "etag", "fullpath", "checksum", "filepath"]
4646

4747
FileType = Literal["binary", "text", "image", "video", "audio"]
4848
EXPORT_FILES_MAX_THREADS = 5
@@ -644,6 +644,8 @@ def get_destination_path(
644644
source = urlparse(self.source)
645645
if source.scheme and source.scheme != "file":
646646
path = posixpath.join(source.netloc, path)
647+
elif placement == "filepath":
648+
path = unquote(self.get_path_normalized())
647649
elif placement == "checksum":
648650
raise NotImplementedError("Checksum placement not implemented yet")
649651
else:

tests/func/test_datachain.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
from collections.abc import Iterator
88
from datetime import datetime, timedelta, timezone
99
from pathlib import Path, PurePosixPath
10+
from typing import cast
1011
from unittest.mock import Mock, patch
12+
from urllib.parse import urlparse
1113

1214
import numpy as np
1315
import pandas as pd
@@ -259,7 +261,7 @@ def test_read_file(cloud_test_catalog, use_cache):
259261
assert bool(file.get_local_path()) is use_cache
260262

261263

262-
@pytest.mark.parametrize("placement", ["fullpath", "filename"])
264+
@pytest.mark.parametrize("placement", ["fullpath", "filename", "filepath"])
263265
@pytest.mark.parametrize("use_map", [True, False])
264266
@pytest.mark.parametrize("use_cache", [True, False])
265267
@pytest.mark.parametrize("file_type", ["", "binary", "text"])
@@ -305,13 +307,28 @@ def test_to_storage(
305307
"dog4": "ruff",
306308
}
307309

308-
for file in df.to_values("file"):
310+
def _expected_destination_rel(file_obj: File, placement: str) -> Path:
311+
rel_path = PurePosixPath(file_obj.path).as_posix()
312+
309313
if placement == "filename":
310-
file_path = file.name
311-
else:
312-
file_path = file.get_full_name()
313-
with open(tmp_dir / "output" / file_path) as f:
314-
assert f.read() == expected[file.name]
314+
return Path(file_obj.name)
315+
if placement == "filepath":
316+
return Path(rel_path)
317+
if placement == "fullpath":
318+
parsed = urlparse(file_obj.source)
319+
full_rel = rel_path
320+
if parsed.scheme and parsed.scheme != "file":
321+
full_rel = posixpath.join(parsed.netloc, rel_path)
322+
return Path(full_rel)
323+
raise AssertionError(f"Unsupported placement: {placement}")
324+
325+
output_root = tmp_dir / "output"
326+
for file_record in df.to_values("file"):
327+
file_obj = cast("File", file_record)
328+
destination_rel = _expected_destination_rel(file_obj, placement)
329+
330+
with (output_root / destination_rel).open() as f:
331+
assert f.read() == expected[file_obj.name]
315332

316333
assert mapper.call_count == len(expected)
317334

tests/unit/lib/test_file.py

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import posixpath
23
from pathlib import Path
34
from unittest.mock import Mock
45

@@ -50,7 +51,7 @@ def test_cache_get_path(catalog: Catalog):
5051
def test_get_destination_path_wrong_strategy():
5152
file = create_file("s3://mybkt")
5253
with pytest.raises(ValueError):
53-
file.get_destination_path("", "wrong")
54+
file.get_destination_path("", "wrong") # type: ignore[arg-type]
5455

5556

5657
def test_get_destination_path_filename_strategy():
@@ -88,6 +89,55 @@ def test_get_destination_path_fullpath_strategy_file_source(catalog, tmp_path):
8889
)
8990

9091

92+
def test_get_destination_path_filepath_strategy(catalog):
93+
file = create_file("s3://mybkt")
94+
file._set_stream(catalog, False)
95+
assert (
96+
file.get_destination_path("output", "filepath") == "output/dir1/dir2/test.txt"
97+
)
98+
99+
100+
def test_get_destination_path_filepath_strategy_empty_output(catalog):
101+
file = create_file("s3://mybkt")
102+
file._set_stream(catalog, False)
103+
assert file.get_destination_path("", "filepath") == "dir1/dir2/test.txt"
104+
105+
106+
@pytest.mark.parametrize("output", [".", ".."])
107+
@pytest.mark.parametrize(
108+
("placement", "expected"),
109+
[
110+
("filename", "test.txt"),
111+
("filepath", "dir1/dir2/test.txt"),
112+
("fullpath", "mybkt/dir1/dir2/test.txt"),
113+
],
114+
)
115+
def test_get_destination_path_relative_output(catalog, output, placement, expected):
116+
file = create_file("s3://mybkt")
117+
file._set_stream(catalog, False)
118+
assert file.get_destination_path(output, placement) == posixpath.join(
119+
output, expected
120+
)
121+
122+
123+
@pytest.mark.parametrize(
124+
("placement", "expected_suffix"),
125+
[
126+
("filename", "test.txt"),
127+
("filepath", "dir1/dir2/test.txt"),
128+
("fullpath", "mybkt/dir1/dir2/test.txt"),
129+
],
130+
)
131+
def test_get_destination_path_absolute_output(
132+
catalog, tmp_path, placement, expected_suffix
133+
):
134+
file = create_file("s3://mybkt")
135+
file._set_stream(catalog, False)
136+
output = tmp_path / "dest"
137+
expected = f"{output.as_posix()}/{expected_suffix}"
138+
assert file.get_destination_path(output.as_posix(), placement) == expected
139+
140+
91141
def test_read_binary_data(tmp_path, catalog: Catalog):
92142
file_name = "myfile"
93143
data = b"some\x00data\x00is\x48\x65\x6c\x57\x6f\x72\x6c\x64\xff\xffheRe"
@@ -376,7 +426,13 @@ def test_export_with_symlink(tmp_path, catalog, use_cache):
376426
file.export(tmp_path / "dir", link_type="symlink", use_cache=use_cache)
377427
assert (tmp_path / "dir" / "myfile.txt").is_symlink()
378428

379-
dst = Path(file.get_local_path()) if use_cache else path
429+
if use_cache:
430+
cached_path = file.get_local_path()
431+
assert cached_path is not None
432+
dst = Path(cached_path)
433+
else:
434+
dst = path
435+
380436
assert (tmp_path / "dir" / "myfile.txt").resolve() == dst
381437

382438

0 commit comments

Comments
 (0)