Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,18 @@ def start(
# be raised. For resume, since we ignore those options, we ignore the error.
raise ctx.obj.delayed_config_exception

# Initialize the phase early so it can be used in the mutators
# The phase is determined by which CLI subcommand is being invoked (e.g. "run" → LAUNCH,
# "step" → TASK, "batch" → TRAMPOLINE).
system_context._update(phase=_phase_from_cli_args(getattr(ctx, "saved_args", None)))
Comment on lines +478 to +481
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would this change when you moved it up?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can now use the phase in the external_init of mutators (for example)


# Process config decorators (this is the pre_mutate phase for both flow mutators and
# step mutators -- the mutate is called in init_step_decorators)

# Init all values in the flow mutators and then process them
for decorator in ctx.obj.flow._flow_mutators:
decorator.external_init()

new_cls = ctx.obj.flow._process_config_decorators(config_options)
if new_cls:
ctx.obj.flow = new_cls(use_cli=False)
Expand Down Expand Up @@ -561,13 +570,7 @@ def start(
ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

# Populate the system context singleton for this process. The phase is
# determined by which CLI subcommand is being invoked (e.g. "run" → LAUNCH,
# "step" → TASK, "batch" → TRAMPOLINE).
saved_args = getattr(ctx, "saved_args", None)
phase = _phase_from_cli_args(saved_args)
system_context._update(
phase=phase,
flow=ctx.obj.flow,
graph=ctx.obj.graph,
environment=ctx.obj.environment,
Expand Down
22 changes: 21 additions & 1 deletion metaflow/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,33 @@ class Debug(object):
def __init__(self):
import metaflow.metaflow_config as config

have_debug_options = False
for typ in config.DEBUG_OPTIONS:
if getattr(config, "DEBUG_%s" % typ.upper()):
op = partial(self.log, typ)
have_debug_options = True
else:
op = self.noop
# use debug.$type_exec(args) to log command line for subprocesses
# of type $type
setattr(self, "%s_exec" % typ, op)
# use the debug.$type flag to check if logging is enabled for $type
setattr(self, typ, op != self.noop)
# In some environments (I'm looking at you Bazel), the path to the filename is
# super long and not very useful. We will print it once and truncate the rest.
# This is not 100% accurate as each package is in a separate directory so the
# prefix length may be a bit different but it's good enough and removes a lot
# of noise while also keeping the cost low (instead of having to figure out
# the prefix for each package)
self.prefix_len = 0
if have_debug_options:
# Figure out the name of the current file and strip out everything before
#
self.prefix_len = len(inspect.stack()[0][1][: -len("metaflow/debug.py")])
self.log(
"options",
"File prefix is: %s" % inspect.stack()[0][1][: self.prefix_len],
)

def log(self, typ, args):
if is_stringish(args):
Expand All @@ -40,7 +57,10 @@ def log(self, typ, args):
s = " ".join(args)
lineno = inspect.currentframe().f_back.f_lineno
filename = inspect.stack()[1][1]
print("debug[%s %s:%s]: %s" % (typ, filename, lineno, s), file=sys.stderr)
print(
"debug[%s %s:%s]: %s" % (typ, filename[self.prefix_len :], lineno, s),
file=sys.stderr,
)

def __getattr__(self, name):
# Small piece of code to get pyright and other linters to recognize that there
Expand Down
35 changes: 35 additions & 0 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,41 @@ def get_top_level_options(self):
"""
return []

def add_to_package(self):
"""
Called to add custom files needed by this flow decorator. This hook will be
called in the `MetaflowPackage` class where metaflow compiles the code package
tarball. This hook can return one of two things (the first is for backwards
compatibility -- move to the second):
- a generator yielding a tuple of `(file_path, arcname)` to add files to
the code package. `file_path` is the path to the file on the local filesystem
and `arcname` is the path relative to the packaged code.
- a generator yielding a tuple of `(content, arcname, type)` where:
- type is one of
ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT}
- for USER_CONTENT:
- the file will be included relative to the directory containing the
user's flow file.
- content: path to the file to include
Comment thread
greptile-apps[bot] marked this conversation as resolved.
- arcname: path relative to the directory containing the user's flow file
- for CODE_CONTENT:
- the file will be included relative to the code directory in the package.
This will be the directory containing `metaflow`.
- content: path to the file to include
- arcname: path relative to the code directory in the package
- for MODULE_CONTENT:
- the module will be added to the code package as a python module. It will
be accessible as usual (import <module_name>)
- content: name of the module
- arcname: None (ignored)
- for OTHER_CONTENT:
- the file will be included relative to any other configuration/metadata
files for the flow
- content: path to the file to include
- arcname: path relative to the config directory in the package
"""
return []


# compare this to parameters.add_custom_parameters
def add_decorator_options(cmd):
Expand Down
75 changes: 73 additions & 2 deletions metaflow/package/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ def _module_selector(m) -> bool:
self._blob_url = None
self._blob = None

# USER_CONTENT files contributed by decorators/mutators via add_to_package.
# Keyed by arcname (path relative to the flow directory) -> absolute file path.
# These are merged with the files discovered by walking the flow directory;
# duplicates (same arcname) are dropped to avoid conflicts.
self._user_content_from_addl: Dict[str, str] = {}

# Update package in the system context -- it will be available
# in all hooks going forward including the ones called in the
# thread that is used to create the package asynchronously.
Expand Down Expand Up @@ -611,11 +617,33 @@ def _check_tuple(path_tuple):
raise NonUniqueFileNameToFilePathMappingException(
file_name, [deco_module_paths[file_name], file_path]
)
elif file_type == ContentType.USER_CONTENT:
# USER_CONTENT files will be merged with the files discovered by
# walking the flow directory. Track them here so we can:
# 1. Include them in the package even if they live outside the
# flow directory (or are excluded by the user_code_filter).
# 2. Avoid duplicating files already picked up by the walker.
real_path = os.path.realpath(path_tuple[0])
path_tuple = (real_path, file_name, file_type)
existing = self._user_content_from_addl.get(file_name)
if existing is None:
self._user_content_from_addl[file_name] = real_path
elif existing != real_path:
raise NonUniqueFileNameToFilePathMappingException(
file_name, [existing, real_path]
)
else:
return None # Already recorded for this arcname
else:
raise ValueError(f"Unknown file type: {file_type}")
return path_tuple

def _add_tuple(path_tuple):
# USER_CONTENT is intentionally NOT handled here: those files are
# packaged alongside the user's flow code (see _user_code_tuples)
# rather than under the mfcontent namespace, and are tracked in
# self._user_content_from_addl by _check_tuple above. mfcontent
# owns MODULE/CODE/OTHER only.
file_path, file_name, file_type = path_tuple
if file_type == ContentType.MODULE_CONTENT:
# file_path is actually a module
Expand All @@ -625,6 +653,16 @@ def _add_tuple(path_tuple):
elif file_type == ContentType.OTHER_CONTENT:
self._mfcontent.add_other_file(file_path, file_name)

# flow decorators
for decos in self._flow._flow_decorators.values():
for deco in decos:
for path_tuple in deco.add_to_package():
path_tuple = _check_tuple(path_tuple)
if path_tuple is None:
continue
_add_tuple(path_tuple)

# step decorators
for step in self._flow:
for deco in step.decorators:
for path_tuple in deco.add_to_package():
Expand All @@ -640,16 +678,42 @@ def _add_tuple(path_tuple):
continue
_add_tuple(path_tuple)

# flow mutators
for mutator in self._flow._flow_mutators:
for path_tuple in mutator.add_to_package():
path_tuple = _check_tuple(path_tuple)
if path_tuple is None:
continue
_add_tuple(path_tuple)

# step mutators (deduplicated across steps)
seen_step_mutators = set()
for step in self._flow:
for mutator in step.config_decorators:
if id(mutator) in seen_step_mutators:
continue
seen_step_mutators.add(id(mutator))
for path_tuple in mutator.add_to_package():
path_tuple = _check_tuple(path_tuple)
if path_tuple is None:
continue
_add_tuple(path_tuple)

def _user_code_tuples(self):
# Track arcnames yielded by the directory walker so we can detect overlap
# with USER_CONTENT files contributed via add_to_package hooks.
seen_arcnames = set()
if R.use_r():
# the R working directory
self._user_flow_dir = R.working_dir()
for path_tuple in walk(
"%s/" % R.working_dir(), file_filter=self._user_code_filter
):
seen_arcnames.add(path_tuple[1])
yield path_tuple
# the R package
for path_tuple in R.package_paths():
seen_arcnames.add(path_tuple[1])
yield path_tuple
else:
# the user's working directory
Expand All @@ -660,10 +724,17 @@ def _user_code_tuples(self):
file_filter=self._user_code_filter,
exclude_tl_dirs=self._exclude_tl_dirs,
):
# TODO: This is where we will check if the file is already included
# in the mfcontent portion
seen_arcnames.add(path_tuple[1])
yield path_tuple

# Emit USER_CONTENT files contributed by decorators/mutators that were not
# already picked up by the directory walker (either because they live
# outside the flow directory or were filtered out by the suffix/user filter).
for arcname, file_path in self._user_content_from_addl.items():
if arcname in seen_arcnames:
continue
yield (file_path, arcname)

def _make(self):
backend = self._backend()
with backend.create() as archive:
Expand Down
69 changes: 69 additions & 0 deletions metaflow/plugins/package_suffixes_mutator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Example FlowMutator that extends the set of file suffixes included in the code
package — mirrors the ``--package-suffixes`` CLI option as a decorator.

By default Metaflow's packaging walks the flow directory and includes files
whose suffixes are in ``DEFAULT_PACKAGE_SUFFIXES`` (``.py,.R,.RDS``) plus
whatever was passed via ``--package-suffixes``. This mutator lets a flow
author declare additional suffixes directly on the flow class::

from metaflow import FlowSpec, step
from metaflow.plugins.package_suffixes_mutator import package_suffixes

@package_suffixes([".yaml", ".json"])
class MyFlow(FlowSpec):
@step
def start(self):
...

The mutator walks the flow directory itself and yields every file with a
matching suffix as ``USER_CONTENT``. The packaging layer deduplicates against
the files that the default walker already picked up, so files that are already
included (e.g. ``.py`` files) are not added twice.
"""

import inspect
import os
from typing import List, Union

from metaflow.packaging_sys import ContentType
from metaflow.packaging_sys.utils import walk
from metaflow.user_decorators.user_flow_decorator import FlowMutator


class package_suffixes(FlowMutator):
"""Include additional file suffixes in the code package.

Parameters
----------
suffixes : list of str or comma-separated str
Additional file suffixes to include (e.g. ``[".yaml", ".json"]`` or
``".yaml,.json"``). Leading dots are optional; a suffix without a
leading dot is treated as an extension (``yaml`` → ``.yaml``).
"""

def init(self, suffixes: Union[List[str], str]):
if isinstance(suffixes, str):
suffixes = [s.strip() for s in suffixes.split(",") if s.strip()]
self._suffixes = tuple(
(s if s.startswith(".") else "." + s).lower() for s in suffixes
)

def add_to_package(self):
if not self._suffixes:
return

try:
flow_file = inspect.getfile(self._flow_cls)
except (TypeError, OSError):
return
flow_dir = os.path.dirname(os.path.abspath(flow_file))
if not flow_dir or not os.path.isdir(flow_dir):
return

def _filter(fname: str) -> bool:
lname = fname.lower()
return any(lname.endswith(sfx) for sfx in self._suffixes)

for file_path, arcname in walk(flow_dir + os.sep, file_filter=_filter):
yield (file_path, arcname, ContentType.USER_CONTENT)
35 changes: 35 additions & 0 deletions metaflow/user_decorators/user_flow_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,38 @@ def mutate(
A representation of this flow
"""
return None

def add_to_package(self):
"""
Called to add custom files needed by this flow mutator. This hook will be
called in the `MetaflowPackage` class where metaflow compiles the code package
tarball. This hook can return one of two things (the first is for backwards
compatibility -- generally use the second when you implement your mutator):
- a generator yielding a tuple of `(file_path, arcname)` to add files to
the code package. `file_path` is the path to the file on the local filesystem
and `arcname` is the path relative to the packaged code.
- a generator yielding a tuple of `(content, arcname, type)` where:
- type is one of
ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT}
- for USER_CONTENT:
- the file will be included relative to the directory containing the
user's flow file.
- content: path to the file to include
- arcname: path relative to the directory containing the user's flow file
- for CODE_CONTENT:
- the file will be included relative to the code directory in the package.
This will be the directory containing `metaflow`.
- content: path to the file to include
- arcname: path relative to the code directory in the package
- for MODULE_CONTENT:
- the module will be added to the code package as a python module. It will
be accessible as usual (import <module_name>)
- content: name of the module
- arcname: None (ignored)
- for OTHER_CONTENT:
- the file will be included relative to any other configuration/metadata
files for the flow
- content: path to the file to include
- arcname: path relative to the config directory in the package
"""
return []
35 changes: 35 additions & 0 deletions metaflow/user_decorators/user_step_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,3 +788,38 @@ def mutate(
A representation of this step
"""
return None

def add_to_package(self):
"""
Called to add custom files needed by this step mutator. This hook will be
called in the `MetaflowPackage` class where metaflow compiles the code package
tarball. This hook can return one of two things (the first is for backwards
compatibility -- move to the second):
- a generator yielding a tuple of `(file_path, arcname)` to add files to
the code package. `file_path` is the path to the file on the local filesystem
and `arcname` is the path relative to the packaged code.
- a generator yielding a tuple of `(content, arcname, type)` where:
- type is one of
ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT}
- for USER_CONTENT:
- the file will be included relative to the directory containing the
user's flow file.
- content: path to the file to include
- arcname: path relative to the directory containing the user's flow file
- for CODE_CONTENT:
- the file will be included relative to the code directory in the package.
This will be the directory containing `metaflow`.
- content: path to the file to include
- arcname: path relative to the code directory in the package
- for MODULE_CONTENT:
- the module will be added to the code package as a python module. It will
be accessible as usual (import <module_name>)
- content: name of the module
- arcname: None (ignored)
- for OTHER_CONTENT:
- the file will be included relative to any other configuration/metadata
files for the flow
- content: path to the file to include
- arcname: path relative to the config directory in the package
"""
return []
Loading
Loading