Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Working POC] Working commit for spin #2335

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def logger(body="", system_msg=False, head="", bad=False, timestamp=True, nl=Tru
"step": "metaflow.cli_components.step_cmd.step",
"run": "metaflow.cli_components.run_cmds.run",
"resume": "metaflow.cli_components.run_cmds.resume",
"spin": "metaflow.cli_components.run_cmds.spin",
"spin-internal": "metaflow.cli_components.step_cmd.spin_internal",
},
)
def cli(ctx):
Expand Down Expand Up @@ -151,7 +153,7 @@ def show(obj):
echo_always("\n%s" % obj.graph.doc)
for node_name in obj.graph.sorted_nodes:
node = obj.graph[node_name]
echo_always("\nStep *%s*" % node.name, err=False)
echo_always("\nStep *%s* " % node.name, err=False)
echo_always(node.doc if node.doc else "?", indent=True, err=False)
if node.type != "end":
echo_always(
Expand Down Expand Up @@ -349,7 +351,6 @@ def start(
)

ctx.obj.datastore_impl.datastore_root = datastore_root

FlowDataStore.default_storage_impl = ctx.obj.datastore_impl

# At this point, we are able to resolve the user-configuration options so we can
Expand Down Expand Up @@ -439,14 +440,10 @@ def start(
ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor = MONITOR_SIDECARS[monitor](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
Expand All @@ -461,6 +458,44 @@ def start(
)

ctx.obj.config_options = config_options
ctx.obj.is_spin = False

# Override values for spin
if hasattr(ctx, "saved_args") and ctx.saved_args and "spin" in ctx.saved_args[0]:
# For spin, we will only use the local metadata provider, datastore, environment
# and null event logger and monitor
ctx.obj.is_spin = True
ctx.obj.spin_metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "local"][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
)
# ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"](
# flow=ctx.obj.flow, env=ctx.obj.environment
# )
# ctx.obj.monitor = MONITOR_SIDECARS["nullSidecarMonitor"](
# flow=ctx.obj.flow, env=ctx.obj.environment
# )
ctx.obj.spin_datastore_impl = [d for d in DATASTORES if d.TYPE == "local"][0]
# ctx.obj.spin_datastore_impl = [d for d in DATASTORES if d.TYPE == "s3"][0]
if datastore_root is None:
datastore_root = ctx.obj.spin_datastore_impl.get_datastore_root_from_config(
ctx.obj.echo
)
ctx.obj.spin_datastore_impl.datastore_root = datastore_root
ctx.obj.spin_flow_datastore = FlowDataStore(
ctx.obj.flow.name,
ctx.obj.environment, # Same environment as run/resume
ctx.obj.spin_metadata, # local metadata provider
ctx.obj.event_logger, # null event logger
ctx.obj.monitor, # null monitor
storage_impl=ctx.obj.spin_datastore_impl,
)

# Start event logger and monitor
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

decorators._init(ctx.obj.flow)

Expand All @@ -470,14 +505,14 @@ def start(
ctx.obj.flow,
ctx.obj.graph,
ctx.obj.environment,
ctx.obj.flow_datastore,
ctx.obj.metadata,
ctx.obj.flow_datastore if not ctx.obj.is_spin else ctx.obj.spin_flow_datastore,
ctx.obj.metadata if not ctx.obj.is_spin else ctx.obj.spin_metadata,
ctx.obj.logger,
echo,
deco_options,
)

# In the case of run/resume, we will want to apply the TL decospecs
# In the case of run/resume/spin, we will want to apply the TL decospecs
# *after* the run decospecs so that they don't take precedence. In other
# words, for the same decorator, we want `myflow.py run --with foo` to
# take precedence over any other `foo` decospec
Expand All @@ -493,7 +528,7 @@ def start(
parameters.set_parameter_context(
ctx.obj.flow.name,
ctx.obj.echo,
ctx.obj.flow_datastore,
ctx.obj.flow_datastore if not ctx.obj.is_spin else ctx.obj.spin_flow_datastore,
{
k: ConfigValue(v)
for k, v in ctx.obj.flow.__class__._flow_state.get(
Expand All @@ -505,9 +540,9 @@ def start(
if (
hasattr(ctx, "saved_args")
and ctx.saved_args
and ctx.saved_args[0] not in ("run", "resume")
and ctx.saved_args[0] not in ("run", "resume", "spin")
):
# run/resume are special cases because they can add more decorators with --with,
# run/resume/spin are special cases because they can add more decorators with --with,
# so they have to take care of themselves.
all_decospecs = ctx.obj.tl_decospecs + list(
ctx.obj.environment.decospecs() or []
Expand Down
176 changes: 157 additions & 19 deletions metaflow/cli_components/run_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
from ..metaflow_current import current
from ..metaflow_config import DEFAULT_DECOSPECS
from ..package import MetaflowPackage
from ..runtime import NativeRuntime
from ..runtime import NativeRuntime, SpinRuntime
from ..system import _system_logger

from ..tagging_util import validate_tags
from ..util import get_latest_run_id, write_latest_run_id
from ..util import get_latest_run_id, write_latest_run_id, get_latest_task_pathspec


def before_run(obj, tags, decospecs):
validate_tags(tags)

# There's a --with option both at the top-level and for the run
# There's a --with option both at the top-level and for the run/resume/spin
# subcommand. Why?
#
# "run --with shoes" looks so much better than "--with shoes run".
Expand All @@ -40,7 +40,7 @@ def before_run(obj, tags, decospecs):
+ list(obj.environment.decospecs() or [])
)
if all_decospecs:
# These decospecs are the ones from run/resume PLUS the ones from the
# These decospecs are the ones from run/resume/spin PLUS the ones from the
# environment (for example the @conda)
decorators._attach_decorators(obj.flow, all_decospecs)
decorators._init(obj.flow)
Expand All @@ -52,7 +52,11 @@ def before_run(obj, tags, decospecs):
# obj.environment.init_environment(obj.logger)

decorators._init_step_decorators(
obj.flow, obj.graph, obj.environment, obj.flow_datastore, obj.logger
obj.flow,
obj.graph,
obj.environment,
obj.flow_datastore if not obj.is_spin else obj.spin_flow_datastore,
obj.logger,
)

obj.metadata.add_sticky_tags(tags=tags)
Expand Down Expand Up @@ -88,6 +92,29 @@ def config_merge_cb(ctx, param, value):
return tuple(list(value) + splits)


def common_runner_options(func):
    """Attach the click options shared by Runner-driven commands.

    Adds ``--run-id-file`` and ``--runner-attribute-file`` to *func*
    (used by run/resume/spin) without otherwise changing its signature.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    # Applied in the same order as the original decorator stack
    # (bottom-up): --runner-attribute-file first, --run-id-file last.
    shared_options = (
        click.option(
            "--runner-attribute-file",
            default=None,
            show_default=True,
            type=str,
            help="Write the metadata and pathspec of this run to the file specified. Used internally "
            "for Metaflow's Runner API.",
        ),
        click.option(
            "--run-id-file",
            default=None,
            show_default=True,
            type=str,
            help="Write the ID of this run to the file specified.",
        ),
    )
    for add_option in shared_options:
        wrapper = add_option(wrapper)

    return wrapper


def common_run_options(func):
@click.option(
"--tag",
Expand Down Expand Up @@ -129,20 +156,6 @@ def common_run_options(func):
"in steps.",
callback=config_merge_cb,
)
@click.option(
"--run-id-file",
default=None,
show_default=True,
type=str,
help="Write the ID of this run to the file specified.",
)
@click.option(
"--runner-attribute-file",
default=None,
show_default=True,
type=str,
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
Expand Down Expand Up @@ -187,6 +200,7 @@ def wrapper(*args, **kwargs):
@click.command(help="Resume execution of a previous run of this flow.")
@tracing.cli("cli/resume")
@common_run_options
@common_runner_options
@click.pass_obj
def resume(
obj,
Expand Down Expand Up @@ -305,6 +319,7 @@ def resume(
@click.command(help="Run the workflow locally.")
@tracing.cli("cli/run")
@common_run_options
@common_runner_options
@click.option(
"--namespace",
"user_namespace",
Expand Down Expand Up @@ -380,3 +395,126 @@ def run(
)
with runtime.run_heartbeat():
runtime.execute()


@click.command(help="Spins up a task for a given step from a previous run locally.")
@click.option(
    "--step-name",
    default=None,
    show_default=True,
    help="Step name to spin up. Must provide either step-name or spin-pathspec.",
)
@click.option(
    "--spin-pathspec",
    default=None,
    show_default=True,
    help="Task pathspec to use when spinning up the step. The spun step will use the "
    "artifacts corresponding to this pathspec. If not provided, an arbitrary task "
    "pathspec from the latest run will be used.",
)
@click.option(
    "--skip-decorators/--no-skip-decorators",
    is_flag=True,
    default=False,
    show_default=True,
    help="Skip decorators attached to the step.",
)
@click.option(
    "--artifacts-module",
    default=None,
    show_default=True,
    help="Path to a module that contains artifacts to be used in the spun step. The artifacts should "
    "be defined as a dictionary called ARTIFACTS with keys as the artifact names and values as the "
    "artifact values. The artifact values will overwrite the default values of the artifacts used in "
    "the spun step.",
)
@click.option(
    "--max-log-size",
    default=10,
    show_default=True,
    help="Maximum size of stdout and stderr captured in "
    "megabytes. If a step outputs more than this to "
    "stdout/stderr, its output will be truncated.",
)
@common_runner_options
@click.pass_obj
def spin(
    obj,
    step_name=None,
    spin_pathspec=None,
    artifacts_module=None,
    skip_decorators=False,
    max_log_size=None,
    run_id_file=None,
    runner_attribute_file=None,
    **kwargs
):
    """Spin up a single task of this flow locally, seeded from a previous run.

    Exactly one of ``--step-name`` / ``--spin-pathspec`` must be given.
    With ``--step-name``, an arbitrary task of that step from the latest run
    is used; with ``--spin-pathspec``, the step name is derived from the
    pathspec. Raises CommandException on invalid selector combinations or a
    malformed pathspec.
    """
    before_run(obj, [], [])

    # Exactly one selector may be supplied.
    if step_name and spin_pathspec:
        raise CommandException(
            "Cannot specify both step-name and spin-pathspec. Please specify only one."
        )
    if not (step_name or spin_pathspec):
        raise CommandException(
            "Please specify either step-name or spin-pathspec to spin a task."
        )

    if step_name is not None:
        # Resolve an arbitrary task of this step from the latest run.
        spin_pathspec = get_latest_task_pathspec(obj.flow.name, step_name)
    else:
        # Pathspec format: <flow>/<run_id>/<step_name>/<task_id>; validate
        # before indexing so a malformed value fails with a clear message
        # instead of a bare IndexError.
        parts = spin_pathspec.split("/")
        if len(parts) < 3:
            raise CommandException(
                "Invalid spin-pathspec %r. Expected the form "
                "<flow>/<run_id>/<step_name>/<task_id>." % spin_pathspec
            )
        step_name = parts[2]

    obj.echo(
        f"Spinning up step *{step_name}* locally using previous task pathspec *{spin_pathspec}*"
    )
    obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
    step_func = getattr(obj.flow, step_name)

    spin_runtime = SpinRuntime(
        obj.flow,
        obj.graph,
        obj.flow_datastore,
        obj.metadata,
        obj.environment,
        obj.package,
        obj.logger,
        obj.entrypoint,
        obj.event_logger,
        obj.monitor,
        obj.spin_metadata,  # local metadata provider set up in cli.start
        obj.spin_flow_datastore,  # local datastore set up in cli.start
        step_func,
        spin_pathspec,
        skip_decorators,
        artifacts_module,
        max_log_size * 1024 * 1024,  # CLI value is in MB; runtime expects bytes
    )
    _system_logger.log_event(
        level="info",
        module="metaflow.task",
        name="spin",
        payload={
            "msg": str(
                {
                    "step_name": step_name,
                    "task_pathspec": spin_pathspec,
                }
            )
        },
    )

    write_latest_run_id(obj, spin_runtime.run_id)
    write_file(run_id_file, spin_runtime.run_id)
    spin_runtime.execute()

    # Mirror run/resume: expose run attributes for Metaflow's Runner API.
    if runner_attribute_file:
        with open(runner_attribute_file, "w") as f:
            json.dump(
                {
                    "task_id": spin_runtime.task.task_id,
                    "step_name": step_name,
                    "run_id": spin_runtime.run_id,
                    "flow_name": obj.flow.name,
                    "metadata": f"{obj.spin_metadata.__class__.TYPE}@{obj.spin_metadata.__class__.INFO}",
                },
                f,
            )
Loading
Loading