Skip to content

Commit 311e744

Browse files
romain-intelclaude
andcommitted
Merge USER_CONTENT from add_to_package with user-code walker
USER_CONTENT tuples produced by FlowDecorator/StepDecorator/ FlowMutator/StepMutator add_to_package hooks were previously dropped on the floor. They are now recorded per-arcname and merged into the packaged user code: files already yielded by the flow-directory walker are skipped (dedup), and anything outside the flow dir or filtered out by the suffix/user filter still makes it into the package. Also add an example `@package_suffixes` FlowMutator that declares additional file suffixes to include in the package — mirroring the `--package-suffixes` CLI option as a decorator and demonstrating the new USER_CONTENT merging behavior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent dfdbed7 commit 311e744

4 files changed

Lines changed: 241 additions & 6 deletions

File tree

metaflow/package/__init__.py

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ def _module_selector(m) -> bool:
132132
self._blob_url = None
133133
self._blob = None
134134

135+
# USER_CONTENT files contributed by decorators/mutators via add_to_package.
136+
# Keyed by arcname (path relative to the flow directory) -> absolute file path.
137+
# These are merged with the files discovered by walking the flow directory;
138+
# duplicates (same arcname) are dropped to avoid conflicts.
139+
self._user_content_from_addl: Dict[str, str] = {}
140+
135141
# Update package in the system context -- it will be available
136142
# in all hooks going forward including the ones called in the
137143
# thread that is used to create the package asynchronously.
@@ -612,8 +618,22 @@ def _check_tuple(path_tuple):
612618
file_name, [deco_module_paths[file_name], file_path]
613619
)
614620
elif file_type == ContentType.USER_CONTENT:
615-
pass # For now pass -- it will be included normally but we need to
616-
# merge it.
621+
# USER_CONTENT files will be merged with the files discovered by
622+
# walking the flow directory. Track them here so we can:
623+
# 1. Include them in the package even if they live outside the
624+
# flow directory (or are excluded by the user_code_filter).
625+
# 2. Avoid duplicating files already picked up by the walker.
626+
real_path = os.path.realpath(path_tuple[0])
627+
path_tuple = (real_path, file_name, file_type)
628+
existing = self._user_content_from_addl.get(file_name)
629+
if existing is None:
630+
self._user_content_from_addl[file_name] = real_path
631+
elif existing != real_path:
632+
raise NonUniqueFileNameToFilePathMappingException(
633+
file_name, [existing, real_path]
634+
)
635+
else:
636+
return None # Already recorded for this arcname
617637
else:
618638
raise ValueError(f"Unknown file type: {file_type}")
619639
return path_tuple
@@ -675,15 +695,20 @@ def _add_tuple(path_tuple):
675695
_add_tuple(path_tuple)
676696

677697
def _user_code_tuples(self):
698+
# Track arcnames yielded by the directory walker so we can detect overlap
699+
# with USER_CONTENT files contributed via add_to_package hooks.
700+
seen_arcnames = set()
678701
if R.use_r():
679702
# the R working directory
680703
self._user_flow_dir = R.working_dir()
681704
for path_tuple in walk(
682705
"%s/" % R.working_dir(), file_filter=self._user_code_filter
683706
):
707+
seen_arcnames.add(path_tuple[1])
684708
yield path_tuple
685709
# the R package
686710
for path_tuple in R.package_paths():
711+
seen_arcnames.add(path_tuple[1])
687712
yield path_tuple
688713
else:
689714
# the user's working directory
@@ -694,10 +719,17 @@ def _user_code_tuples(self):
694719
file_filter=self._user_code_filter,
695720
exclude_tl_dirs=self._exclude_tl_dirs,
696721
):
697-
# TODO: This is where we will check if the file is already included
698-
# in the mfcontent portion
722+
seen_arcnames.add(path_tuple[1])
699723
yield path_tuple
700724

