Skip to content

Commit

Permalink
Basic packaging works with Kubernetes
Browse files Browse the repository at this point in the history
(Conda still needs to be fixed to use correct file)
  • Loading branch information
romain-intel committed Mar 6, 2025
1 parent 97efb3f commit 789e841
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 91 deletions.
7 changes: 7 additions & 0 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .exception import CommandException, MetaflowException
from .flowspec import _FlowState
from .graph import FlowGraph
from .meta_files import read_included_dist_info
from .metaflow_config import (
DEFAULT_DATASTORE,
DEFAULT_ENVIRONMENT,
Expand All @@ -26,6 +27,7 @@
from .metaflow_current import current
from metaflow.system import _system_monitor, _system_logger
from .metaflow_environment import MetaflowEnvironment
from .package.mfenv import PackagedDistributionFinder
from .plugins import (
DATASTORES,
ENVIRONMENTS,
Expand Down Expand Up @@ -325,6 +327,11 @@ def start(
echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False)
echo(" for *%s*" % resolve_identity(), fg="magenta")

# Check if we need to setup the distribution finder (if running )
dist_info = read_included_dist_info()
if dist_info:
sys.meta_path.append(PackagedDistributionFinder(dist_info))

# Setup the context
cli_args._set_top_kwargs(ctx.params)
ctx.obj.echo = echo
Expand Down
4 changes: 2 additions & 2 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from metaflow.metaflow_environment import MetaflowEnvironment
from metaflow.package.mfenv import MFEnv
from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS
from metaflow.special_files import SpecialFile
from metaflow.meta_files import MetaFile
from metaflow.unbounded_foreach import CONTROL_TASK_TAG
from metaflow.util import cached_property, is_stringish, resolve_identity, to_unicode

Expand Down Expand Up @@ -825,7 +825,7 @@ def __init__(self, flow_name: str, code_package: str):
)
code_obj = BytesIO(blobdata)
self._tar = tarfile.open(fileobj=code_obj, mode="r:gz")
info_str = MFEnv.get_archive_content(self._tar, SpecialFile.INFO_FILE)
info_str = MFEnv.get_archive_content(self._tar, MetaFile.INFO_FILE)
self._info = json.loads(info_str)
self._flowspec = self._tar.extractfile(self._info["script"]).read()

Expand Down
2 changes: 1 addition & 1 deletion metaflow/extension_support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from itertools import chain
from pathlib import Path

from metaflow.special_files import read_info_file
from metaflow.meta_files import read_info_file


#
Expand Down
58 changes: 58 additions & 0 deletions metaflow/meta_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import json
import os

from enum import Enum

from .util import get_metaflow_root

_info_file_content = None
_info_file_present = None
_included_dist_info = None
_included_dist_present = None

# Ideally these would be in package/mfenv.py but that would cause imports to fail so
# moving here. The reason is that this is needed to read extension information which needs
# to happen before mfenv gets packaged.

MFENV_DIR = ".mfenv"


class MetaFile(Enum):
INFO_FILE = "INFO"
CONFIG_FILE = "CONFIG_PARAMETERS"
INCLUDED_DIST_INFO = "INCLUDED_DIST_INFO"


def read_info_file():

global _info_file_content
global _info_file_present
if _info_file_present is None:
file_path = os.path.join(get_metaflow_root(), MetaFile.INFO_FILE.value)
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
_info_file_content = json.load(f)
_info_file_present = True
else:
_info_file_present = False

if _info_file_present:
return _info_file_content
return None


def read_included_dist_info():
global _included_dist_info
global _included_dist_present
if _included_dist_present is None:
file_path = os.path.join(get_metaflow_root(), MetaFile.INCLUDED_DIST_INFO.value)
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
_included_dist_info = json.load(f)
_included_dist_present = True
else:
_included_dist_present = False

if _included_dist_present:
return _included_dist_info
return None
3 changes: 3 additions & 0 deletions metaflow/metaflow_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from metaflow.exception import MetaflowException
from metaflow.extension_support import dump_module_info
from metaflow.mflog import BASH_MFLOG, BASH_FLUSH_LOGS

from .meta_files import MFENV_DIR
from . import R


Expand Down Expand Up @@ -176,6 +178,7 @@ def get_package_commands(self, code_package_url, datastore_type):
"after 6 tries. Exiting...' && exit 1; "
"fi" % code_package_url,
"TAR_OPTIONS='--warning=no-timestamp' tar xf job.tar",
"export PYTHONPATH=`pwd`/%s:$PYTHONPATH" % MFENV_DIR,
"mflog 'Task is starting.'",
"flush_mflogs",
]
Expand Down
2 changes: 1 addition & 1 deletion metaflow/metaflow_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from os import path, name, environ, listdir

from metaflow.extension_support import update_package_info
from metaflow.special_files import read_info_file
from metaflow.meta_files import read_info_file


# True/False correspond to the value `public`` in get_version
Expand Down
16 changes: 8 additions & 8 deletions metaflow/package/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from ..metaflow_config import DEFAULT_PACKAGE_SUFFIXES
from ..exception import MetaflowException
from ..special_files import SpecialFile
from ..meta_files import MetaFile
from ..user_configs.config_parameters import dump_config_values
from .. import R

Expand Down Expand Up @@ -46,16 +46,16 @@ def __init__(self, flow, environment, echo, suffixes=DEFAULT_SUFFIXES_LIST):

self._code_env = MFEnv(lambda x: hasattr(x, "METAFLOW_PACKAGE"))

# Add special content
self._code_env.add_special_content(
SpecialFile.INFO_FILE,
# Add metacontent
self._code_env.add_meta_content(
MetaFile.INFO_FILE,
json.dumps(
self.environment.get_environment_info(include_ext_info=True)
).encode("utf-8"),
)

self._code_env.add_special_content(
SpecialFile.CONFIG_FILE,
self._code_env.add_meta_content(
MetaFile.CONFIG_FILE,
json.dumps(dump_config_values(self._flow)).encode("utf-8"),
)

Expand All @@ -68,7 +68,7 @@ def path_tuples(self):
# Package the environment
for path, arcname in self._code_env.files():
yield path, arcname
for _, arcname in self._code_env.contents():
for _, arcname in self._code_env.metacontents():
yield f"<generated>{arcname}", arcname

# Package the user code
Expand Down Expand Up @@ -139,7 +139,7 @@ def no_mtime(tarinfo):
# Package the environment
for path, arcname in self._code_env.files():
tar.add(path, arcname=arcname, recursive=False, filter=no_mtime)
for content, arcname in self._code_env.contents():
for content, arcname in self._code_env.metacontents():
self._add_file(tar, arcname, BytesIO(content))

# Package the user code
Expand Down
Loading

0 comments on commit 789e841

Please sign in to comment.