diff --git a/metaflow/cli.py b/metaflow/cli.py index ee779cda2b3..b5694ab1556 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -475,9 +475,18 @@ def start( # be raised. For resume, since we ignore those options, we ignore the error. raise ctx.obj.delayed_config_exception + # Initialize the phase early so it can be used in the mutators + # The phase is determined by which CLI subcommand is being invoked (e.g. "run" → LAUNCH, + # "step" → TASK, "batch" → TRAMPOLINE). + system_context._update(phase=_phase_from_cli_args(getattr(ctx, "saved_args", None))) + # Process config decorators (this is the pre_mutate phase for both flow mutators and # step mutators -- the mutate is called in init_step_decorators) + # Init all values in the flow mutators and then process them + for decorator in ctx.obj.flow._flow_mutators: + decorator.external_init() + new_cls = ctx.obj.flow._process_config_decorators(config_options) if new_cls: ctx.obj.flow = new_cls(use_cli=False) @@ -561,13 +570,7 @@ def start( ctx.obj.monitor.start() _system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor) - # Populate the system context singleton for this process. The phase is - # determined by which CLI subcommand is being invoked (e.g. "run" → LAUNCH, - # "step" → TASK, "batch" → TRAMPOLINE). - saved_args = getattr(ctx, "saved_args", None) - phase = _phase_from_cli_args(saved_args) system_context._update( - phase=phase, flow=ctx.obj.flow, graph=ctx.obj.graph, environment=ctx.obj.environment, diff --git a/metaflow/debug.py b/metaflow/debug.py index 1462df2bb7a..34f34dff0e4 100644 --- a/metaflow/debug.py +++ b/metaflow/debug.py @@ -22,9 +22,11 @@ class Debug(object): def __init__(self): import metaflow.metaflow_config as config + have_debug_options = False for typ in config.DEBUG_OPTIONS: if getattr(config, "DEBUG_%s" % typ.upper()): op = partial(self.log, typ) + have_debug_options = True else: op = self.noop # use debug.$type_exec(args) to log command line for subprocesses @@ -32,6 +34,21 @@ def __init__(self): setattr(self, "%s_exec" % typ, op) # use the debug.$type flag to check if logging is enabled for $type setattr(self, typ, op != self.noop) + # In some environments (I'm looking at you Bazel), the path to the filename is + # super long and not very useful. We will print it once and truncate the rest. + # This is not 100% accurate as each package is in a separate directory so the + # prefix length may be a bit different but it's good enough and removes a lot + # of noise while also keeping the cost low (instead of having to figure out + # the prefix for each package) + self.prefix_len = 0 + if have_debug_options: + # Figure out the name of the current file and strip out everything before + # + self.prefix_len = len(inspect.stack()[0][1][: -len("metaflow/debug.py")]) + self.log( + "options", + "File prefix is: %s" % inspect.stack()[0][1][: self.prefix_len], + ) def log(self, typ, args): if is_stringish(args): @@ -40,7 +57,10 @@ def log(self, typ, args): s = " ".join(args) lineno = inspect.currentframe().f_back.f_lineno filename = inspect.stack()[1][1] - print("debug[%s %s:%s]: %s" % (typ, filename, lineno, s), file=sys.stderr) + print( + "debug[%s %s:%s]: %s" % (typ, filename[self.prefix_len :], lineno, s), + file=sys.stderr, + ) def __getattr__(self, name): # Small piece of code to get pyright and other linters to recognize that there diff --git a/metaflow/decorators.py b/metaflow/decorators.py index 54c41ef4834..0894b9acbeb 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -272,6 +272,41 @@ def get_top_level_options(self): """ return [] + def add_to_package(self): + """ + Called to add custom files needed by this flow decorator. This hook will be + called in the `MetaflowPackage` class where metaflow compiles the code package + tarball. This hook can return one of two things (the first is for backwards + compatibility -- move to the second): + - a generator yielding a tuple of `(file_path, arcname)` to add files to + the code package. `file_path` is the path to the file on the local filesystem + and `arcname` is the path relative to the packaged code. + - a generator yielding a tuple of `(content, arcname, type)` where: + - type is one of + ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT} + - for USER_CONTENT: + - the file will be included relative to the directory containing the + user's flow file. + - content: path to the file to include + - arcname: path relative to the directory containing the user's flow file + - for CODE_CONTENT: + - the file will be included relative to the code directory in the package. + This will be the directory containing `metaflow`. + - content: path to the file to include + - arcname: path relative to the code directory in the package + - for MODULE_CONTENT: + - the module will be added to the code package as a python module. It will + be accessible as usual (import ) + - content: name of the module + - arcname: None (ignored) + - for OTHER_CONTENT: + - the file will be included relative to any other configuration/metadata + files for the flow + - content: path to the file to include + - arcname: path relative to the config directory in the package + """ + return [] + # compare this to parameters.add_custom_parameters def add_decorator_options(cmd): diff --git a/metaflow/package/__init__.py b/metaflow/package/__init__.py index 26f3b14ed4a..94dd7c14b42 100644 --- a/metaflow/package/__init__.py +++ b/metaflow/package/__init__.py @@ -132,6 +132,12 @@ def _module_selector(m) -> bool: self._blob_url = None self._blob = None + # USER_CONTENT files contributed by decorators/mutators via add_to_package. + # Keyed by arcname (path relative to the flow directory) -> absolute file path. + # These are merged with the files discovered by walking the flow directory; + # duplicates (same arcname) are dropped to avoid conflicts. + self._user_content_from_addl: Dict[str, str] = {} + # Update package in the system context -- it will be available # in all hooks going forward including the ones called in the # thread that is used to create the package asynchronously. @@ -611,11 +617,33 @@ def _check_tuple(path_tuple): raise NonUniqueFileNameToFilePathMappingException( file_name, [deco_module_paths[file_name], file_path] ) + elif file_type == ContentType.USER_CONTENT: + # USER_CONTENT files will be merged with the files discovered by + # walking the flow directory. Track them here so we can: + # 1. Include them in the package even if they live outside the + # flow directory (or are excluded by the user_code_filter). + # 2. Avoid duplicating files already picked up by the walker. + real_path = os.path.realpath(path_tuple[0]) + path_tuple = (real_path, file_name, file_type) + existing = self._user_content_from_addl.get(file_name) + if existing is None: + self._user_content_from_addl[file_name] = real_path + elif existing != real_path: + raise NonUniqueFileNameToFilePathMappingException( + file_name, [existing, real_path] + ) + else: + return None # Already recorded for this arcname else: raise ValueError(f"Unknown file type: {file_type}") return path_tuple def _add_tuple(path_tuple): + # USER_CONTENT is intentionally NOT handled here: those files are + # packaged alongside the user's flow code (see _user_code_tuples) + # rather than under the mfcontent namespace, and are tracked in + # self._user_content_from_addl by _check_tuple above. mfcontent + # owns MODULE/CODE/OTHER only. file_path, file_name, file_type = path_tuple if file_type == ContentType.MODULE_CONTENT: # file_path is actually a module @@ -625,6 +653,16 @@ def _add_tuple(path_tuple): elif file_type == ContentType.OTHER_CONTENT: self._mfcontent.add_other_file(file_path, file_name) + # flow decorators + for decos in self._flow._flow_decorators.values(): + for deco in decos: + for path_tuple in deco.add_to_package(): + path_tuple = _check_tuple(path_tuple) + if path_tuple is None: + continue + _add_tuple(path_tuple) + + # step decorators for step in self._flow: for deco in step.decorators: for path_tuple in deco.add_to_package(): @@ -640,16 +678,42 @@ def _add_tuple(path_tuple): continue _add_tuple(path_tuple) + # flow mutators + for mutator in self._flow._flow_mutators: + for path_tuple in mutator.add_to_package(): + path_tuple = _check_tuple(path_tuple) + if path_tuple is None: + continue + _add_tuple(path_tuple) + + # step mutators (deduplicated across steps) + seen_step_mutators = set() + for step in self._flow: + for mutator in step.config_decorators: + if id(mutator) in seen_step_mutators: + continue + seen_step_mutators.add(id(mutator)) + for path_tuple in mutator.add_to_package(): + path_tuple = _check_tuple(path_tuple) + if path_tuple is None: + continue + _add_tuple(path_tuple) + def _user_code_tuples(self): + # Track arcnames yielded by the directory walker so we can detect overlap + # with USER_CONTENT files contributed via add_to_package hooks. + seen_arcnames = set() if R.use_r(): # the R working directory self._user_flow_dir = R.working_dir() for path_tuple in walk( "%s/" % R.working_dir(), file_filter=self._user_code_filter ): + seen_arcnames.add(path_tuple[1]) yield path_tuple # the R package for path_tuple in R.package_paths(): + seen_arcnames.add(path_tuple[1]) yield path_tuple else: # the user's working directory @@ -660,10 +724,17 @@ def _user_code_tuples(self): file_filter=self._user_code_filter, exclude_tl_dirs=self._exclude_tl_dirs, ): - # TODO: This is where we will check if the file is already included - # in the mfcontent portion + seen_arcnames.add(path_tuple[1]) yield path_tuple + # Emit USER_CONTENT files contributed by decorators/mutators that were not + # already picked up by the directory walker (either because they live + # outside the flow directory or were filtered out by the suffix/user filter). + for arcname, file_path in self._user_content_from_addl.items(): + if arcname in seen_arcnames: + continue + yield (file_path, arcname) + def _make(self): backend = self._backend() with backend.create() as archive: diff --git a/metaflow/plugins/package_suffixes_mutator.py b/metaflow/plugins/package_suffixes_mutator.py new file mode 100644 index 00000000000..4e6e733b0df --- /dev/null +++ b/metaflow/plugins/package_suffixes_mutator.py @@ -0,0 +1,69 @@ +""" +Example FlowMutator that extends the set of file suffixes included in the code +package — mirrors the ``--package-suffixes`` CLI option as a decorator. + +By default Metaflow's packaging walks the flow directory and includes files +whose suffixes are in ``DEFAULT_PACKAGE_SUFFIXES`` (``.py,.R,.RDS``) plus +whatever was passed via ``--package-suffixes``. This mutator lets a flow +author declare additional suffixes directly on the flow class:: + + from metaflow import FlowSpec, step + from metaflow.plugins.package_suffixes_mutator import package_suffixes + + @package_suffixes([".yaml", ".json"]) + class MyFlow(FlowSpec): + @step + def start(self): + ... + +The mutator walks the flow directory itself and yields every file with a +matching suffix as ``USER_CONTENT``. The packaging layer deduplicates against +the files that the default walker already picked up, so files that are already +included (e.g. ``.py`` files) are not added twice. +""" + +import inspect +import os +from typing import List, Union + +from metaflow.packaging_sys import ContentType +from metaflow.packaging_sys.utils import walk +from metaflow.user_decorators.user_flow_decorator import FlowMutator + + +class package_suffixes(FlowMutator): + """Include additional file suffixes in the code package. + + Parameters + ---------- + suffixes : list of str or comma-separated str + Additional file suffixes to include (e.g. ``[".yaml", ".json"]`` or + ``".yaml,.json"``). Leading dots are optional; a suffix without a + leading dot is treated as an extension (``yaml`` → ``.yaml``). + """ + + def init(self, suffixes: Union[List[str], str]): + if isinstance(suffixes, str): + suffixes = [s.strip() for s in suffixes.split(",") if s.strip()] + self._suffixes = tuple( + (s if s.startswith(".") else "." + s).lower() for s in suffixes + ) + + def add_to_package(self): + if not self._suffixes: + return + + try: + flow_file = inspect.getfile(self._flow_cls) + except (TypeError, OSError): + return + flow_dir = os.path.dirname(os.path.abspath(flow_file)) + if not flow_dir or not os.path.isdir(flow_dir): + return + + def _filter(fname: str) -> bool: + lname = fname.lower() + return any(lname.endswith(sfx) for sfx in self._suffixes) + + for file_path, arcname in walk(flow_dir + os.sep, file_filter=_filter): + yield (file_path, arcname, ContentType.USER_CONTENT) diff --git a/metaflow/user_decorators/user_flow_decorator.py b/metaflow/user_decorators/user_flow_decorator.py index 46834047a2b..5221cc0f971 100644 --- a/metaflow/user_decorators/user_flow_decorator.py +++ b/metaflow/user_decorators/user_flow_decorator.py @@ -310,3 +310,38 @@ def mutate( A representation of this flow """ return None + + def add_to_package(self): + """ + Called to add custom files needed by this flow mutator. This hook will be + called in the `MetaflowPackage` class where metaflow compiles the code package + tarball. This hook can return one of two things (the first is for backwards + compatibility -- generally use the second when you implement your mutator): + - a generator yielding a tuple of `(file_path, arcname)` to add files to + the code package. `file_path` is the path to the file on the local filesystem + and `arcname` is the path relative to the packaged code. + - a generator yielding a tuple of `(content, arcname, type)` where: + - type is one of + ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT} + - for USER_CONTENT: + - the file will be included relative to the directory containing the + user's flow file. + - content: path to the file to include + - arcname: path relative to the directory containing the user's flow file + - for CODE_CONTENT: + - the file will be included relative to the code directory in the package. + This will be the directory containing `metaflow`. + - content: path to the file to include + - arcname: path relative to the code directory in the package + - for MODULE_CONTENT: + - the module will be added to the code package as a python module. It will + be accessible as usual (import ) + - content: name of the module + - arcname: None (ignored) + - for OTHER_CONTENT: + - the file will be included relative to any other configuration/metadata + files for the flow + - content: path to the file to include + - arcname: path relative to the config directory in the package + """ + return [] diff --git a/metaflow/user_decorators/user_step_decorator.py b/metaflow/user_decorators/user_step_decorator.py index 7985ae4320a..ff11e692491 100644 --- a/metaflow/user_decorators/user_step_decorator.py +++ b/metaflow/user_decorators/user_step_decorator.py @@ -788,3 +788,38 @@ def mutate( A representation of this step """ return None + + def add_to_package(self): + """ + Called to add custom files needed by this step mutator. This hook will be + called in the `MetaflowPackage` class where metaflow compiles the code package + tarball. This hook can return one of two things (the first is for backwards + compatibility -- move to the second): + - a generator yielding a tuple of `(file_path, arcname)` to add files to + the code package. `file_path` is the path to the file on the local filesystem + and `arcname` is the path relative to the packaged code. + - a generator yielding a tuple of `(content, arcname, type)` where: + - type is one of + ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT} + - for USER_CONTENT: + - the file will be included relative to the directory containing the + user's flow file. + - content: path to the file to include + - arcname: path relative to the directory containing the user's flow file + - for CODE_CONTENT: + - the file will be included relative to the code directory in the package. + This will be the directory containing `metaflow`. + - content: path to the file to include + - arcname: path relative to the code directory in the package + - for MODULE_CONTENT: + - the module will be added to the code package as a python module. It will + be accessible as usual (import ) + - content: name of the module + - arcname: None (ignored) + - for OTHER_CONTENT: + - the file will be included relative to any other configuration/metadata + files for the flow + - content: path to the file to include + - arcname: path relative to the config directory in the package + """ + return [] diff --git a/test/unit/test_add_to_package.py b/test/unit/test_add_to_package.py new file mode 100644 index 00000000000..e6106bc1b32 --- /dev/null +++ b/test/unit/test_add_to_package.py @@ -0,0 +1,445 @@ +"""Tests for _add_addl_files in MetaflowPackage. + +Exercises the add_to_package hooks on FlowDecorators, FlowMutators, +StepDecorators, and StepMutators, as well as deduplication and error handling. +""" + +import os +import sys +import tempfile +from types import ModuleType +from unittest import mock +from unittest.mock import MagicMock, call + +import pytest + +from metaflow.package import ( + MetaflowPackage, + NonUniqueFileNameToFilePathMappingException, +) +from metaflow.packaging_sys import ContentType + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_step(decorators=None, config_decorators=None): + step = MagicMock() + step.decorators = decorators or [] + step.config_decorators = config_decorators or [] + return step + + +def _make_flow(steps, flow_decorators=None, flow_mutators=None): + flow = MagicMock() + # The flow may be iterated multiple times (step decos + step mutators), + # so return a fresh iterator each time. + flow.__iter__ = lambda self: iter(steps) + flow._flow_decorators = flow_decorators or {} + flow._flow_mutators = flow_mutators or [] + return flow + + +def _make_environment(tuples=None): + env = MagicMock() + env.add_to_package.return_value = tuples or [] + return env + + +def _make_mfcontent(): + return MagicMock() + + +def _make_deco(tuples): + """Create a mock decorator-like object with an add_to_package method.""" + deco = MagicMock() + deco.add_to_package.return_value = tuples + return deco + + +def _build_pkg(flow, environment, mfcontent): + """Build a bare MetaflowPackage instance with minimal state.""" + pkg = object.__new__(MetaflowPackage) + pkg._flow = flow + pkg._environment = environment + pkg._mfcontent = mfcontent + pkg._user_content_from_addl = {} + return pkg + + +def _call_add_addl_files(flow, environment, mfcontent): + """Call _add_addl_files on a bare MetaflowPackage instance.""" + pkg = _build_pkg(flow, environment, mfcontent) + pkg._add_addl_files() + return mfcontent + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_flow_decorator_add_to_package(): + """Flow decorator's add_to_package files are added.""" + with tempfile.NamedTemporaryFile(suffix=".py") as f: + deco = _make_deco([(f.name, "flow_deco_file.py", ContentType.CODE_CONTENT)]) + flow = _make_flow( + steps=[_make_step()], + flow_decorators={"my_deco": [deco]}, + ) + mc = _call_add_addl_files(flow, _make_environment(), _make_mfcontent()) + mc.add_code_file.assert_called_once_with( + os.path.realpath(f.name), "flow_deco_file.py" + ) + + +def test_flow_mutator_add_to_package_module(): + """Flow mutator's add_to_package with MODULE_CONTENT calls add_module.""" + import json + + mutator = _make_deco([(json, None, ContentType.MODULE_CONTENT)]) + flow = _make_flow(steps=[_make_step()], flow_mutators=[mutator]) + mc = _call_add_addl_files(flow, _make_environment(), _make_mfcontent()) + mc.add_module.assert_called_once_with(json) + + +def test_step_mutator_deduplicated_across_steps(): + """Same StepMutator instance on two steps: add_to_package called once.""" + with tempfile.NamedTemporaryFile(suffix=".py") as f: + mutator = _make_deco([(f.name, "shared.py", ContentType.CODE_CONTENT)]) + step1 = _make_step(config_decorators=[mutator]) + step2 = _make_step(config_decorators=[mutator]) + flow = _make_flow(steps=[step1, step2]) + mc = _call_add_addl_files(flow, _make_environment(), _make_mfcontent()) + assert mutator.add_to_package.call_count == 1 + mc.add_code_file.assert_called_once() + + +def test_step_mutator_distinct_instances(): + """Two different StepMutator instances: both called.""" + with tempfile.NamedTemporaryFile(suffix=".py") as f1, tempfile.NamedTemporaryFile( + suffix=".py" + ) as f2: + m1 = _make_deco([(f1.name, "file1.py", ContentType.CODE_CONTENT)]) + m2 = _make_deco([(f2.name, "file2.py", ContentType.CODE_CONTENT)]) + step = _make_step(config_decorators=[m1, m2]) + flow = _make_flow(steps=[step]) + mc = _call_add_addl_files(flow, _make_environment(), _make_mfcontent()) + assert m1.add_to_package.call_count == 1 + assert m2.add_to_package.call_count == 1 + assert mc.add_code_file.call_count == 2 + + +def test_legacy_two_tuple_defaults_to_code_content(): + """A 2-tuple (file_path, arcname) is treated as CODE_CONTENT.""" + with tempfile.NamedTemporaryFile(suffix=".py") as f: + deco = _make_deco([(f.name, "legacy.py")]) + step = _make_step(decorators=[deco]) + flow = _make_flow(steps=[step]) + mc = _call_add_addl_files(flow, _make_environment(), _make_mfcontent()) + mc.add_code_file.assert_called_once_with(os.path.realpath(f.name), "legacy.py") + + +def test_non_unique_filename_raises(): + """Different file paths for the same arcname raises an exception.""" + with tempfile.NamedTemporaryFile(suffix=".py") as f1, tempfile.NamedTemporaryFile( + suffix=".py" + ) as f2: + d1 = _make_deco([(f1.name, "same_name.py", ContentType.CODE_CONTENT)]) + d2 = _make_deco([(f2.name, "same_name.py", ContentType.CODE_CONTENT)]) + step = _make_step(decorators=[d1, d2]) + flow = _make_flow(steps=[step]) + with pytest.raises(NonUniqueFileNameToFilePathMappingException): + _call_add_addl_files(flow, _make_environment(), _make_mfcontent()) + + +def test_module_content_deduplicated(): + """Same module returned by two decorators: add_module called once.""" + import json + + d1 = _make_deco([(json, None, ContentType.MODULE_CONTENT)]) + d2 = _make_deco([(json, None, ContentType.MODULE_CONTENT)]) + step = _make_step(decorators=[d1, d2]) + flow = _make_flow(steps=[step]) + mc = _call_add_addl_files(flow, _make_environment(), _make_mfcontent()) + mc.add_module.assert_called_once_with(json) + + +def test_other_content_type(): + """OTHER_CONTENT files are passed to add_other_file.""" + with tempfile.NamedTemporaryFile(suffix=".yaml") as f: + deco = _make_deco([(f.name, "config.yaml", ContentType.OTHER_CONTENT)]) + step = _make_step(decorators=[deco]) + flow = _make_flow(steps=[step]) + mc = _call_add_addl_files(flow, _make_environment(), _make_mfcontent()) + mc.add_other_file.assert_called_once_with( + os.path.realpath(f.name), "config.yaml" + ) + + +def test_ordering_flow_decorators_before_step_decorators(): + """Flow decorators are processed before step decorators. + + We verify the flow decorator's add_to_package is called before the step + decorator's by checking call order on a shared mock recorder. + """ + call_order = [] + + def make_recording_deco(label, tuples): + deco = MagicMock() + + def record(): + call_order.append(label) + return tuples + + deco.add_to_package = record + return deco + + with tempfile.NamedTemporaryFile(suffix=".py") as f1, tempfile.NamedTemporaryFile( + suffix=".py" + ) as f2: + flow_deco = make_recording_deco( + "flow_deco", [(f1.name, "flow_file.py", ContentType.CODE_CONTENT)] + ) + step_deco = make_recording_deco( + "step_deco", [(f2.name, "step_file.py", ContentType.CODE_CONTENT)] + ) + step = _make_step(decorators=[step_deco]) + flow = _make_flow( + steps=[step], + flow_decorators={"fd": [flow_deco]}, + ) + _call_add_addl_files(flow, _make_environment(), _make_mfcontent()) + assert call_order == ["flow_deco", "step_deco"] + + +def test_user_content_recorded(): + """USER_CONTENT tuples are recorded in _user_content_from_addl (not sent + to _mfcontent which handles code/other files only).""" + with tempfile.NamedTemporaryFile(suffix=".py") as f: + deco = _make_deco([(f.name, "extra.py", ContentType.USER_CONTENT)]) + flow = _make_flow( + steps=[_make_step()], + flow_decorators={"my_deco": [deco]}, + ) + mfcontent = _make_mfcontent() + pkg = _build_pkg(flow, _make_environment(), mfcontent) + pkg._add_addl_files() + + # Not routed to _mfcontent — USER_CONTENT is packaged alongside user code. + mfcontent.add_code_file.assert_not_called() + mfcontent.add_other_file.assert_not_called() + mfcontent.add_module.assert_not_called() + + assert pkg._user_content_from_addl == {"extra.py": os.path.realpath(f.name)} + + +def test_user_content_duplicate_same_path_dedup(): + """Same USER_CONTENT arcname with the same path from two decorators: dedup.""" + with tempfile.NamedTemporaryFile(suffix=".py") as f: + d1 = _make_deco([(f.name, "shared.py", ContentType.USER_CONTENT)]) + d2 = _make_deco([(f.name, "shared.py", ContentType.USER_CONTENT)]) + flow = _make_flow( + steps=[_make_step()], + flow_decorators={"fd": [d1, d2]}, + ) + pkg = _build_pkg(flow, _make_environment(), _make_mfcontent()) + pkg._add_addl_files() + assert pkg._user_content_from_addl == {"shared.py": os.path.realpath(f.name)} + + +def test_user_content_duplicate_different_path_raises(): + """Same USER_CONTENT arcname with different paths raises the usual exception.""" + with tempfile.NamedTemporaryFile(suffix=".py") as f1, tempfile.NamedTemporaryFile( + suffix=".py" + ) as f2: + d1 = _make_deco([(f1.name, "shared.py", ContentType.USER_CONTENT)]) + d2 = _make_deco([(f2.name, "shared.py", ContentType.USER_CONTENT)]) + flow = _make_flow( + steps=[_make_step()], + flow_decorators={"fd": [d1, d2]}, + ) + with pytest.raises(NonUniqueFileNameToFilePathMappingException): + pkg = _build_pkg(flow, _make_environment(), _make_mfcontent()) + pkg._add_addl_files() + + +# --------------------------------------------------------------------------- +# _user_code_tuples — merge of USER_CONTENT with the flow-dir walker (DEF-010) +# --------------------------------------------------------------------------- + + +def _build_pkg_for_user_tuples(tmpdir, user_content_from_addl=None): + """Build a minimal MetaflowPackage with just enough state for + _user_code_tuples(): a flow dir, filter, exclude list, and the + dict of USER_CONTENT files produced by add_to_package.""" + pkg = object.__new__(MetaflowPackage) + pkg._user_code_filter = lambda _name: True + pkg._exclude_tl_dirs = [] + pkg._user_content_from_addl = user_content_from_addl or {} + pkg._user_flow_dir = None + return pkg + + +def _fake_flow_dir(tmpdir, *relpaths): + """Create `flow.py` plus the given relative paths in tmpdir. Returns the + absolute flow.py path.""" + flow_file = os.path.join(tmpdir, "flow.py") + with open(flow_file, "w") as f: + f.write("# flow\n") + for rel in relpaths: + p = os.path.join(tmpdir, rel) + os.makedirs(os.path.dirname(p), exist_ok=True) + with open(p, "w") as f: + f.write("x\n") + return flow_file + + +def test_user_code_tuples_emits_addl_user_content_not_in_walker(): + """A USER_CONTENT file outside the walker's output gets emitted.""" + with tempfile.TemporaryDirectory() as flow_dir, tempfile.NamedTemporaryFile( + suffix=".cfg", delete=False + ) as external: + try: + _fake_flow_dir(flow_dir, "code.py") + pkg = _build_pkg_for_user_tuples( + flow_dir, + user_content_from_addl={"extra.cfg": external.name}, + ) + with mock.patch.object( + sys, "argv", [os.path.join(flow_dir, "flow.py")] + ), mock.patch("metaflow.R.use_r", return_value=False): + tuples = list(pkg._user_code_tuples()) + by_arc = {arc: path for path, arc in tuples} + # walker picked up code.py and flow.py from the flow dir + assert "code.py" in by_arc + assert "flow.py" in by_arc + # external USER_CONTENT was emitted as well + assert by_arc["extra.cfg"] == external.name + finally: + os.unlink(external.name) + + +def test_user_code_tuples_skips_addl_when_walker_already_has_it(): + """USER_CONTENT with same arcname as a walker-yielded file is dropped.""" + with tempfile.TemporaryDirectory() as flow_dir: + _fake_flow_dir(flow_dir, "code.py") + walker_path = os.path.join(flow_dir, "code.py") + # A *different* absolute path but same arcname. The walker wins. + with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as shadow: + try: + pkg = _build_pkg_for_user_tuples( + flow_dir, + user_content_from_addl={"code.py": shadow.name}, + ) + with mock.patch.object( + sys, "argv", [os.path.join(flow_dir, "flow.py")] + ), mock.patch("metaflow.R.use_r", return_value=False): + tuples = list(pkg._user_code_tuples()) + # code.py appears exactly once, with the walker's path + code_py = [t for t in tuples if t[1] == "code.py"] + assert len(code_py) == 1 + assert code_py[0][0] == walker_path + # shadow is never emitted + assert not any(t[0] == shadow.name for t in tuples) + finally: + os.unlink(shadow.name) + + +def test_user_code_tuples_respects_user_code_filter(): + """USER_CONTENT bypasses the suffix/user filter applied to the walker. + + The walker's filter excludes .yaml, but a USER_CONTENT tuple with a .yaml + arcname must still be emitted — this is a primary reason USER_CONTENT + exists. + """ + with tempfile.TemporaryDirectory() as flow_dir: + _fake_flow_dir(flow_dir, "conf.yaml") + yaml_path = os.path.join(flow_dir, "conf.yaml") + pkg = _build_pkg_for_user_tuples( + flow_dir, + user_content_from_addl={"conf.yaml": yaml_path}, + ) + # Restrict walker to .py only — .yaml should not come from the walker. + pkg._user_code_filter = lambda fname: fname.lower().endswith(".py") + with mock.patch.object( + sys, "argv", [os.path.join(flow_dir, "flow.py")] + ), mock.patch("metaflow.R.use_r", return_value=False): + tuples = list(pkg._user_code_tuples()) + by_arc = {arc: path for path, arc in tuples} + assert "conf.yaml" in by_arc + assert by_arc["conf.yaml"] == yaml_path + + +# --------------------------------------------------------------------------- +# Integration: _add_addl_files + _user_code_tuples together (DEF-011) +# --------------------------------------------------------------------------- + + +def test_integration_add_addl_then_user_code_tuples_dedupes_by_arcname(): + """End-to-end: decorator emits USER_CONTENT for a file already in the + flow dir; the final user tuples contain it only once with the walker's + path. + """ + with tempfile.TemporaryDirectory() as flow_dir: + _fake_flow_dir(flow_dir, "code.py") + walker_path = os.path.join(flow_dir, "code.py") + + deco = _make_deco([(walker_path, "code.py", ContentType.USER_CONTENT)]) + flow = _make_flow( + steps=[_make_step()], + flow_decorators={"fd": [deco]}, + ) + pkg = _build_pkg(flow, _make_environment(), _make_mfcontent()) + pkg._user_code_filter = lambda _: True + pkg._exclude_tl_dirs = [] + pkg._user_flow_dir = None + + # Phase 1: populate _user_content_from_addl from add_to_package hooks. + pkg._add_addl_files() + assert pkg._user_content_from_addl == {"code.py": os.path.realpath(walker_path)} + + # Phase 2: walk the flow dir and merge addl USER_CONTENT. + with mock.patch.object( + sys, "argv", [os.path.join(flow_dir, "flow.py")] + ), mock.patch("metaflow.R.use_r", return_value=False): + tuples = list(pkg._user_code_tuples()) + + # code.py is present exactly once, walker's copy wins (by arcname dedup). + code_py = [t for t in tuples if t[1] == "code.py"] + assert len(code_py) == 1 + + +def test_integration_add_addl_contributes_file_outside_flow_dir(): + """End-to-end: decorator emits USER_CONTENT for a file that is NOT in the + flow dir; it ends up in the user tuples via the merge path. + """ + with tempfile.TemporaryDirectory() as flow_dir, tempfile.NamedTemporaryFile( + suffix=".cfg", delete=False + ) as external: + try: + _fake_flow_dir(flow_dir) + + deco = _make_deco( + [(external.name, "external.cfg", ContentType.USER_CONTENT)] + ) + flow = _make_flow( + steps=[_make_step()], + flow_decorators={"fd": [deco]}, + ) + pkg = _build_pkg(flow, _make_environment(), _make_mfcontent()) + pkg._user_code_filter = lambda _: True + pkg._exclude_tl_dirs = [] + pkg._user_flow_dir = None + + pkg._add_addl_files() + with mock.patch.object( + sys, "argv", [os.path.join(flow_dir, "flow.py")] + ), mock.patch("metaflow.R.use_r", return_value=False): + tuples = list(pkg._user_code_tuples()) + by_arc = {arc: path for path, arc in tuples} + assert by_arc["external.cfg"] == os.path.realpath(external.name) + finally: + os.unlink(external.name) diff --git a/test/unit/test_package_suffixes_mutator.py b/test/unit/test_package_suffixes_mutator.py new file mode 100644 index 00000000000..cd0085df619 --- /dev/null +++ b/test/unit/test_package_suffixes_mutator.py @@ -0,0 +1,76 @@ +"""Tests for the ``package_suffixes`` example FlowMutator.""" + +import os +import tempfile +from unittest import mock + +from metaflow.packaging_sys import ContentType +from metaflow.plugins.package_suffixes_mutator import package_suffixes + + +def _make_flow(tmpdir): + """Write a minimal flow.py and a few sibling files; return the flow file.""" + flow_file = os.path.join(tmpdir, "flow.py") + with open(flow_file, "w") as f: + f.write("# dummy flow\n") + with open(os.path.join(tmpdir, "config.yaml"), "w") as f: + f.write("a: 1\n") + with open(os.path.join(tmpdir, "data.json"), "w") as f: + f.write("{}\n") + os.makedirs(os.path.join(tmpdir, "sub")) + with open(os.path.join(tmpdir, "sub", "nested.yaml"), "w") as f: + f.write("b: 2\n") + with open(os.path.join(tmpdir, "ignored.txt"), "w") as f: + f.write("ignored\n") + return flow_file + + +def _build_mutator(suffixes, flow_file): + m = package_suffixes.__new__(package_suffixes) + # We only need _flow_cls for inspect.getfile(); patch that directly instead + # of constructing a real class. + m._flow_cls = mock.Mock() + m.init(suffixes) + return m + + +def test_init_list_form(): + m = package_suffixes.__new__(package_suffixes) + m.init([".yaml", "json"]) + assert m._suffixes == (".yaml", ".json") + + +def test_init_string_form(): + m = package_suffixes.__new__(package_suffixes) + m.init(".yaml,json, .txt") + assert m._suffixes == (".yaml", ".json", ".txt") + + +def test_add_to_package_yields_matching_files(): + with tempfile.TemporaryDirectory() as tmp: + flow_file = _make_flow(tmp) + m = _build_mutator([".yaml", ".json"], flow_file) + with mock.patch("inspect.getfile", return_value=flow_file): + results = list(m.add_to_package()) + + # All tuples are USER_CONTENT + assert all(t[2] == ContentType.USER_CONTENT for t in results) + + arcnames = {t[1] for t in results} + # walk() yields arcnames relative to the flow directory (no flow dir + # basename prefix), matching the convention used by _user_code_tuples. + assert "config.yaml" in arcnames + assert "data.json" in arcnames + assert os.path.join("sub", "nested.yaml") in arcnames + # Non-matching files are not included. + assert "ignored.txt" not in arcnames + # flow.py is a .py file — not part of the configured extra suffixes. + assert "flow.py" not in arcnames + + +def test_add_to_package_empty_suffixes_yields_nothing(): + with tempfile.TemporaryDirectory() as tmp: + flow_file = _make_flow(tmp) + m = _build_mutator([], flow_file) + with mock.patch("inspect.getfile", return_value=flow_file): + assert list(m.add_to_package()) == []