725+
# Emit USER_CONTENT files contributed by decorators/mutators that were not
726+
# already picked up by the directory walker (either because they live
727+
# outside the flow directory or were filtered out by the suffix/user filter).
728+
for arcname, file_path in self._user_content_from_addl.items():
729+
if arcname in seen_arcnames:
730+
continue
731+
yield (file_path, arcname)
732+
701733
def _make(self):
702734
backend = self._backend()
703735
with backend.create() as archive:
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""
2+
Example FlowMutator that extends the set of file suffixes included in the code
3+
package — mirrors the ``--package-suffixes`` CLI option as a decorator.
4+
5+
By default Metaflow's packaging walks the flow directory and includes files
6+
whose suffixes are in ``DEFAULT_PACKAGE_SUFFIXES`` (``.py,.R,.RDS``) plus
7+
whatever was passed via ``--package-suffixes``. This mutator lets a flow
8+
author declare additional suffixes directly on the flow class::
9+
10+
from metaflow import FlowSpec, step
11+
from metaflow.plugins.package_suffixes_mutator import package_suffixes
12+
13+
@package_suffixes([".yaml", ".json"])
14+
class MyFlow(FlowSpec):
15+
@step
16+
def start(self):
17+
...
18+
19+
The mutator walks the flow directory itself and yields every file with a
20+
matching suffix as ``USER_CONTENT``. The packaging layer deduplicates against
21+
the files that the default walker already picked up, so files that are already
22+
included (e.g. ``.py`` files) are not added twice.
23+
"""
24+
25+
import inspect
26+
import os
27+
from typing import List, Optional, Union
28+
29+
from metaflow.packaging_sys import ContentType
30+
from metaflow.packaging_sys.utils import walk
31+
from metaflow.user_decorators.user_flow_decorator import FlowMutator
32+
33+
34+
class package_suffixes(FlowMutator):
35+
"""Include additional file suffixes in the code package.
36+
37+
Parameters
38+
----------
39+
suffixes : list of str or comma-separated str
40+
Additional file suffixes to include (e.g. ``[".yaml", ".json"]`` or
41+
``".yaml,.json"``). Leading dots are optional; a suffix without a
42+
leading dot is treated as an extension (``yaml`` → ``.yaml``).
43+
"""
44+
45+
def init(self, suffixes: Union[List[str], str]):
46+
if isinstance(suffixes, str):
47+
suffixes = [s.strip() for s in suffixes.split(",") if s.strip()]
48+
self._suffixes = tuple(
49+
(s if s.startswith(".") else "." + s).lower() for s in suffixes
50+
)
51+
52+
def add_to_package(self):
53+
if not self._suffixes:
54+
return
55+
56+
try:
57+
flow_file = inspect.getfile(self._flow_cls)
58+
except (TypeError, OSError):
59+
return
60+
flow_dir = os.path.dirname(os.path.abspath(flow_file))
61+
if not flow_dir or not os.path.isdir(flow_dir):
62+
return
63+
64+
def _filter(fname: str) -> bool:
65+
lname = fname.lower()
66+
return any(lname.endswith(sfx) for sfx in self._suffixes)
67+
68+
for file_path, arcname in walk(flow_dir + os.sep, file_filter=_filter):
69+
yield (file_path, arcname, ContentType.USER_CONTENT)

test/unit/test_add_to_package.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,19 @@ def _make_deco(tuples):
5757
return deco
5858

5959

60-
def _call_add_addl_files(flow, environment, mfcontent):
61-
"""Call _add_addl_files on a bare MetaflowPackage instance."""
60+
def _build_pkg(flow, environment, mfcontent):
61+
"""Build a bare MetaflowPackage instance with minimal state."""
6262
pkg = object.__new__(MetaflowPackage)
6363
pkg._flow = flow
6464
pkg._environment = environment
6565
pkg._mfcontent = mfcontent
66+
pkg._user_content_from_addl = {}
67+
return pkg
68+
69+
70+
def _call_add_addl_files(flow, environment, mfcontent):
71+
"""Call _add_addl_files on a bare MetaflowPackage instance."""
72+
pkg = _build_pkg(flow, environment, mfcontent)
6673
pkg._add_addl_files()
6774
return mfcontent
6875

@@ -204,3 +211,54 @@ def record():
204211
)
205212
_call_add_addl_files(flow, _make_environment(), _make_mfcontent())
206213
assert call_order == ["flow_deco", "step_deco"]
214+
215+
216+
def test_user_content_recorded():
217+
"""USER_CONTENT tuples are recorded in _user_content_from_addl (not sent
218+
to _mfcontent which handles code/other files only)."""
219+
with tempfile.NamedTemporaryFile(suffix=".py") as f:
220+
deco = _make_deco([(f.name, "extra.py", ContentType.USER_CONTENT)])
221+
flow = _make_flow(
222+
steps=[_make_step()],
223+
flow_decorators={"my_deco": [deco]},
224+
)
225+
mfcontent = _make_mfcontent()
226+
pkg = _build_pkg(flow, _make_environment(), mfcontent)
227+
pkg._add_addl_files()
228+
229+
# Not routed to _mfcontent — USER_CONTENT is packaged alongside user code.
230+
mfcontent.add_code_file.assert_not_called()
231+
mfcontent.add_other_file.assert_not_called()
232+
mfcontent.add_module.assert_not_called()
233+
234+
assert pkg._user_content_from_addl == {"extra.py": os.path.realpath(f.name)}
235+
236+
237+
def test_user_content_duplicate_same_path_dedup():
238+
"""Same USER_CONTENT arcname with the same path from two decorators: dedup."""
239+
with tempfile.NamedTemporaryFile(suffix=".py") as f:
240+
d1 = _make_deco([(f.name, "shared.py", ContentType.USER_CONTENT)])
241+
d2 = _make_deco([(f.name, "shared.py", ContentType.USER_CONTENT)])
242+
flow = _make_flow(
243+
steps=[_make_step()],
244+
flow_decorators={"fd": [d1, d2]},
245+
)
246+
pkg = _build_pkg(flow, _make_environment(), _make_mfcontent())
247+
pkg._add_addl_files()
248+
assert pkg._user_content_from_addl == {"shared.py": os.path.realpath(f.name)}
249+
250+
251+
def test_user_content_duplicate_different_path_raises():
252+
"""Same USER_CONTENT arcname with different paths raises the usual exception."""
253+
with tempfile.NamedTemporaryFile(suffix=".py") as f1, tempfile.NamedTemporaryFile(
254+
suffix=".py"
255+
) as f2:
256+
d1 = _make_deco([(f1.name, "shared.py", ContentType.USER_CONTENT)])
257+
d2 = _make_deco([(f2.name, "shared.py", ContentType.USER_CONTENT)])
258+
flow = _make_flow(
259+
steps=[_make_step()],
260+
flow_decorators={"fd": [d1, d2]},
261+
)
262+
with pytest.raises(NonUniqueFileNameToFilePathMappingException):
263+
pkg = _build_pkg(flow, _make_environment(), _make_mfcontent())
264+
pkg._add_addl_files()
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
"""Tests for the ``package_suffixes`` example FlowMutator."""
2+
3+
import os
4+
import tempfile
5+
from unittest import mock
6+
7+
from metaflow.packaging_sys import ContentType
8+
from metaflow.plugins.package_suffixes_mutator import package_suffixes
9+
10+
11+
def _make_flow(tmpdir):
12+
"""Write a minimal flow.py and a few sibling files; return the flow file."""
13+
flow_file = os.path.join(tmpdir, "flow.py")
14+
with open(flow_file, "w") as f:
15+
f.write("# dummy flow\n")
16+
with open(os.path.join(tmpdir, "config.yaml"), "w") as f:
17+
f.write("a: 1\n")
18+
with open(os.path.join(tmpdir, "data.json"), "w") as f:
19+
f.write("{}\n")
20+
os.makedirs(os.path.join(tmpdir, "sub"))
21+
with open(os.path.join(tmpdir, "sub", "nested.yaml"), "w") as f:
22+
f.write("b: 2\n")
23+
with open(os.path.join(tmpdir, "ignored.txt"), "w") as f:
24+
f.write("ignored\n")
25+
return flow_file
26+
27+
28+
def _build_mutator(suffixes, flow_file):
29+
m = package_suffixes.__new__(package_suffixes)
30+
# We only need _flow_cls for inspect.getfile(); patch that directly instead
31+
# of constructing a real class.
32+
m._flow_cls = mock.Mock()
33+
m.init(suffixes)
34+
return m
35+
36+
37+
def test_init_list_form():
38+
m = package_suffixes.__new__(package_suffixes)
39+
m.init([".yaml", "json"])
40+
assert m._suffixes == (".yaml", ".json")
41+
42+
43+
def test_init_string_form():
44+
m = package_suffixes.__new__(package_suffixes)
45+
m.init(".yaml,json, .txt")
46+
assert m._suffixes == (".yaml", ".json", ".txt")
47+
48+
49+
def test_add_to_package_yields_matching_files():
50+
with tempfile.TemporaryDirectory() as tmp:
51+
flow_file = _make_flow(tmp)
52+
m = _build_mutator([".yaml", ".json"], flow_file)
53+
with mock.patch("inspect.getfile", return_value=flow_file):
54+
results = list(m.add_to_package())
55+
56+
# All tuples are USER_CONTENT
57+
assert all(t[2] == ContentType.USER_CONTENT for t in results)
58+
59+
arcnames = {t[1] for t in results}
60+
# walk() yields arcnames relative to the flow directory (no flow dir
61+
# basename prefix), matching the convention used by _user_code_tuples.
62+
assert "config.yaml" in arcnames
63+
assert "data.json" in arcnames
64+
assert os.path.join("sub", "nested.yaml") in arcnames
65+
# Non-matching files are not included.
66+
assert "ignored.txt" not in arcnames
67+
# flow.py is a .py file — not part of the configured extra suffixes.
68+
assert "flow.py" not in arcnames
69+
70+
71+
def test_add_to_package_empty_suffixes_yields_nothing():
72+
with tempfile.TemporaryDirectory() as tmp:
73+
flow_file = _make_flow(tmp)
74+
m = _build_mutator([], flow_file)
75+
with mock.patch("inspect.getfile", return_value=flow_file):
76+
assert list(m.add_to_package()) == []

0 commit comments

Comments
 (0)