From 15e6e31fa17b3cd4b1a1a3b23e9be176cd7fd29b Mon Sep 17 00:00:00 2001 From: Romain Cledat Date: Wed, 5 Feb 2025 01:40:01 -0800 Subject: [PATCH] WIP: Add the ability to package more libraries --- metaflow/client/core.py | 8 +- metaflow/extension_support/__init__.py | 2 +- metaflow/extension_support/_empty_file.py | 4 +- metaflow/info_file.py | 25 - metaflow/metaflow_config.py | 10 +- metaflow/metaflow_version.py | 2 +- metaflow/package.py | 203 ------- metaflow/package/__init__.py | 157 ++++++ metaflow/package/mfenv.py | 512 ++++++++++++++++++ metaflow/plugins/package_cli.py | 14 +- metaflow/plugins/pypi/conda_decorator.py | 15 +- metaflow/special_files.py | 41 ++ metaflow/user_configs/config_options.py | 5 +- metaflow/user_configs/config_parameters.py | 4 - metaflow/util.py | 5 +- test/test_config/basic_config_silly.txt | 1 + test/test_config/card_config.py | 21 + test/test_config/config2.json | 4 + test/test_config/config_card.py | 30 + test/test_config/config_parser.py | 103 ++++ .../config_parser_requirements.txt | 2 + test/test_config/config_simple.json | 1 + test/test_config/config_simple.py | 98 ++++ test/test_config/config_simple2.py | 60 ++ test/test_config/helloconfig.py | 138 +++++ test/test_config/mutable_flow.py | 250 +++++++++ test/test_config/no_default.py | 18 + test/test_config/photo_config.json | 8 + test/test_config/runner_flow.py | 17 + test/test_config/test.py | 122 +++++ 30 files changed, 1627 insertions(+), 253 deletions(-) delete mode 100644 metaflow/info_file.py delete mode 100644 metaflow/package.py create mode 100644 metaflow/package/__init__.py create mode 100644 metaflow/package/mfenv.py create mode 100644 metaflow/special_files.py create mode 100644 test/test_config/basic_config_silly.txt create mode 100644 test/test_config/card_config.py create mode 100644 test/test_config/config2.json create mode 100644 test/test_config/config_card.py create mode 100644 test/test_config/config_parser.py create mode 100644 test/test_config/config_parser_requirements.txt create mode 100644 test/test_config/config_simple.json create mode 100644 test/test_config/config_simple.py create mode 100644 test/test_config/config_simple2.py create mode 100644 test/test_config/helloconfig.py create mode 100644 test/test_config/mutable_flow.py create mode 100644 test/test_config/no_default.py create mode 100644 test/test_config/photo_config.json create mode 100644 test/test_config/runner_flow.py create mode 100644 test/test_config/test.py diff --git a/metaflow/client/core.py b/metaflow/client/core.py index 87b6a88c37c..a317bf7b5b5 100644 --- a/metaflow/client/core.py +++ b/metaflow/client/core.py @@ -31,11 +31,12 @@ from metaflow.includefile import IncludedFile from metaflow.metaflow_config import DEFAULT_METADATA, MAX_ATTEMPTS from metaflow.metaflow_environment import MetaflowEnvironment +from metaflow.package.mfenv import MFEnv from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS +from metaflow.special_files import SpecialFile from metaflow.unbounded_foreach import CONTROL_TASK_TAG from metaflow.util import cached_property, is_stringish, resolve_identity, to_unicode -from ..info_file import INFO_FILE from .filecache import FileCache if TYPE_CHECKING: @@ -823,10 +824,7 @@ def __init__(self, flow_name: str, code_package: str): ) code_obj = BytesIO(blobdata) self._tar = tarfile.open(fileobj=code_obj, mode="r:gz") - # The JSON module in Python3 deals with Unicode. Tar gives bytes. 
- info_str = ( - self._tar.extractfile(os.path.basename(INFO_FILE)).read().decode("utf-8") - ) + info_str = MFEnv.get_archive_content(self._tar, SpecialFile.INFO_FILE) self._info = json.loads(info_str) self._flowspec = self._tar.extractfile(self._info["script"]).read() diff --git a/metaflow/extension_support/__init__.py b/metaflow/extension_support/__init__.py index 0cc9e00afae..5de8de66929 100644 --- a/metaflow/extension_support/__init__.py +++ b/metaflow/extension_support/__init__.py @@ -12,7 +12,7 @@ from itertools import chain from pathlib import Path -from metaflow.info_file import read_info_file +from metaflow.special_files import read_info_file # diff --git a/metaflow/extension_support/_empty_file.py b/metaflow/extension_support/_empty_file.py index d59e1556ddb..dbdcba34c17 100644 --- a/metaflow/extension_support/_empty_file.py +++ b/metaflow/extension_support/_empty_file.py @@ -1,2 +1,2 @@ -# This file serves as a __init__.py for metaflow_extensions when it is packaged -# and needs to remain empty. +# This file serves as a __init__.py for metaflow_extensions or metaflow +# packages when they are packaged and needs to remain empty. diff --git a/metaflow/info_file.py b/metaflow/info_file.py deleted file mode 100644 index 6d56a6152ba..00000000000 --- a/metaflow/info_file.py +++ /dev/null @@ -1,25 +0,0 @@ -import json - -from os import path - -CURRENT_DIRECTORY = path.dirname(path.abspath(__file__)) -INFO_FILE = path.join(path.dirname(CURRENT_DIRECTORY), "INFO") - -_info_file_content = None -_info_file_present = None - - -def read_info_file(): - global _info_file_content - global _info_file_present - if _info_file_present is None: - _info_file_present = path.exists(INFO_FILE) - if _info_file_present: - try: - with open(INFO_FILE, "r", encoding="utf-8") as contents: - _info_file_content = json.load(contents) - except IOError: - pass - if _info_file_present: - return _info_file_content - return None diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index e99d4f9a001..c42253c7c8e 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -445,7 +445,15 @@ ### # Debug configuration ### -DEBUG_OPTIONS = ["subcommand", "sidecar", "s3client", "tracing", "stubgen", "userconf"] +DEBUG_OPTIONS = [ + "subcommand", + "sidecar", + "s3client", + "tracing", + "stubgen", + "userconf", + "package", +] for typ in DEBUG_OPTIONS: vars()["DEBUG_%s" % typ.upper()] = from_conf("DEBUG_%s" % typ.upper(), False) diff --git a/metaflow/metaflow_version.py b/metaflow/metaflow_version.py index e3be8ed7956..92dab210bdf 100644 --- a/metaflow/metaflow_version.py +++ b/metaflow/metaflow_version.py @@ -11,7 +11,7 @@ from os import path, name, environ, listdir from metaflow.extension_support import update_package_info -from metaflow.info_file import CURRENT_DIRECTORY, read_info_file +from metaflow.special_files import read_info_file # True/False correspond to the value `public`` in get_version diff --git a/metaflow/package.py b/metaflow/package.py deleted file mode 100644 index 1385883d5a7..00000000000 --- a/metaflow/package.py +++ /dev/null @@ -1,203 +0,0 @@ -import importlib -import os -import sys -import tarfile -import time -import json -from io import BytesIO - -from .user_configs.config_parameters import CONFIG_FILE, dump_config_values -from .extension_support import EXT_PKG, package_mfext_all -from .metaflow_config import DEFAULT_PACKAGE_SUFFIXES -from .exception import MetaflowException -from .util import to_unicode -from . 
import R -from .info_file import INFO_FILE - -DEFAULT_SUFFIXES_LIST = DEFAULT_PACKAGE_SUFFIXES.split(",") -METAFLOW_SUFFIXES_LIST = [".py", ".html", ".css", ".js"] - - -class NonUniqueFileNameToFilePathMappingException(MetaflowException): - headline = "Non Unique file path for a file name included in code package" - - def __init__(self, filename, file_paths, lineno=None): - msg = ( - "Filename %s included in the code package includes multiple different paths for the same name : %s.\n" - "The `filename` in the `add_to_package` decorator hook requires a unique `file_path` to `file_name` mapping" - % (filename, ", ".join(file_paths)) - ) - super().__init__(msg=msg, lineno=lineno) - - -# this is os.walk(follow_symlinks=True) with cycle detection -def walk_without_cycles(top_root): - seen = set() - - def _recurse(root): - for parent, dirs, files in os.walk(root): - for d in dirs: - path = os.path.join(parent, d) - if os.path.islink(path): - # Breaking loops: never follow the same symlink twice - # - # NOTE: this also means that links to sibling links are - # not followed. In this case: - # - # x -> y - # y -> oo - # oo/real_file - # - # real_file is only included twice, not three times - reallink = os.path.realpath(path) - if reallink not in seen: - seen.add(reallink) - for x in _recurse(path): - yield x - yield parent, files - - for x in _recurse(top_root): - yield x - - -class MetaflowPackage(object): - def __init__(self, flow, environment, echo, suffixes=DEFAULT_SUFFIXES_LIST): - self.suffixes = list(set().union(suffixes, DEFAULT_SUFFIXES_LIST)) - self.environment = environment - self.metaflow_root = os.path.dirname(__file__) - - self.flow_name = flow.name - self._flow = flow - self.create_time = time.time() - environment.init_environment(echo) - for step in flow: - for deco in step.decorators: - deco.package_init(flow, step.__name__, environment) - self.blob = self._make() - - def _walk(self, root, exclude_hidden=True, suffixes=None): - if suffixes is None: - suffixes = [] - root = to_unicode(root) # handle files/folder with non ascii chars - prefixlen = len("%s/" % os.path.dirname(root)) - for ( - path, - files, - ) in walk_without_cycles(root): - if exclude_hidden and "/." in path: - continue - # path = path[2:] # strip the ./ prefix - # if path and (path[0] == '.' or './' in path): - # continue - for fname in files: - if (fname[0] == "." and fname in suffixes) or ( - fname[0] != "." - and any(fname.endswith(suffix) for suffix in suffixes) - ): - p = os.path.join(path, fname) - yield p, p[prefixlen:] - - def path_tuples(self): - """ - Returns list of (path, arcname) to be added to the job package, where - `arcname` is the alternative name for the file in the package. 
- """ - # We want the following contents in the tarball - # Metaflow package itself - for path_tuple in self._walk( - self.metaflow_root, exclude_hidden=False, suffixes=METAFLOW_SUFFIXES_LIST - ): - yield path_tuple - - # Metaflow extensions; for now, we package *all* extensions but this may change - # at a later date; it is possible to call `package_mfext_package` instead of - # `package_mfext_all` but in that case, make sure to also add a - # metaflow_extensions/__init__.py file to properly "close" the metaflow_extensions - # package and prevent other extensions from being loaded that may be - # present in the rest of the system - for path_tuple in package_mfext_all(): - yield path_tuple - - # Any custom packages exposed via decorators - deco_module_paths = {} - for step in self._flow: - for deco in step.decorators: - for path_tuple in deco.add_to_package(): - file_path, file_name = path_tuple - # Check if the path is not duplicated as - # many steps can have the same packages being imported - if file_name not in deco_module_paths: - deco_module_paths[file_name] = file_path - yield path_tuple - elif deco_module_paths[file_name] != file_path: - raise NonUniqueFileNameToFilePathMappingException( - file_name, [deco_module_paths[file_name], file_path] - ) - - # the package folders for environment - for path_tuple in self.environment.add_to_package(): - yield path_tuple - if R.use_r(): - # the R working directory - for path_tuple in self._walk( - "%s/" % R.working_dir(), suffixes=self.suffixes - ): - yield path_tuple - # the R package - for path_tuple in R.package_paths(): - yield path_tuple - else: - # the user's working directory - flowdir = os.path.dirname(os.path.abspath(sys.argv[0])) + "/" - for path_tuple in self._walk(flowdir, suffixes=self.suffixes): - yield path_tuple - - def _add_configs(self, tar): - buf = BytesIO() - buf.write(json.dumps(dump_config_values(self._flow)).encode("utf-8")) - self._add_file(tar, os.path.basename(CONFIG_FILE), buf) - - def _add_info(self, tar): - buf = BytesIO() - buf.write( - json.dumps( - self.environment.get_environment_info(include_ext_info=True) - ).encode("utf-8") - ) - self._add_file(tar, os.path.basename(INFO_FILE), buf) - - @staticmethod - def _add_file(tar, filename, buf): - info = tarfile.TarInfo(filename) - buf.seek(0) - info.size = len(buf.getvalue()) - # Setting this default to Dec 3, 2019 - info.mtime = 1575360000 - tar.addfile(info, buf) - - def _make(self): - def no_mtime(tarinfo): - # a modification time change should not change the hash of - # the package. Only content modifications will. 
- # Setting this default to Dec 3, 2019 - tarinfo.mtime = 1575360000 - return tarinfo - - buf = BytesIO() - with tarfile.open( - fileobj=buf, mode="w:gz", compresslevel=3, dereference=True - ) as tar: - self._add_info(tar) - self._add_configs(tar) - for path, arcname in self.path_tuples(): - tar.add(path, arcname=arcname, recursive=False, filter=no_mtime) - - blob = bytearray(buf.getvalue()) - blob[4:8] = [0] * 4 # Reset 4 bytes from offset 4 to account for ts - return blob - - def __str__(self): - return "" % ( - self.flow_name, - time.strftime("%a, %d %b %Y %H:%M:%S", self.create_time), - ) diff --git a/metaflow/package/__init__.py b/metaflow/package/__init__.py new file mode 100644 index 00000000000..74c763bd536 --- /dev/null +++ b/metaflow/package/__init__.py @@ -0,0 +1,157 @@ +import os +import sys +import tarfile +import time +import json +from io import BytesIO + + +from ..metaflow_config import DEFAULT_PACKAGE_SUFFIXES +from ..exception import MetaflowException +from ..special_files import SpecialFile +from ..user_configs.config_parameters import dump_config_values +from .. import R + +from .mfenv import MFEnv + +DEFAULT_SUFFIXES_LIST = DEFAULT_PACKAGE_SUFFIXES.split(",") + + +class NonUniqueFileNameToFilePathMappingException(MetaflowException): + headline = "Non-unique file path for a file name included in code package" + + def __init__(self, filename, file_paths, lineno=None): + msg = ( + "Filename %s included in the code package includes multiple different " + "paths for the same name : %s.\n" + "The `filename` in the `add_to_package` decorator hook requires a unique " + "`file_path` to `file_name` mapping" % (filename, ", ".join(file_paths)) + ) + super().__init__(msg=msg, lineno=lineno) + + +class MetaflowPackage(object): + def __init__(self, flow, environment, echo, suffixes=DEFAULT_SUFFIXES_LIST): + self.suffixes = list(set().union(suffixes, DEFAULT_SUFFIXES_LIST)) + self.environment = environment + self.metaflow_root = os.path.dirname(__file__) + + self.flow_name = flow.name + self._flow = flow + self.create_time = time.time() + environment.init_environment(echo) + for step in flow: + for deco in step.decorators: + deco.package_init(flow, step.__name__, environment) + + self._code_env = MFEnv(lambda x: hasattr(x, "METAFLOW_PACKAGE")) + + # Add special content + self._code_env.add_special_content( + SpecialFile.INFO_FILE, + json.dumps( + self.environment.get_environment_info(include_ext_info=True) + ).encode("utf-8"), + ) + + self._code_env.add_special_content( + SpecialFile.CONFIG_FILE, + json.dumps(dump_config_values(self._flow)).encode("utf-8"), + ) + + # Add user files (from decorators) -- we add these to the code environment + self._code_env.add_files(self._addl_files()) + + self.blob = self._make() + + def path_tuples(self): + # Package the environment + for path, arcname in self._code_env.files(): + yield path, arcname + for _, arcname in self._code_env.contents(): + yield f"{arcname}", arcname + + # Package the user code + for path, arcname in self._user_code_tuples(): + yield path, arcname + + def _addl_files(self): + # Look at all decorators that provide additional files + deco_module_paths = {} + for step in self._flow: + for deco in step.decorators: + for path_tuple in deco.add_to_package(): + file_path, file_name = path_tuple + # Check if the path is not duplicated as + # many steps can have the same packages being imported + if file_name not in deco_module_paths: + deco_module_paths[file_name] = file_path + yield path_tuple + elif deco_module_paths[file_name] 
!= file_path:
+                        raise NonUniqueFileNameToFilePathMappingException(
+                            file_name, [deco_module_paths[file_name], file_path]
+                        )
+
+        # the package folders for environment
+        for path_tuple in self.environment.add_to_package():
+            yield path_tuple
+
+    def _user_code_tuples(self):
+        if R.use_r():
+            # the R working directory
+            for path_tuple in MFEnv.walk(
+                "%s/" % R.working_dir(), suffixes=self.suffixes
+            ):
+                yield path_tuple
+            # the R package
+            for path_tuple in R.package_paths():
+                yield path_tuple
+        else:
+            # the user's working directory
+            flowdir = os.path.dirname(os.path.abspath(sys.argv[0])) + "/"
+            for path_tuple in MFEnv.walk(flowdir, suffixes=self.suffixes):
+                # TODO: This is where we will check if the file is already included
+                # in the mfenv portion using path_in_archive. If it is, we just need to
+                # include a symlink.
+                yield path_tuple
+
+    @staticmethod
+    def _add_file(tar, filename, buf):
+        info = tarfile.TarInfo(filename)
+        buf.seek(0)
+        info.size = len(buf.getvalue())
+        # Setting this default to Dec 3, 2019
+        info.mtime = 1575360000
+        tar.addfile(info, buf)
+
+    def _make(self):
+        def no_mtime(tarinfo):
+            # a modification time change should not change the hash of
+            # the package. Only content modifications will.
+            # Setting this default to Dec 3, 2019
+            tarinfo.mtime = 1575360000
+            return tarinfo
+
+        buf = BytesIO()
+        with tarfile.open(
+            fileobj=buf, mode="w:gz", compresslevel=3, dereference=True
+        ) as tar:
+            # Package the environment
+            for path, arcname in self._code_env.files():
+                tar.add(path, arcname=arcname, recursive=False, filter=no_mtime)
+            for content, arcname in self._code_env.contents():
+                self._add_file(tar, arcname, BytesIO(content))
+
+            # Package the user code
+            for path, arcname in self._user_code_tuples():
+                tar.add(path, arcname=arcname, recursive=False, filter=no_mtime)
+
+        blob = bytearray(buf.getvalue())
+        blob[4:8] = [0] * 4  # Reset 4 bytes from offset 4 to account for ts
+        return blob
+
+    def __str__(self):
+        return "<code package for flow %s (created @ %s)>" % (
+            self.flow_name,
+            time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(self.create_time)),
+        )
diff --git a/metaflow/package/mfenv.py b/metaflow/package/mfenv.py
new file mode 100644
index 00000000000..4e08517302c
--- /dev/null
+++ b/metaflow/package/mfenv.py
@@ -0,0 +1,512 @@
+import inspect
+import os
+import sys
+import tarfile
+
+from collections import defaultdict
+from dataclasses import dataclass
+from pathlib import Path
+from typing import (
+    Callable,
+    Dict,
+    Generator,
+    Iterator,
+    List,
+    Mapping,
+    Optional,
+    Set,
+    Tuple,
+    TYPE_CHECKING,
+    Union,
+)
+
+from types import ModuleType
+
+
+from ..debug import debug
+from ..extension_support import EXT_EXCLUDE_SUFFIXES, metadata, package_mfext_all
+
+from ..special_files import MFENV_DIR, SpecialFile
+from ..util import get_metaflow_root, to_unicode
+
+packages_distributions = None
+
+if sys.version_info[:2] >= (3, 10):
+    packages_distributions = metadata.packages_distributions
+else:
+    # This is the code present in 3.10+ -- we replicate it here for other versions
+    def _packages_distributions() -> Mapping[str, List[str]]:
+        """
+        Return a mapping of top-level packages to their
+        distributions.
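+
+        For example, in a hypothetical environment with PyYAML and
+        python-dateutil installed, this would return
+        ``{"yaml": ["PyYAML"], "dateutil": ["python-dateutil"]}``.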
+ """ + pkg_to_dist = defaultdict(list) + for dist in metadata.distributions(): + for pkg in _top_level_declared(dist) or _top_level_inferred(dist): + pkg_to_dist[pkg].append(dist.metadata["Name"]) + return dict(pkg_to_dist) + + def _top_level_declared(dist: metadata.Distribution) -> List[str]: + return (dist.read_text("top_level.txt") or "").split() + + def _topmost(name: "pathlib.PurePosixPath") -> Optional[str]: + """ + Return the top-most parent as long as there is a parent. + """ + top, *rest = name.parts + return top if rest else None + + def _get_toplevel_name(name: "pathlib.PurePosixPath") -> str: + return _topmost(name) or ( + # python/typeshed#10328 + inspect.getmodulename(name) # type: ignore + or str(name) + ) + + def _top_level_inferred(dist: "metadata.Distribution"): + opt_names = set(map(_get_toplevel_name, dist.files or [])) + + def importable_name(name): + return "." not in name + + return filter(importable_name, opt_names) + + packages_distributions = _packages_distributions + + +if TYPE_CHECKING: + import pathlib + + +_cached_distributions = None + + +def modules_to_distributions() -> Dict[str, List[metadata.Distribution]]: + """ + Return a mapping of top-level modules to their distributions. + + Returns + ------- + Dict[str, List[metadata.Distribution]] + A mapping of top-level modules to their distributions. + """ + global _cached_distributions + if _cached_distributions is None: + _cached_distributions = { + k: [metadata.distribution(d) for d in v] + for k, v in packages_distributions().items() + } + return _cached_distributions + + +@dataclass +class _ModuleInfo: + name: str + root_paths: Set[str] + module: ModuleType + + +class MFEnv: + + METAFLOW_SUFFIXES_LIST = [".py", ".html", ".css", ".js"] + + # this is os.walk(follow_symlinks=True) with cycle detection + @classmethod + def walk_without_cycles( + cls, + top_root: str, + ) -> Generator[Tuple[str, List[str]], None, None]: + seen = set() + + def _recurse(root): + for parent, dirs, files in os.walk(root): + for d in dirs: + path = os.path.join(parent, d) + if os.path.islink(path): + # Breaking loops: never follow the same symlink twice + # + # NOTE: this also means that links to sibling links are + # not followed. In this case: + # + # x -> y + # y -> oo + # oo/real_file + # + # real_file is only included twice, not three times + reallink = os.path.realpath(path) + if reallink not in seen: + seen.add(reallink) + for x in _recurse(path): + yield x + yield parent, files + + for x in _recurse(top_root): + yield x + + @classmethod + def walk( + cls, + root: str, + exclude_hidden: bool = True, + suffixes: Optional[List[str]] = None, + ) -> Generator[Tuple[str, str], None, None]: + if suffixes is None: + suffixes = [] + root = to_unicode(root) # handle files/folder with non ascii chars + prefixlen = len("%s/" % os.path.dirname(root)) + for ( + path, + files, + ) in cls.walk_without_cycles(root): + if exclude_hidden and "/." in path: + continue + # path = path[2:] # strip the ./ prefix + # if path and (path[0] == '.' or './' in path): + # continue + for fname in files: + if (fname[0] == "." and fname in suffixes) or ( + fname[0] != "." + and any(fname.endswith(suffix) for suffix in suffixes) + ): + p = os.path.join(path, fname) + yield p, p[prefixlen:] + + @classmethod + def get_filename(cls, name: Union[SpecialFile, str]) -> Optional[str]: + # In all cases, the special files are siblings of the metaflow root + # directory. 
if isinstance(name, SpecialFile):
+            r = get_metaflow_root()
+            path_to_file = os.path.join(r, name.value)
+        else:
+            path_to_file = os.path.join(MFENV_DIR, name)
+        if os.path.isfile(path_to_file):
+            return path_to_file
+        return None
+
+    @classmethod
+    def get_content(cls, name: Union[SpecialFile, str]) -> Optional[str]:
+        file_to_read = cls.get_filename(name)
+        if file_to_read:
+            with open(file_to_read, "r", encoding="utf-8") as f:
+                return f.read()
+        return None
+
+    @classmethod
+    def get_archive_filename(
+        cls, archive: tarfile.TarFile, name: Union[SpecialFile, str]
+    ) -> Optional[str]:
+        # Backward compatible way of accessing all special files. Prior to MFEnv,
+        # they were stored at the top level of the archive.
+        real_name = name.value if isinstance(name, SpecialFile) else name
+        # NOTE: TarFile.getmember raises KeyError for a missing member, so we test
+        # membership against getnames(); the MFENV_DIR directory entry itself may
+        # not be present as a member, hence the prefix check.
+        members = archive.getnames()
+        if any(m == MFENV_DIR or m.startswith(MFENV_DIR + "/") for m in members):
+            file_path = os.path.join(MFENV_DIR, real_name)
+        else:
+            file_path = real_name
+        if file_path in members:
+            return file_path
+        return None
+
+    @classmethod
+    def get_archive_content(
+        cls, archive: tarfile.TarFile, name: Union[SpecialFile, str]
+    ) -> Optional[str]:
+        file_to_read = cls.get_archive_filename(archive, name)
+        if file_to_read:
+            with archive.extractfile(file_to_read) as f:
+                return f.read().decode("utf-8")
+        return None
+
+    def __init__(self, criteria: Callable[[ModuleType], bool]) -> None:
+        # Look at top-level modules that are present when MFEnv is initialized
+        modules = filter(lambda x: "." not in x[0], sys.modules.items())
+
+        # Determine the version of Metaflow that we are part of
+        self._metaflow_root = get_metaflow_root()
+
+        self._modules = {
+            name: _ModuleInfo(
+                name,
+                set(Path(p).resolve().as_posix() for p in getattr(mod, "__path__", [])),
+                mod,
+            )
+            for name, mod in dict(modules).items()
+        }  # type: Dict[str, _ModuleInfo]
+
+        # Filter the modules
+        self._modules = {
+            name: info for name, info in self._modules.items() if criteria(info.module)
+        }
+
+        # Contains metadata information regarding the distributions packaged.
+        # This allows Metaflow to "fake" distribution information when packaged
+        self._metainfo = {}  # type: Dict[str, Dict[str, str]]
+
+        # Maps an absolute path on the filesystem to the path of the file in the
+        # archive.
+        self._files = {}  # type: Dict[str, str]
+
+        self._content = {}  # type: Dict[SpecialFile, bytes]
+
+        debug.package_exec(f"Used system modules found: {str(self._modules)}")
+
+        # Populate with files from the third party modules
+        for k, v in self._modules.items():
+            self._files.update(self._module_files(k, v.root_paths))
+
+        # We include Metaflow as well
+        self._files.update(self._metaflow_distribution_files())
+
+        # Include extensions as well
+        self._files.update(self._metaflow_extension_files())
+
+    @property
+    def root_dir(self):
+        return MFENV_DIR
+
+    def add_special_content(self, name: SpecialFile, content: bytes) -> None:
+        """
+        Add a special file to the MF environment.
+
+        This file will be included in the resulting code package in `MFENV_DIR`.
+
+        Parameters
+        ----------
+        name : SpecialFile
+            The special file to add to the MF environment
+        content : bytes
+            The content of the special file
+        """
+        debug.package_exec(f"Adding special content {name.value} to the MF environment")
+        self._content[name] = content
+
+    def add_module(self, module: ModuleType) -> None:
+        """
+        Add a module to the MF environment.
+
+        This module will be included in the resulting code package in `MFENV_DIR`.
+
+        Parameters
+        ----------
+        module : ModuleType
+            The module to include in the MF environment
+        """
+        name = module.__name__
+        debug.package_exec(f"Adding module {name} to the MF environment")
+        # If the module is a single file, we handle this here by looking at __file__
+        # which will point to the single file. If it is an actual module, __path__
+        # will contain the path(s) to the module
+        self._modules[name] = _ModuleInfo(
+            name,
+            set(
+                Path(p).resolve().as_posix()
+                for p in getattr(module, "__path__", [module.__file__])
+            ),
+            module,
+        )
+        self._files.update(self._module_files(name, self._modules[name].root_paths))
+
+    def add_directory(
+        self,
+        directory: str,
+        criteria: Callable[[str], bool],
+    ) -> None:
+        """
+        Add a directory to the MF environment.
+
+        This directory will be included in the resulting code package in `MFENV_DIR`.
+        The criteria function takes a file path and returns a boolean indicating
+        whether or not the file should be included in the code package.
+
+        At runtime, the content of the directory will be accessible through the usual
+        PYTHONPATH mechanism but also through `current.envdir`.
+
+        Parameters
+        ----------
+        directory : str
+            The directory to include in the MF environment
+        criteria : Callable[[str], bool]
+            A function that takes a file path and returns a boolean indicating whether
+            or not the file should be included in the code package
+        """
+        name = os.path.basename(directory)
+        debug.package_exec(f"Adding directory {directory} to the MF environment")
+        for root, _, files in os.walk(directory):
+            for file in files:
+                if any(file.endswith(x) for x in EXT_EXCLUDE_SUFFIXES):
+                    continue
+                path = os.path.join(root, file)
+                relpath = os.path.relpath(path, directory)
+                path = os.path.realpath(path)
+                if criteria(path):
+                    # Like every other environment file, the archive path lives
+                    # under MFENV_DIR
+                    self._files[path] = os.path.join(MFENV_DIR, name, relpath)
+
+    def add_files(self, files: Iterator[Tuple[str, str]]) -> None:
+        """
+        Add a list of files to the MF environment.
+
+        These files will be included in the resulting code package in `MFENV_DIR`.
+
+        Parameters
+        ----------
+        files : Iterator[Tuple[str, str]]
+            A list of files to include in the MF environment. The first element of the
+            tuple is the path to the file in the filesystem; the second element is the
+            path in the archive.
+        """
+        for file, arcname in files:
+            debug.package_exec(f"Adding file {file} as {arcname} to the MF environment")
+            self._files[os.path.realpath(file)] = os.path.join(MFENV_DIR, arcname)
+
+    def path_in_archive(self, path: str) -> Optional[str]:
+        """
+        Return the path of the file in the code package if it is included through
+        `add_directory` or `add_files`.
+
+        Note that we will use realpath to determine if two paths are equal.
+        This includes all files included as part of third party libraries as well as
+        anything that was added as part of `add_files` and `add_directory`.
+
+        Parameters
+        ----------
+        path : str
+            The path of the file on the filesystem
+
+        Returns
+        -------
+        Optional[str]
+            The path of the file in the code package or None if the file is not
+            included
+        """
+        return self._files.get(os.path.realpath(path))
+
+    def files(self) -> Generator[Tuple[str, str], None, None]:
+        """
+        Return a generator of all files included in the MF environment.
+
+        Returns
+        -------
+        Generator[Tuple[str, str], None, None]
+            A generator of all files included in the MF environment. The first element
+            of the tuple is the path to the file in the filesystem; the second element
+            is the path in the archive.
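+
+        Examples
+        --------
+        A sketch of typical consumption; the filesystem path shown is
+        hypothetical:
+
+        >>> env = MFEnv(lambda m: hasattr(m, "METAFLOW_PACKAGE"))
+        >>> for path, arcname in env.files():
+        ...     print(path, "->", arcname)
+        /usr/lib/python3.11/site-packages/foo/__init__.py -> .mfenv/foo/__init__.py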
+ """ + return self._files.items() + + def contents(self) -> Generator[Tuple[bytes, str], None, None]: + """ + Return a generator of all special files included in the MF environment. + + Returns + ------- + Generator[Tuple[bytes, str], None, None] + A generator of all special files included in the MF environment. The first + element of the tuple is the content to add; the second element is path in the + archive. + """ + for name, content in self._content.items(): + yield content, os.path.join(MFENV_DIR, name.value) + + def _module_files( + self, name: str, paths: Set[str] + ) -> Generator[Tuple[str, str], None, None]: + debug.package_exec( + f" Looking for distributions for module {name} in {paths}" + ) + paths = set(paths) # Do not modify external paths + has_init = False + distributions = modules_to_distributions().get(name) + prefix = f"{name}/" + init_file = f"{prefix}__init__.py" + + seen_distributions = set() + if distributions: + for dist in distributions: + dist_name = dist.metadata["Name"] # dist.name not always present + if dist_name in seen_distributions: + continue + # For some reason, sometimes the same distribution appears twice. We + # don't need to process twice. + seen_distributions.add(dist_name) + debug.package_exec( + f" Including distribution {dist_name} for module {name}" + ) + dist_root = dist.locate_file(name) + if dist_root not in paths: + # This is an error because it means that this distribution is + # not contributing to the module. + raise RuntimeError( + f"Distribution '{dist.metadata['Name']}' is not " + "contributing to module '{name}' as expected." + ) + paths.discard(dist_root) + if dist_name not in self._metainfo: + # Possible that a distribution contributes to multiple modules + self._metainfo[dist_name] = { + # We can add more if needed but these are likely the most + # useful (captures, name, version, etc and files which can + # be used to find non-python files in the distribution). + "METADATA": dist.read_text("METADATA"), + "RECORD": dist.read_text("RECORD"), + } + for file in dist.files or []: + # Skip files that do not belong to this module (distribution may + # provide multiple modules) + if not file.startswith(prefix): + continue + if file == init_file: + has_init = True + yield str(dist.locate(file).resolve().as_posix()), os.path.join( + MFENV_DIR, str(file) + ) + + # Now if there are more paths left in paths, it means there is a non-distribution + # component to this package which we also include. 
+ debug.package_exec( + f" Looking for non-distribution files for module {name} in {paths}" + ) + for path in paths: + if not Path(path).is_dir(): + # Single file for the module -- this will be something like .py + yield path, os.path.join(MFENV_DIR, os.path.basename(path)) + else: + for root, _, files in os.walk(path): + for file in files: + if any(file.endswith(x) for x in EXT_EXCLUDE_SUFFIXES): + continue + rel_path = os.path.relpath(os.path.join(root, file), path) + if rel_path == "__init__.py": + has_init = True + yield os.path.join(root, file), os.path.join( + MFENV_DIR, + name, + rel_path, + ) + # We now include an empty __init__.py file to close the module and prevent + # leaks from possible namespace packages + if not has_init: + yield os.path.join( + self._metaflow_root, "metaflow", "extension_support", "_empty_file.py" + ), os.path.join(MFENV_DIR, name, "__init__.py") + + def _metaflow_distribution_files(self) -> Generator[Tuple[str, str], None, None]: + debug.package_exec( + f" Including Metaflow from {self._metaflow_root} to the MF Environment" + ) + for path_tuple in self.walk( + os.path.join(self._metaflow_root, "metaflow"), + exclude_hidden=False, + suffixes=self.METAFLOW_SUFFIXES_LIST, + ): + yield path_tuple[0], os.path.join(MFENV_DIR, path_tuple[1]) + + def _metaflow_extension_files(self) -> Generator[Tuple[str, str], None, None]: + # Metaflow extensions; for now, we package *all* extensions but this may change + # at a later date; it is possible to call `package_mfext_package` instead of + # `package_mfext_all` but in that case, make sure to also add a + # metaflow_extensions/__init__.py file to properly "close" the metaflow_extensions + # package and prevent other extensions from being loaded that may be + # present in the rest of the system + for path_tuple in package_mfext_all(): + yield path_tuple[0], os.path.join(MFENV_DIR, path_tuple[1]) diff --git a/metaflow/plugins/package_cli.py b/metaflow/plugins/package_cli.py index 2e6519f862f..418e03ac68a 100644 --- a/metaflow/plugins/package_cli.py +++ b/metaflow/plugins/package_cli.py @@ -38,14 +38,24 @@ def info(obj): @package.command(help="List files included in the code package.") +@click.option( + "--archive/--no-archive", + default=False, + help="If True, lists the file paths as present in the tarball. 
" + "If False, lists the files on the filesystem.", + show_default=True, +) @click.pass_obj -def list(obj): +def list(obj, archive=False): obj.echo( "Files included in the code package " "(change with --package-suffixes):", fg="magenta", bold=False, ) - obj.echo_always("\n".join(path for path, _ in obj.package.path_tuples())) + if archive: + obj.echo_always("\n".join(path for _, path in obj.package.path_tuples())) + else: + obj.echo_always("\n".join(path for path, _ in obj.package.path_tuples())) @package.command(help="Save the current code package in a tar file") diff --git a/metaflow/plugins/pypi/conda_decorator.py b/metaflow/plugins/pypi/conda_decorator.py index b1b7ee833d9..5396ddf8788 100644 --- a/metaflow/plugins/pypi/conda_decorator.py +++ b/metaflow/plugins/pypi/conda_decorator.py @@ -10,10 +10,10 @@ from metaflow.extension_support import EXT_PKG from metaflow.metadata_provider import MetaDatum from metaflow.metaflow_environment import InvalidEnvironmentException +from metaflow.package.mfenv import MFEnv +from metaflow.special_files import SpecialFile from metaflow.util import get_metaflow_root -from ...info_file import INFO_FILE - class CondaStepDecorator(StepDecorator): """ @@ -159,11 +159,11 @@ def runtime_init(self, flow, graph, package, run_id): os.path.join(self.metaflow_dir.name, "metaflow"), ) - info = os.path.join(get_metaflow_root(), os.path.basename(INFO_FILE)) + info = MFEnv.get_filename(SpecialFile.INFO_FILE) # Symlink the INFO file as well to properly propagate down the Metaflow version - if os.path.isfile(info): + if info: os.symlink( - info, os.path.join(self.metaflow_dir.name, os.path.basename(INFO_FILE)) + info, os.path.join(self.metaflow_dir.name, os.path.basename(info)) ) else: # If there is no info file, we will actually create one in this new @@ -173,7 +173,10 @@ def runtime_init(self, flow, graph, package, run_id): # EXT_PKG extensions are PYTHONPATH extensions. Instead of re-resolving, # we use the resolved information that is written out to the INFO file. with open( - os.path.join(self.metaflow_dir.name, os.path.basename(INFO_FILE)), + os.path.join( + self.metaflow_dir.name, + os.path.basename(SpecialFile.INFO_FILE.value), + ), mode="wt", encoding="utf-8", ) as f: diff --git a/metaflow/special_files.py b/metaflow/special_files.py new file mode 100644 index 00000000000..bea74b2e210 --- /dev/null +++ b/metaflow/special_files.py @@ -0,0 +1,41 @@ +import json +import os + +from enum import Enum + +from .util import get_metaflow_root + +_info_file_content = None +_info_file_present = None + + +# Ideally these would be in package/mfenv.py but that would cause imports to fail so +# moving here. The reason is that this is needed to read extension information which needs +# to happen before mfenv gets packaged. 
+ +MFENV_DIR = ".mfenv" + + +class SpecialFile(Enum): + INFO_FILE = "INFO" + CONFIG_FILE = "CONFIG_PARAMETERS" + + +def read_info_file(): + + global _info_file_content + global _info_file_present + if _info_file_present is None: + file_path = os.path.join( + get_metaflow_root(), MFENV_DIR, SpecialFile.INFO_FILE.value + ) + if os.path.exists(file_path): + with open(file_path, "r") as f: + _info_file_content = json.load(f) + _info_file_present = True + else: + _info_file_present = False + + if _info_file_present: + return _info_file_content + return None diff --git a/metaflow/user_configs/config_options.py b/metaflow/user_configs/config_options.py index 341e775dd99..ca791b472b2 100644 --- a/metaflow/user_configs/config_options.py +++ b/metaflow/user_configs/config_options.py @@ -7,9 +7,10 @@ from metaflow._vendor import click from metaflow.debug import debug -from .config_parameters import CONFIG_FILE, ConfigValue +from .config_parameters import ConfigValue from ..exception import MetaflowException, MetaflowInternalError from ..parameters import DeployTimeField, ParameterContext, current_flow +from ..special_files import SpecialFile from ..util import get_username @@ -24,7 +25,7 @@ def _load_config_values(info_file: Optional[str] = None) -> Optional[Dict[Any, Any]]: if info_file is None: - info_file = os.path.basename(CONFIG_FILE) + info_file = os.path.basename(SpecialFile.CONFIG_FILE.value) try: with open(info_file, encoding="utf-8") as contents: return json.load(contents).get("user_configs", {}) diff --git a/metaflow/user_configs/config_parameters.py b/metaflow/user_configs/config_parameters.py index a430285c9d5..e5af4a82852 100644 --- a/metaflow/user_configs/config_parameters.py +++ b/metaflow/user_configs/config_parameters.py @@ -36,10 +36,6 @@ # return tracefunc_closure -CONFIG_FILE = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "CONFIG_PARAMETERS" -) - ID_PATTERN = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$") UNPACK_KEY = "_unpacked_delayed_" diff --git a/metaflow/util.py b/metaflow/util.py index cd3447d0e48..b5cb0a14085 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -9,7 +9,6 @@ from itertools import takewhile import re -from metaflow.exception import MetaflowUnknownUser, MetaflowInternalError try: # python2 @@ -162,6 +161,8 @@ def get_username(): def resolve_identity_as_tuple(): + from metaflow.exception import MetaflowUnknownUser + prod_token = os.environ.get("METAFLOW_PRODUCTION_TOKEN") if prod_token: return "production", prod_token @@ -236,6 +237,8 @@ class of the given object. 
def compress_list(lst, separator=",", rangedelim=":", zlibmarker="!", zlibmin=500): + from metaflow.exception import MetaflowInternalError + bad_items = [x for x in lst if separator in x or rangedelim in x or zlibmarker in x] if bad_items: raise MetaflowInternalError( diff --git a/test/test_config/basic_config_silly.txt b/test/test_config/basic_config_silly.txt new file mode 100644 index 00000000000..c438d89d5e0 --- /dev/null +++ b/test/test_config/basic_config_silly.txt @@ -0,0 +1 @@ +baz:amazing diff --git a/test/test_config/card_config.py b/test/test_config/card_config.py new file mode 100644 index 00000000000..cd3026e7ac8 --- /dev/null +++ b/test/test_config/card_config.py @@ -0,0 +1,21 @@ +import time +from metaflow import FlowSpec, step, Config, card + + +class CardConfigFlow(FlowSpec): + + config = Config("config", default_value="") + + @card(type=config.type) + @step + def start(self): + print("card type", self.config.type) + self.next(self.end) + + @step + def end(self): + print("full config", self.config) + + +if __name__ == "__main__": + CardConfigFlow() diff --git a/test/test_config/config2.json b/test/test_config/config2.json new file mode 100644 index 00000000000..12ec1d8f996 --- /dev/null +++ b/test/test_config/config2.json @@ -0,0 +1,4 @@ +{ + "default_param": 456, + "default_param2": 789 +} diff --git a/test/test_config/config_card.py b/test/test_config/config_card.py new file mode 100644 index 00000000000..04855a1ebed --- /dev/null +++ b/test/test_config/config_card.py @@ -0,0 +1,30 @@ +import time +from metaflow import FlowSpec, step, card, current, Config, Parameter, config_expr +from metaflow.cards import Image + +BASE = "https://picsum.photos/id" + + +class ConfigurablePhotoFlow(FlowSpec): + cfg = Config("config", default="photo_config.json") + id = Parameter("id", default=cfg.id, type=int) + size = Parameter("size", default=cfg.size, type=int) + + @card + @step + def start(self): + import requests + + params = {k: v for k, v in self.cfg.style.items() if v} + self.url = f"{BASE}/{self.id}/{self.size}/{self.size}" + img = requests.get(self.url, params) + current.card.append(Image(img.content)) + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + ConfigurablePhotoFlow() diff --git a/test/test_config/config_parser.py b/test/test_config/config_parser.py new file mode 100644 index 00000000000..f081e48d7b7 --- /dev/null +++ b/test/test_config/config_parser.py @@ -0,0 +1,103 @@ +import json +import os + +from metaflow import ( + Config, + FlowSpec, + Parameter, + config_expr, + current, + environment, + project, + pypi_base, + req_parser, + step, +) + +default_config = {"project_name": "config_parser"} + + +def audit(run, parameters, configs, stdout_path): + # We should only have one run here + if len(run) != 1: + raise RuntimeError("Expected only one run; got %d" % len(run)) + run = run[0] + + # Check successful run + if not run.successful: + raise RuntimeError("Run was not successful") + + if len(parameters) > 1: + expected_tokens = parameters[-1].split() + if len(expected_tokens) < 8: + raise RuntimeError("Unexpected parameter list: %s" % str(expected_tokens)) + expected_token = expected_tokens[7] + else: + expected_token = "" + + # Check that we have the proper project name + if f"project:{default_config['project_name']}" not in run.tags: + raise RuntimeError("Project name is incorrect.") + + # Check the value of the artifacts in the end step + end_task = run["end"].task + assert end_task.data.trigger_param == expected_token + + if 
end_task.data.lib_version != "2.5.148": + raise RuntimeError("Library version is incorrect.") + + # Check we properly parsed the requirements file + if len(end_task.data.req_config) != 2: + raise RuntimeError( + "Requirements file is incorrect -- expected 2 keys, saw %s" + % str(end_task.data.req_config) + ) + if end_task.data.req_config["python"] != "3.10.*": + raise RuntimeError( + "Requirements file is incorrect -- got python version %s" + % end_task.data.req_config["python"] + ) + + if end_task.data.req_config["packages"] != {"regex": "2024.11.6"}: + raise RuntimeError( + "Requirements file is incorrect -- got packages %s" + % end_task.data.req_config["packages"] + ) + + return None + + +def trigger_name_func(ctx): + return [current.project_flow_name + "Trigger"] + + +@project(name=config_expr("cfg.project_name")) +@pypi_base(**config_expr("req_config")) +class ConfigParser(FlowSpec): + + trigger_param = Parameter( + "trigger_param", + default="", + external_trigger=True, + external_artifact=trigger_name_func, + ) + cfg = Config("cfg", default_value=default_config) + + req_config = Config( + "req_config", default="config_parser_requirements.txt", parser=req_parser + ) + + @step + def start(self): + import regex + + self.lib_version = regex.__version__ # Should be '2.5.148' + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + ConfigParser() diff --git a/test/test_config/config_parser_requirements.txt b/test/test_config/config_parser_requirements.txt new file mode 100644 index 00000000000..b692401b2f4 --- /dev/null +++ b/test/test_config/config_parser_requirements.txt @@ -0,0 +1,2 @@ +python==3.10.* +regex==2024.11.6 diff --git a/test/test_config/config_simple.json b/test/test_config/config_simple.json new file mode 100644 index 00000000000..73d35ad496a --- /dev/null +++ b/test/test_config/config_simple.json @@ -0,0 +1 @@ +{"some": {"value": 5}} diff --git a/test/test_config/config_simple.py b/test/test_config/config_simple.py new file mode 100644 index 00000000000..d4637e7c679 --- /dev/null +++ b/test/test_config/config_simple.py @@ -0,0 +1,98 @@ +import json +import os + +from metaflow import ( + Config, + FlowSpec, + Parameter, + config_expr, + current, + environment, + project, + step, +) + +default_config = {"a": {"b": "41", "project_name": "config_project"}} + + +def audit(run, parameters, configs, stdout_path): + # We should only have one run here + if len(run) != 1: + raise RuntimeError("Expected only one run; got %d" % len(run)) + run = run[0] + + # Check successful run + if not run.successful: + raise RuntimeError("Run was not successful") + + if configs and configs.get("cfg_default_value"): + config = json.loads(configs["cfg_default_value"]) + else: + config = default_config + + if len(parameters) > 1: + expected_tokens = parameters[-1].split() + if len(expected_tokens) < 8: + raise RuntimeError("Unexpected parameter list: %s" % str(expected_tokens)) + expected_token = expected_tokens[7] + else: + expected_token = "" + + # Check that we have the proper project name + if f"project:{config['a']['project_name']}" not in run.tags: + raise RuntimeError("Project name is incorrect.") + + # Check the value of the artifacts in the end step + end_task = run["end"].task + assert end_task.data.trigger_param == expected_token + if ( + end_task.data.config_val != 5 + or end_task.data.config_val_2 != config["a"]["b"] + or end_task.data.config_from_env != "5" + or end_task.data.config_from_env_2 != config["a"]["b"] + ): + raise RuntimeError("Config values 
are incorrect.") + + return None + + +def trigger_name_func(ctx): + return [current.project_flow_name + "Trigger"] + + +@project(name=config_expr("cfg_default_value.a.project_name")) +class ConfigSimple(FlowSpec): + + trigger_param = Parameter( + "trigger_param", + default="", + external_trigger=True, + external_artifact=trigger_name_func, + ) + cfg = Config("cfg", default="config_simple.json") + cfg_default_value = Config( + "cfg_default_value", + default_value=default_config, + ) + + @environment( + vars={ + "TSTVAL": config_expr("str(cfg.some.value)"), + "TSTVAL2": cfg_default_value.a.b, + } + ) + @step + def start(self): + self.config_from_env = os.environ.get("TSTVAL") + self.config_from_env_2 = os.environ.get("TSTVAL2") + self.config_val = self.cfg.some.value + self.config_val_2 = self.cfg_default_value.a.b + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + ConfigSimple() diff --git a/test/test_config/config_simple2.py b/test/test_config/config_simple2.py new file mode 100644 index 00000000000..18ecb39eb26 --- /dev/null +++ b/test/test_config/config_simple2.py @@ -0,0 +1,60 @@ +import json +import os + +from metaflow import ( + Config, + FlowSpec, + Parameter, + config_expr, + current, + environment, + project, + step, + timeout, +) + +default_config = {"blur": 123, "timeout": 10} + + +def myparser(s: str): + return {"hi": "you"} + + +class ConfigSimple(FlowSpec): + + cfg = Config("cfg", default_value=default_config) + cfg_req = Config("cfg_req2", required=True) + blur = Parameter("blur", default=cfg.blur) + blur2 = Parameter("blur2", default=cfg_req.blur) + cfg_non_req = Config("cfg_non_req") + cfg_empty_default = Config("cfg_empty_default", default_value={}) + cfg_empty_default_parser = Config( + "cfg_empty_default_parser", default_value="", parser=myparser + ) + cfg_non_req_parser = Config("cfg_non_req_parser", parser=myparser) + + @timeout(seconds=cfg["timeout"]) + @step + def start(self): + print( + "Non req: %s; emtpy_default %s; empty_default_parser: %s, non_req_parser: %s" + % ( + self.cfg_non_req, + self.cfg_empty_default, + self.cfg_empty_default_parser, + self.cfg_non_req_parser, + ) + ) + print("Blur is %s" % self.blur) + print("Blur2 is %s" % self.blur2) + print("Config is of type %s" % type(self.cfg)) + self.next(self.end) + + @step + def end(self): + print("Blur is %s" % self.blur) + print("Blur2 is %s" % self.blur2) + + +if __name__ == "__main__": + ConfigSimple() diff --git a/test/test_config/helloconfig.py b/test/test_config/helloconfig.py new file mode 100644 index 00000000000..e62e795f615 --- /dev/null +++ b/test/test_config/helloconfig.py @@ -0,0 +1,138 @@ +import os + +from metaflow import ( + Config, + FlowSpec, + Parameter, + environment, + step, + project, + config_expr, + CustomFlowDecorator, + CustomStepDecorator, + titus, +) + + +def silly_parser(s): + k, v = s.split(":") + return {k: v} + + +def param_func(ctx): + return ctx.configs.config2.default_param2 + 1 + + +def config_func(ctx): + return {"val": 123} + + +default_config = { + "run_on_titus": ["hello"], + "cpu_count": 2, + "env_to_start": "Romain", + "magic_value": 42, + "project_name": "hirec", +} + +silly_config = "baz:awesome" + + +class TitusOrNot(CustomFlowDecorator): + def evaluate(self, mutable_flow): + for name, s in mutable_flow.steps: + if name in mutable_flow.config.run_on_titus: + s.add_decorator(titus, cpu=mutable_flow.config.cpu_count) + + +class AddEnvToStart(CustomFlowDecorator): + def evaluate(self, mutable_flow): + s = mutable_flow.start + 
s.add_decorator(environment, vars={"hello": mutable_flow.config.env_to_start}) + + +@TitusOrNot +@AddEnvToStart +@project(name=config_expr("config").project_name) +class HelloConfig(FlowSpec): + """ + A flow where Metaflow prints 'Hi'. + + Run this flow to validate that Metaflow is installed correctly. + + """ + + default_from_config = Parameter( + "default_from_config", default=config_expr("config2").default_param, type=int + ) + + default_from_func = Parameter("default_from_func", default=param_func, type=int) + + config = Config("config", default_value=default_config, help="Help for config") + sconfig = Config( + "sconfig", + default="sillyconfig.txt", + parser=silly_parser, + help="Help for sconfig", + required=True, + ) + config2 = Config("config2") + + config3 = Config("config3", default_value=config_func) + + env_config = Config("env_config", default_value={"vars": {"name": "Romain"}}) + + @step + def start(self): + """ + This is the 'start' step. All flows must have a step named 'start' that + is the first step in the flow. + + """ + print("HelloConfig is %s (should be awesome)" % self.sconfig.baz) + print( + "Environment variable hello %s (should be Romain)" % os.environ.get("hello") + ) + + print( + "Parameters are: default_from_config: %s, default_from_func: %s" + % (self.default_from_config, self.default_from_func) + ) + + print("Config3 has value: %s" % self.config3.val) + self.next(self.hello) + + @environment( + vars={ + "normal": config.env_to_start, + "stringify": config_expr("str(config.magic_value)"), + } + ) + @step + def hello(self): + """ + A step for metaflow to introduce itself. + + """ + print( + "In this step, we got a normal variable %s, one that is stringified %s" + % ( + os.environ.get("normal"), + os.environ.get("stringify"), + ) + ) + self.next(self.end) + + @environment(**env_config) + @step + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. 
+ + """ + print("HelloFlow is all done for %s" % os.environ.get("name")) + + +if __name__ == "__main__": + HelloConfig() diff --git a/test/test_config/mutable_flow.py b/test/test_config/mutable_flow.py new file mode 100644 index 00000000000..05ccd2b21bb --- /dev/null +++ b/test/test_config/mutable_flow.py @@ -0,0 +1,250 @@ +import json +import os + +from metaflow import ( + Config, + CustomFlowDecorator, + CustomStepDecorator, + FlowSpec, + Parameter, + config_expr, + current, + environment, + project, + step, +) + +default_config = { + "parameters": [ + {"name": "param1", "default": "41"}, + {"name": "param2", "default": "42"}, + ], + "step_add_environment": {"vars": {"STEP_LEVEL": "2"}}, + "step_add_environment_2": {"vars": {"STEP_LEVEL_2": "3"}}, + "flow_add_environment": {"vars": {"FLOW_LEVEL": "4"}}, + "project_name": "config_project", +} + + +def find_param_in_parameters(parameters, name): + for param in parameters: + splits = param.split(" ") + try: + idx = splits.index("--" + name) + return splits[idx + 1] + except ValueError: + continue + return None + + +def audit(run, parameters, configs, stdout_path): + # We should only have one run here + if len(run) != 1: + raise RuntimeError("Expected only one run; got %d" % len(run)) + run = run[0] + + # Check successful run + if not run.successful: + raise RuntimeError("Run was not successful") + + if configs: + # We should have one config called "config" + if len(configs) != 1 or not configs.get("config"): + raise RuntimeError("Expected one config called 'config'") + config = json.loads(configs["config"]) + else: + config = default_config + + if len(parameters) > 1: + expected_tokens = parameters[-1].split() + if len(expected_tokens) < 8: + raise RuntimeError("Unexpected parameter list: %s" % str(expected_tokens)) + expected_token = expected_tokens[7] + else: + expected_token = "" + + # Check that we have the proper project name + if f"project:{config['project_name']}" not in run.tags: + raise RuntimeError("Project name is incorrect.") + + # Check the start step that all values are properly set. 
We don't need + # to check end step as it would be a duplicate + start_task_data = run["start"].task.data + + assert start_task_data.trigger_param == expected_token + for param in config["parameters"]: + value = find_param_in_parameters(parameters, param["name"]) or param["default"] + if not hasattr(start_task_data, param["name"]): + raise RuntimeError(f"Missing parameter {param['name']}") + if getattr(start_task_data, param["name"]) != value: + raise RuntimeError( + f"Parameter {param['name']} has incorrect value %s versus %s expected" + % (getattr(start_task_data, param["name"]), value) + ) + assert ( + start_task_data.flow_level + == config["flow_add_environment"]["vars"]["FLOW_LEVEL"] + ) + assert ( + start_task_data.step_level + == config["step_add_environment"]["vars"]["STEP_LEVEL"] + ) + assert ( + start_task_data.step_level_2 + == config["step_add_environment_2"]["vars"]["STEP_LEVEL_2"] + ) + + return None + + +class ModifyFlow(CustomFlowDecorator): + def evaluate(self, mutable_flow): + steps = ["start", "end"] + count = 0 + for name, s in mutable_flow.steps: + assert name in steps, "Unexpected step name" + steps.remove(name) + count += 1 + assert count == 2, "Unexpected number of steps" + + count = 0 + parameters = [] + for name, c in mutable_flow.configs: + assert name == "config", "Unexpected config name" + parameters = c["parameters"] + count += 1 + assert count == 1, "Unexpected number of configs" + + count = 0 + for name, p in mutable_flow.parameters: + if name == "trigger_param": + continue + assert name == parameters[count]["name"], "Unexpected parameter name" + count += 1 + + # Do some actual modification, we are going to update an environment decorator. + # Note that in this flow, we have an environment decorator which is then + to_add = mutable_flow.config["flow_add_environment"]["vars"] + for name, s in mutable_flow.steps: + if name == "start": + decos = [deco for deco in s.decorators] + assert len(decos) == 1, "Unexpected number of decorators" + assert decos[0].name == "environment", "Unexpected decorator" + for k, v in to_add.items(): + decos[0].attributes["vars"][k] = v + else: + s.add_decorator( + environment, **mutable_flow.config["flow_add_environment"].to_dict() + ) + + +class ModifyFlowWithArgs(CustomFlowDecorator): + def init(self, *args, **kwargs): + self._field_to_check = args[0] + + def evaluate(self, mutable_flow): + parameters = mutable_flow.config.get(self._field_to_check, []) + for param in parameters: + mutable_flow.add_parameter( + param["name"], + Parameter( + param["name"], + type=str, + default=param["default"], + external_artifact=trigger_name_func, + ), + overwrite=True, + ) + + +class ModifyStep(CustomStepDecorator): + def evaluate(self, mutable_step): + mutable_step.remove_decorator("environment") + + for deco in mutable_step.decorators: + assert deco.name != "environment", "Unexpected decorator" + + mutable_step.add_decorator( + environment, **mutable_step.flow.config["step_add_environment"].to_dict() + ) + + +class ModifyStep2(CustomStepDecorator): + def evaluate(self, mutable_step): + to_add = mutable_step.flow.config["step_add_environment_2"]["vars"] + for deco in mutable_step.decorators: + if deco.name == "environment": + for k, v in to_add.items(): + deco.attributes["vars"][k] = v + + +def trigger_name_func(ctx): + return [current.project_flow_name + "Trigger"] + + +@ModifyFlow +@ModifyFlowWithArgs("parameters") +@project(name=config_expr("config.project_name")) +class ConfigMutableFlow(FlowSpec): + + trigger_param = Parameter( + 
"trigger_param", + default="", + external_trigger=True, + external_artifact=trigger_name_func, + ) + config = Config("config", default_value=default_config) + + def _check(self, step_decorators): + for p in self.config.parameters: + assert hasattr(self, p["name"]), "Missing parameter" + + assert ( + os.environ.get("SHOULD_NOT_EXIST") is None + ), "Unexpected environment variable" + + assert ( + os.environ.get("FLOW_LEVEL") + == self.config.flow_add_environment["vars"]["FLOW_LEVEL"] + ), "Flow level environment variable not set" + self.flow_level = os.environ.get("FLOW_LEVEL") + + if step_decorators: + assert ( + os.environ.get("STEP_LEVEL") + == self.config.step_add_environment.vars.STEP_LEVEL + ), "Missing step_level decorator" + assert ( + os.environ.get("STEP_LEVEL_2") + == self.config["step_add_environment_2"]["vars"].STEP_LEVEL_2 + ), "Missing step_level_2 decorator" + + self.step_level = os.environ.get("STEP_LEVEL") + self.step_level_2 = os.environ.get("STEP_LEVEL_2") + else: + assert ( + os.environ.get("STEP_LEVEL") is None + ), "Step level environment variable set" + assert ( + os.environ.get("STEP_LEVEL_2") is None + ), "Step level 2 environment variable set" + + @ModifyStep2 + @ModifyStep + @environment(vars={"SHOULD_NOT_EXIST": "1"}) + @step + def start(self): + print("Starting start step...") + self._check(step_decorators=True) + print("All checks are good.") + self.next(self.end) + + @step + def end(self): + print("Starting end step...") + self._check(step_decorators=False) + print("All checks are good.") + + +if __name__ == "__main__": + ConfigMutableFlow() diff --git a/test/test_config/no_default.py b/test/test_config/no_default.py new file mode 100644 index 00000000000..6acc0dbcbc6 --- /dev/null +++ b/test/test_config/no_default.py @@ -0,0 +1,18 @@ +from metaflow import Config, FlowSpec, card, step + + +class Sample(FlowSpec): + config = Config("config", default=None) + + @card + @step + def start(self): + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + Sample() diff --git a/test/test_config/photo_config.json b/test/test_config/photo_config.json new file mode 100644 index 00000000000..6d04a5a102e --- /dev/null +++ b/test/test_config/photo_config.json @@ -0,0 +1,8 @@ +{ + "id": 1084, + "size": 400, + "style": { + "grayscale": true, + "blur": 5 + } +} diff --git a/test/test_config/runner_flow.py b/test/test_config/runner_flow.py new file mode 100644 index 00000000000..1b55234b42f --- /dev/null +++ b/test/test_config/runner_flow.py @@ -0,0 +1,17 @@ +from metaflow import FlowSpec, Runner, step + + +class RunnerFlow(FlowSpec): + @step + def start(self): + with Runner("./mutable_flow.py") as r: + r.run() + self.next(self.end) + + @step + def end(self): + print("Done") + + +if __name__ == "__main__": + RunnerFlow() diff --git a/test/test_config/test.py b/test/test_config/test.py new file mode 100644 index 00000000000..64b63a43a0f --- /dev/null +++ b/test/test_config/test.py @@ -0,0 +1,122 @@ +import json +import os +import uuid + +from typing import Any, Dict, List, Optional + +maestro_rand = str(uuid.uuid4())[:8] +scheduler_cluster = os.environ.get("NETFLIX_ENVIRONMENT", "sandbox") +# Use sandbox for tests +if scheduler_cluster == "prod": + scheduler_cluster = "sandbox" + + +# Generates tests for regular, titus and maestro invocations +def all_three_options( + id_base: str, + flow: str, + config_values: Optional[List[Dict[str, Any]]] = None, + configs: Optional[List[Dict[str, str]]] = None, + addl_params: Optional[List[str]] = None, +): + 
result = [] + if config_values is None: + config_values = [{}] + if configs is None: + configs = [{}] + if addl_params is None: + addl_params = [] + + if len(config_values) < len(configs): + config_values.extend([{}] * (len(configs) - len(config_values))) + if len(configs) < len(config_values): + configs.extend([{}] * (len(config_values) - len(configs))) + if len(addl_params) < len(config_values): + addl_params.extend([""] * (len(config_values) - len(addl_params))) + + for idx, (config_value, config) in enumerate(zip(config_values, configs)): + # Regular run + result.append( + { + "id": f"{id_base}_{idx}", + "flow": flow, + "config_values": config_value, + "configs": config, + "params": "run " + addl_params[idx], + } + ) + + # Titus run + result.append( + { + "id": f"{id_base}_titus_{idx}", + "flow": flow, + "config_values": config_value, + "configs": config, + "params": "run --with titus " + addl_params[idx], + } + ) + + # Maestro run + result.append( + { + "id": f"{id_base}_maestro_{idx}", + "flow": flow, + "config_values": config_value, + "configs": config, + "params": [ + # Create the flow + f"--branch {maestro_rand}_{id_base}_maestro_{idx} maestro " + f"--cluster {scheduler_cluster} create", + # Trigger the run + f"--branch {maestro_rand}_{id_base}_maestro_{idx} maestro " + f"--cluster {scheduler_cluster} trigger --trigger_param " + f"{maestro_rand} --force " + addl_params[idx], + ], + "user_environment": {"METAFLOW_SETUP_GANDALF_POLICY": "0"}, + } + ) + return result + + +TESTS = [ + *all_three_options( + "config_simple", + "config_simple.py", + [ + {}, + { + "cfg_default_value": json.dumps( + {"a": {"project_name": "config_project_2", "b": "56"}} + ) + }, + ], + ), + *all_three_options( + "mutable_flow", + "mutable_flow.py", + [ + {}, + { + "config": json.dumps( + { + "parameters": [ + {"name": "param3", "default": "43"}, + {"name": "param4", "default": "44"}, + ], + "step_add_environment": {"vars": {"STEP_LEVEL": "5"}}, + "step_add_environment_2": {"vars": {"STEP_LEVEL_2": "6"}}, + "flow_add_environment": {"vars": {"FLOW_LEVEL": "7"}}, + "project_name": "config_project_2", + } + ) + }, + ], + addl_params=["", "--param3 45"], + ), + *all_three_options( + "config_parser_flow", + "config_parser.py", + [{}], + ), +]
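
A note on consuming the TESTS list above: each entry is a plain dict, so a
harness can iterate over it directly. Below is a minimal sketch of such a
consumer; the command construction is illustrative only (the
--config-value/--config options mirror the config machinery this patch
touches, and the harness itself is hypothetical, not part of the patch):

    for spec in TESTS:
        base = ["python", spec["flow"]]
        for key, val in spec["config_values"].items():
            # config values are passed as JSON strings
            base += ["--config-value", key, val]
        for key, path in spec["configs"].items():
            # configs are passed as paths to files
            base += ["--config", key, path]
        # "params" is a single invocation string or, for the maestro
        # create/trigger pairs, a list of invocations
        params = spec["params"]
        for p in params if isinstance(params, list) else [params]:
            print("would invoke:", " ".join(base + p.split()))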