diff --git a/metaflow/cli.py b/metaflow/cli.py index 1fa0c729f88..2d44f4c103a 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -15,6 +15,7 @@ from .exception import CommandException, MetaflowException from .flowspec import _FlowState from .graph import FlowGraph +from .meta_files import read_included_dist_info from .metaflow_config import ( DEFAULT_DATASTORE, DEFAULT_ENVIRONMENT, @@ -26,6 +27,7 @@ from .metaflow_current import current from metaflow.system import _system_monitor, _system_logger from .metaflow_environment import MetaflowEnvironment +from .package.mfenv import PackagedDistributionFinder from .plugins import ( DATASTORES, ENVIRONMENTS, @@ -325,6 +327,11 @@ def start( echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False) echo(" for *%s*" % resolve_identity(), fg="magenta") + # Check if we need to set up the distribution finder (if running from a code package) + dist_info = read_included_dist_info() + if dist_info: + sys.meta_path.append(PackagedDistributionFinder(dist_info)) + # Setup the context cli_args._set_top_kwargs(ctx.params) ctx.obj.echo = echo diff --git a/metaflow/client/core.py b/metaflow/client/core.py index 4d3ea0676f8..29d9d23f8c8 100644 --- a/metaflow/client/core.py +++ b/metaflow/client/core.py @@ -34,7 +34,7 @@ from metaflow.metaflow_environment import MetaflowEnvironment from metaflow.package.mfenv import MFEnv from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS -from metaflow.special_files import SpecialFile +from metaflow.meta_files import MetaFile from metaflow.unbounded_foreach import CONTROL_TASK_TAG from metaflow.util import cached_property, is_stringish, resolve_identity, to_unicode @@ -825,7 +825,7 @@ def __init__(self, flow_name: str, code_package: str): ) code_obj = BytesIO(blobdata) self._tar = tarfile.open(fileobj=code_obj, mode="r:gz") - info_str = MFEnv.get_archive_content(self._tar, SpecialFile.INFO_FILE) + info_str = MFEnv.get_archive_content(self._tar, MetaFile.INFO_FILE) self._info = json.loads(info_str) 
self._flowspec = self._tar.extractfile(self._info["script"]).read() diff --git a/metaflow/extension_support/__init__.py b/metaflow/extension_support/__init__.py index 5de8de66929..f1f25b7777e 100644 --- a/metaflow/extension_support/__init__.py +++ b/metaflow/extension_support/__init__.py @@ -12,7 +12,7 @@ from itertools import chain from pathlib import Path -from metaflow.special_files import read_info_file +from metaflow.meta_files import read_info_file # diff --git a/metaflow/meta_files.py b/metaflow/meta_files.py new file mode 100644 index 00000000000..b14c1c68214 --- /dev/null +++ b/metaflow/meta_files.py @@ -0,0 +1,58 @@ +import json +import os + +from enum import Enum + +from .util import get_metaflow_root + +_info_file_content = None +_info_file_present = None +_included_dist_info = None +_included_dist_present = None + +# Ideally these would be in package/mfenv.py but that would cause imports to fail so +# moving here. The reason is that this is needed to read extension information which needs +# to happen before mfenv gets packaged. 
+ +MFENV_DIR = ".mfenv" + + +class MetaFile(Enum): + INFO_FILE = "INFO" + CONFIG_FILE = "CONFIG_PARAMETERS" + INCLUDED_DIST_INFO = "INCLUDED_DIST_INFO" + + +def read_info_file(): + + global _info_file_content + global _info_file_present + if _info_file_present is None: + file_path = os.path.join(get_metaflow_root(), MetaFile.INFO_FILE.value) + if os.path.exists(file_path): + with open(file_path, "r", encoding="utf-8") as f: + _info_file_content = json.load(f) + _info_file_present = True + else: + _info_file_present = False + + if _info_file_present: + return _info_file_content + return None + + +def read_included_dist_info(): + global _included_dist_info + global _included_dist_present + if _included_dist_present is None: + file_path = os.path.join(get_metaflow_root(), MetaFile.INCLUDED_DIST_INFO.value) + if os.path.exists(file_path): + with open(file_path, "r", encoding="utf-8") as f: + _included_dist_info = json.load(f) + _included_dist_present = True + else: + _included_dist_present = False + + if _included_dist_present: + return _included_dist_info + return None diff --git a/metaflow/metaflow_environment.py b/metaflow/metaflow_environment.py index dde7be0b9fe..d6084c06c3a 100644 --- a/metaflow/metaflow_environment.py +++ b/metaflow/metaflow_environment.py @@ -7,6 +7,8 @@ from metaflow.exception import MetaflowException from metaflow.extension_support import dump_module_info from metaflow.mflog import BASH_MFLOG, BASH_FLUSH_LOGS + +from .meta_files import MFENV_DIR from . import R @@ -176,6 +178,7 @@ def get_package_commands(self, code_package_url, datastore_type): "after 6 tries. Exiting...' 
&& exit 1; " "fi" % code_package_url, "TAR_OPTIONS='--warning=no-timestamp' tar xf job.tar", + "export PYTHONPATH=`pwd`/%s:$PYTHONPATH" % MFENV_DIR, "mflog 'Task is starting.'", "flush_mflogs", ] diff --git a/metaflow/metaflow_version.py b/metaflow/metaflow_version.py index 92dab210bdf..e0857f4a807 100644 --- a/metaflow/metaflow_version.py +++ b/metaflow/metaflow_version.py @@ -11,7 +11,7 @@ from os import path, name, environ, listdir from metaflow.extension_support import update_package_info -from metaflow.special_files import read_info_file +from metaflow.meta_files import read_info_file # True/False correspond to the value `public`` in get_version diff --git a/metaflow/package/__init__.py b/metaflow/package/__init__.py index 74c763bd536..b4430633d8e 100644 --- a/metaflow/package/__init__.py +++ b/metaflow/package/__init__.py @@ -8,7 +8,7 @@ from ..metaflow_config import DEFAULT_PACKAGE_SUFFIXES from ..exception import MetaflowException -from ..special_files import SpecialFile +from ..meta_files import MetaFile from ..user_configs.config_parameters import dump_config_values from .. 
import R @@ -46,16 +46,16 @@ def __init__(self, flow, environment, echo, suffixes=DEFAULT_SUFFIXES_LIST): self._code_env = MFEnv(lambda x: hasattr(x, "METAFLOW_PACKAGE")) - # Add special content - self._code_env.add_special_content( - SpecialFile.INFO_FILE, + # Add metacontent + self._code_env.add_meta_content( + MetaFile.INFO_FILE, json.dumps( self.environment.get_environment_info(include_ext_info=True) ).encode("utf-8"), ) - self._code_env.add_special_content( - SpecialFile.CONFIG_FILE, + self._code_env.add_meta_content( + MetaFile.CONFIG_FILE, json.dumps(dump_config_values(self._flow)).encode("utf-8"), ) @@ -68,7 +68,7 @@ def path_tuples(self): # Package the environment for path, arcname in self._code_env.files(): yield path, arcname - for _, arcname in self._code_env.contents(): + for _, arcname in self._code_env.metacontents(): yield f"{arcname}", arcname # Package the user code @@ -139,7 +139,7 @@ def no_mtime(tarinfo): # Package the environment for path, arcname in self._code_env.files(): tar.add(path, arcname=arcname, recursive=False, filter=no_mtime) - for content, arcname in self._code_env.contents(): + for content, arcname in self._code_env.metacontents(): self._add_file(tar, arcname, BytesIO(content)) # Package the user code diff --git a/metaflow/package/mfenv.py b/metaflow/package/mfenv.py index 4e08517302c..6c5ad8c6527 100644 --- a/metaflow/package/mfenv.py +++ b/metaflow/package/mfenv.py @@ -1,5 +1,7 @@ import inspect +import json import os +import re import sys import tarfile @@ -26,7 +28,7 @@ from ..debug import debug from ..extension_support import EXT_EXCLUDE_SUFFIXES, metadata, package_mfext_all -from ..special_files import MFENV_DIR, SpecialFile +from ..meta_files import MFENV_DIR, MetaFile from ..util import get_metaflow_root, to_unicode packages_distributions = None @@ -80,6 +82,8 @@ def importable_name(name): _cached_distributions = None +name_normalizer = re.compile(r"[-_.]+") + def modules_to_distributions() -> Dict[str, 
List[metadata.Distribution]]: """ @@ -106,6 +110,50 @@ class _ModuleInfo: module: ModuleType +class PackagedDistribution(metadata.Distribution): + """ + A Python Package packaged within a MFEnv. This allows users to use importlib + as they would regularly and the packaged Python Package would be considered as a + distribution even if it really isn't (since it is just included in the PythonPath). + """ + + def __init__(self, root: str, content: Dict[str, str]): + self._root = Path(root) + self._content = content + + # Strongly inspired from PathDistribution in metadata.py + def read_text(self, filename: Union[str, os.PathLike]) -> Optional[str]: + if str(filename) in self._content: + return self._content[str(filename)] + return None + + read_text.__doc__ = metadata.Distribution.read_text.__doc__ + + def locate_file(self, path: Union[str, os.PathLike]) -> metadata.SimplePath: + return self._root / path + + +class PackagedDistributionFinder(metadata.DistributionFinder): + + def __init__(self, dist_info: Dict[str, Dict[str, str]]): + self._dist_info = dist_info + + def find_distributions(self, context=metadata.DistributionFinder.Context()): + if context.name is None: + # No name requested: yield every known distribution, then stop + for name, info in self._dist_info.items(): + yield PackagedDistribution( + os.path.join(get_metaflow_root(), name), info + ) + return + name = name_normalizer.sub("-", context.name).lower() + if name in self._dist_info: + yield PackagedDistribution( + os.path.join(get_metaflow_root(), context.name), + self._dist_info[name], + ) + + + class MFEnv: METAFLOW_SUFFIXES_LIST = [".py", ".html", ".css", ".js"] @@ -172,20 +220,17 @@ def walk( yield p, p[prefixlen:] @classmethod - def get_filename(cls, name: Union[SpecialFile, str]) -> Optional[str]: - # In all cases, the special files are siblings of the metaflow root - # directory. 
- if isinstance(name, SpecialFile): - r = get_metaflow_root() - path_to_file = os.path.join(r, name.value) - else: - path_to_file = os.path.join(MFENV_DIR, name) + def get_filename(cls, name: Union[MetaFile, str]) -> Optional[str]: + # Get the filename of the expanded file -- it will always be expanded next to + # metaflow_root which is already in MFENV_DIR. + real_name = name.value if isinstance(name, MetaFile) else name + path_to_file = os.path.join(get_metaflow_root(), real_name) if os.path.isfile(path_to_file): return path_to_file return None @classmethod - def get_content(cls, name: Union[SpecialFile, str]) -> Optional[str]: + def get_content(cls, name: Union[MetaFile, str]) -> Optional[str]: file_to_read = cls.get_filename(name) if file_to_read: with open(file_to_read, "r", encoding="utf-8") as f: @@ -194,11 +239,11 @@ def get_content(cls, name: Union[SpecialFile, str]) -> Optional[str]: @classmethod def get_archive_filename( - cls, archive: tarfile.TarFile, name: Union[SpecialFile, str] + cls, archive: tarfile.TarFile, name: Union[MetaFile, str] ) -> Optional[str]: # Backward compatible way of accessing all special files. Prior to MFEnv, they # were stored at the TL of the archive. - real_name = name.value if isinstance(name, SpecialFile) else name + real_name = name.value if isinstance(name, MetaFile) else name if archive.getmember(MFENV_DIR): file_path = os.path.join(MFENV_DIR, real_name) else: @@ -209,7 +254,7 @@ def get_archive_filename( @classmethod def get_archive_content( - cls, archive: tarfile.TarFile, name: Union[SpecialFile, str] + cls, archive: tarfile.TarFile, name: Union[MetaFile, str] ) -> Optional[str]: file_to_read = cls.get_archive_filename(archive, name) if file_to_read: @@ -240,13 +285,13 @@ def __init__(self, criteria: Callable[[ModuleType], bool]) -> None: # Contain metadata information regarding the distributions packaged. 
# This allows Metaflow to "fake" distribution information when packaged - self._metainfo = {} # type: Dict[str, Dict[str, str]] + self._distmetainfo = {} # type: Dict[str, Dict[str, str]] # Maps an absolute path on the filesystem to the path of the file in the # archive. self._files = {} # type: Dict[str, str] - self._content = {} # type: Dict[SpecialFile, bytes] + self._metacontent = {} # type: Dict[MetaFile, bytes] debug.package_exec(f"Used system modules found: {str(self._modules)}") @@ -264,21 +309,21 @@ def __init__(self, criteria: Callable[[ModuleType], bool]) -> None: def root_dir(self): return MFENV_DIR - def add_special_content(self, name: SpecialFile, content: bytes) -> None: + def add_meta_content(self, name: MetaFile, content: bytes) -> None: """ - Add a special file to the MF environment. + Add a metafile to the MF environment. This file will be included in the resulting code package in `MFENV_DIR`. Parameters ---------- - name : SpecialFile - The special file to add to the MF environment + name : MetaFile + The metafile to add to the MF environment content : bytes - The content of the special file + The content of the metafile """ debug.package_exec(f"Adding special content {name.value} to the MF environment") - self._content[name] = content + self._metacontent[name] = content def add_module(self, module: ModuleType) -> None: """ @@ -394,19 +439,24 @@ def files(self) -> Generator[Tuple[str, str], None, None]: """ return self._files.items() - def contents(self) -> Generator[Tuple[bytes, str], None, None]: + def metacontents(self) -> Generator[Tuple[bytes, str], None, None]: """ - Return a generator of all special files included in the MF environment. + Return a generator of all metafiles included in the MF environment. Returns ------- Generator[Tuple[bytes, str], None, None] - A generator of all special files included in the MF environment. The first + A generator of all metafiles included in the MF environment. 
The first element of the tuple is the content to add; the second element is path in the archive. """ - for name, content in self._content.items(): + for name, content in self._metacontent.items(): yield content, os.path.join(MFENV_DIR, name.value) + if self._distmetainfo: + yield ( + json.dumps(self._distmetainfo).encode("utf-8"), + os.path.join(MFENV_DIR, MetaFile.INCLUDED_DIST_INFO.value), + ) def _module_files( self, name: str, paths: Set[str] @@ -432,18 +482,19 @@ def _module_files( debug.package_exec( f" Including distribution {dist_name} for module {name}" ) - dist_root = dist.locate_file(name) + dist_root = str(dist.locate_file(name)) if dist_root not in paths: # This is an error because it means that this distribution is # not contributing to the module. raise RuntimeError( f"Distribution '{dist.metadata['Name']}' is not " - "contributing to module '{name}' as expected." + f"contributing to module '{name}' as expected (got '{dist_root}' " + f"when expected one of {paths})" ) paths.discard(dist_root) - if dist_name not in self._metainfo: + if dist_name not in self._distmetainfo: # Possible that a distribution contributes to multiple modules - self._metainfo[dist_name] = { + self._distmetainfo[dist_name] = { # We can add more if needed but these are likely the most # useful (captures, name, version, etc and files which can # be used to find non-python files in the distribution). 
@@ -453,13 +504,13 @@ def _module_files( for file in dist.files or []: # Skip files that do not belong to this module (distribution may # provide multiple modules) - if not file.startswith(prefix): + if file.parts[0] != name: continue if file == init_file: has_init = True - yield str(dist.locate(file).resolve().as_posix()), os.path.join( - MFENV_DIR, str(file) - ) + yield str( + dist.locate_file(file).resolve().as_posix() + ), os.path.join(MFENV_DIR, str(file)) # Now if there are more paths left in paths, it means there is a non-distribution # component to this package which we also include. diff --git a/metaflow/plugins/pypi/conda_decorator.py b/metaflow/plugins/pypi/conda_decorator.py index 5396ddf8788..3baa3e25357 100644 --- a/metaflow/plugins/pypi/conda_decorator.py +++ b/metaflow/plugins/pypi/conda_decorator.py @@ -11,7 +11,7 @@ from metaflow.metadata_provider import MetaDatum from metaflow.metaflow_environment import InvalidEnvironmentException from metaflow.package.mfenv import MFEnv -from metaflow.special_files import SpecialFile +from metaflow.meta_files import MetaFile from metaflow.util import get_metaflow_root @@ -159,7 +159,7 @@ def runtime_init(self, flow, graph, package, run_id): os.path.join(self.metaflow_dir.name, "metaflow"), ) - info = MFEnv.get_filename(SpecialFile.INFO_FILE) + info = MFEnv.get_filename(MetaFile.INFO_FILE) # Symlink the INFO file as well to properly propagate down the Metaflow version if info: os.symlink( @@ -175,7 +175,7 @@ def runtime_init(self, flow, graph, package, run_id): with open( os.path.join( self.metaflow_dir.name, - os.path.basename(SpecialFile.INFO_FILE.value), + os.path.basename(MetaFile.INFO_FILE.value), ), mode="wt", encoding="utf-8", diff --git a/metaflow/special_files.py b/metaflow/special_files.py deleted file mode 100644 index bea74b2e210..00000000000 --- a/metaflow/special_files.py +++ /dev/null @@ -1,41 +0,0 @@ -import json -import os - -from enum import Enum - -from .util import get_metaflow_root - 
-_info_file_content = None -_info_file_present = None - - -# Ideally these would be in package/mfenv.py but that would cause imports to fail so -# moving here. The reason is that this is needed to read extension information which needs -# to happen before mfenv gets packaged. - -MFENV_DIR = ".mfenv" - - -class SpecialFile(Enum): - INFO_FILE = "INFO" - CONFIG_FILE = "CONFIG_PARAMETERS" - - -def read_info_file(): - - global _info_file_content - global _info_file_present - if _info_file_present is None: - file_path = os.path.join( - get_metaflow_root(), MFENV_DIR, SpecialFile.INFO_FILE.value - ) - if os.path.exists(file_path): - with open(file_path, "r") as f: - _info_file_content = json.load(f) - _info_file_present = True - else: - _info_file_present = False - - if _info_file_present: - return _info_file_content - return None diff --git a/metaflow/user_configs/config_options.py b/metaflow/user_configs/config_options.py index 62f67224a0d..12556dc4753 100644 --- a/metaflow/user_configs/config_options.py +++ b/metaflow/user_configs/config_options.py @@ -10,7 +10,7 @@ from .config_parameters import ConfigValue from ..exception import MetaflowException, MetaflowInternalError from ..parameters import DeployTimeField, ParameterContext, current_flow -from ..special_files import SpecialFile +from ..meta_files import MetaFile from ..util import get_username @@ -25,7 +25,7 @@ def _load_config_values(info_file: Optional[str] = None) -> Optional[Dict[Any, Any]]: if info_file is None: - info_file = os.path.basename(SpecialFile.CONFIG_FILE.value) + info_file = os.path.basename(MetaFile.CONFIG_FILE.value) try: with open(info_file, encoding="utf-8") as contents: return json.load(contents).get("user_configs", {})