Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions metaflow/cmd/debug_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import json
import os

from metaflow._vendor import click

# Template for the "Metaflow: Attach" entry written into .vscode/launch.json.
# install_config() copies this dict and replaces "connect" with the port given
# via --base-port, so the port below only serves as the template default.
METAFLOW_ATTACH_CONFIG = {
    "name": "Metaflow: Attach",
    "type": "debugpy",
    "request": "attach",
    "connect": {"host": "localhost", "port": 5678},
    "justMyCode": True,
    "autoAttachChildProcesses": True,
}


# Root click group for this module; it only serves as the parent of `debug`.
# NOTE: kept as a comment rather than a docstring because no help= is passed
# here, so click would pick a docstring up as the group's help text.
@click.group()
def cli():
    pass


# `debug` command group, registered under `cli`; parent of the `vscode` group.
@cli.group(help="Commands related to debugging Metaflow flows.")
def debug():
    pass


# `debug vscode` command group; parent of the `install-config` command.
@debug.group(help="VSCode debugger integration.")
def vscode():
    pass


def _write_launch_json(launch_path, contents):
    # Serialize `contents` to `launch_path` as 4-space-indented JSON plus a
    # trailing newline.
    with open(launch_path, "w", encoding="utf-8") as f:
        json.dump(contents, f, indent=4)
        f.write("\n")


@vscode.command(
    "install-config",
    help="Install VSCode debug configuration for attaching to Metaflow tasks.",
)
@click.option(
    "--base-port",
    default=5678,
    type=int,
    show_default=True,
    help="Port number for the debugpy attach configuration.",
)
@click.option(
    "--dir",
    "target_dir",
    default=".",
    type=click.Path(),
    help="Workspace root directory where .vscode/ will be created.",
)
@click.option(
    "--overwrite",
    is_flag=True,
    default=False,
    help="Overwrite existing launch.json instead of merging.",
)
@click.option(
    "--remote-root",
    default=None,
    type=str,
    help="Remote container root (e.g. /root/metaflow). Adds pathMappings for remote debugging.",
)
def install_config(base_port, target_dir, overwrite, remote_root):
    """
    Create or update `<target_dir>/.vscode/launch.json` with the
    'Metaflow: Attach' debug configuration.

    Parameters
    ----------
    base_port : int
        Port written into the configuration's "connect" section.
    target_dir : str
        Workspace root; `.vscode/` is created inside it if missing.
    overwrite : bool
        If True, replace any existing launch.json with a fresh file that
        contains only our configuration. Otherwise merge into the existing
        file: replace an entry named 'Metaflow: Attach' if it differs, or
        append one if none exists.
    remote_root : str, optional
        If given, "pathMappings" are added that map the workspace folder to
        `remote_root` and the directory containing the local `metaflow`
        package to `<remote_root>/.mf_code`.

    If the existing launch.json cannot be parsed, or does not have the
    expected shape, a warning is printed to stderr and the file is left
    untouched.
    """
    target_dir = os.path.abspath(target_dir)
    vscode_dir = os.path.join(target_dir, ".vscode")
    launch_path = os.path.join(vscode_dir, "launch.json")

    # Shallow copy is sufficient: the only nested value ("connect") is replaced
    # wholesale below, so the module-level template is never mutated.
    our_config = dict(METAFLOW_ATTACH_CONFIG)
    our_config["connect"] = {"host": "localhost", "port": base_port}
    config_name = our_config["name"]

    if remote_root is not None:
        import metaflow

        # Parent dir containing the metaflow package (e.g. site-packages or repo root).
        local_metaflow_src = os.path.dirname(os.path.dirname(metaflow.__file__))
        our_config["pathMappings"] = [
            {
                "localRoot": "${workspaceFolder}",
                "remoteRoot": remote_root,
            },
            {
                "localRoot": local_metaflow_src,
                # Strip trailing slashes so "/root/metaflow/" does not yield
                # "/root/metaflow//.mf_code".
                "remoteRoot": remote_root.rstrip("/") + "/.mf_code",
            },
        ]

    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    os.makedirs(vscode_dir, exist_ok=True)

    if overwrite or not os.path.exists(launch_path):
        _write_launch_json(
            launch_path,
            {
                "version": "0.2.0",
                "configurations": [our_config],
            },
        )
        click.echo(
            "Created %s with 'Metaflow: Attach' config (port %d)"
            % (launch_path, base_port)
        )
        return

    try:
        with open(launch_path, "r", encoding="utf-8") as f:
            existing = json.load(f)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        click.echo(
            "Warning: existing launch.json is not valid JSON. "
            "Use --overwrite to replace it.",
            err=True,
        )
        return

    # Valid JSON is not necessarily a launch.json-shaped document: guard
    # against a non-object top level or a non-list "configurations" value
    # instead of crashing with AttributeError further down.
    configs = existing.get("configurations", []) if isinstance(existing, dict) else None
    if not isinstance(configs, list):
        click.echo(
            "Warning: existing launch.json does not have the expected structure. "
            "Use --overwrite to replace it.",
            err=True,
        )
        return

    for i, cfg in enumerate(configs):
        # Skip entries that are not objects; they cannot be our config.
        if isinstance(cfg, dict) and cfg.get("name") == config_name:
            if cfg == our_config:
                click.echo(
                    "'Metaflow: Attach' config already up to date in %s"
                    % launch_path
                )
                return
            configs[i] = our_config
            message = "Updated 'Metaflow: Attach' config in %s"
            break
    else:
        # No entry with our name was found: merge by appending.
        configs.append(our_config)
        message = "Added 'Metaflow: Attach' config to existing %s"

    existing["configurations"] = configs
    _write_launch_json(launch_path, existing)
    click.echo(message % launch_path)
1 change: 1 addition & 0 deletions metaflow/cmd/main_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def status():
("tutorials", ".tutorials_cmd.cli"),
("develop", ".develop.cli"),
("code", ".code.cli"),
("debug", ".debug_cli.cli"),
]

process_cmds(globals())
Expand Down
78 changes: 55 additions & 23 deletions metaflow/extension_support/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def process_cmds(module_globals):
# override metaflow core)
for name, class_path in _all_cmds:
_ext_debug(" Adding command '%s' from '%s'" % (name, class_path))
_all_cmds_dict[name] = class_path
_all_cmds_dict.setdefault(name, []).append(class_path)

# Resolve the ENABLED_CMD variable. The rules are the following:
# - if ENABLED_CMD is non None, it means it was either set directly by the user
Expand Down Expand Up @@ -83,33 +83,65 @@ def resolve_cmds():
to_return = []

for name in set_of_commands:
class_path = _all_cmds_dict.get(name, None)
if class_path is None:
class_paths = _all_cmds_dict.get(name, None)
if class_paths is None:
raise ValueError(
"Configuration requested command '%s' but no such command is available"
% name
)
path, cls_name = class_path.rsplit(".", 1)
try:
cmd_module = importlib.import_module(path)
except ImportError:
raise ValueError("Cannot locate command '%s' at '%s'" % (name, path))

cls = getattr(cmd_module, cls_name, None)
if cls is None:
raise ValueError(
"Cannot locate '%s' class for command at '%s'" % (cls_name, path)
)
all_cmds = list(cls.commands)
if len(all_cmds) > 1:
raise ValueError("%s defines more than one command -- use a group" % path)
if all_cmds[0] != name:
raise ValueError(
"%s: expected name to be '%s' but got '%s' instead"
% (path, name, all_cmds[0])

def _load_cmd_cls(class_path, name):
path, cls_name = class_path.rsplit(".", 1)
try:
cmd_module = importlib.import_module(path)
except ImportError:
raise ValueError("Cannot locate command '%s' at '%s'" % (name, path))
cls = getattr(cmd_module, cls_name, None)
if cls is None:
raise ValueError(
"Cannot locate '%s' class for command at '%s'" % (cls_name, path)
)
all_cmds = list(cls.commands)
if len(all_cmds) > 1:
raise ValueError(
"%s defines more than one command -- use a group" % path
)
if all_cmds[0] != name:
raise ValueError(
"%s: expected name to be '%s' but got '%s' instead"
% (path, name, all_cmds[0])
)
return cls
Comment on lines +93 to +114
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inner function defined inside loop on every iteration

`_load_cmd_cls` is defined anew on every iteration of `for name in set_of_commands:`. While this doesn't cause a bug here (it doesn't close over the loop variable), it unnecessarily creates a new function object each time and makes the structure harder to follow.

Consider hoisting `_load_cmd_cls` to module level or at least to the top of `resolve_cmds()` so it's defined once.


if len(class_paths) == 1:
cls = _load_cmd_cls(class_paths[0], name)
to_return.append(cls)
_ext_debug(" Added command '%s' from '%s'" % (name, class_paths[0]))
else:
# Multiple providers for the same command name — merge subcommands.
# The last entry (extension) is the base; earlier entries contribute
# subcommands that don't collide with the base.
# This is effectively overriding anything in the previous extensions
# with later extensions.
base_cls = _load_cmd_cls(class_paths[-1], name)
base_group = base_cls.commands[name]

for earlier_path in class_paths[:-1]:
earlier_cls = _load_cmd_cls(earlier_path, name)
earlier_group = earlier_cls.commands[name]
for cmd_name, cmd in earlier_group.commands.items():
if cmd_name not in base_group.commands:
base_group.add_command(cmd, cmd_name)
_ext_debug(
" Merged subcommand '%s' into '%s' from '%s'"
% (cmd_name, name, earlier_path)
)

to_return.append(base_cls)
_ext_debug(
" Added merged command '%s' (base from '%s', %d providers)"
% (name, class_paths[-1], len(class_paths))
)
to_return.append(cls)
_ext_debug(" Added command '%s' from '%s'" % (name, class_path))

return to_return

Expand Down
1 change: 1 addition & 0 deletions metaflow/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
("airflow_internal", ".airflow.airflow_decorator.AirflowInternalDecorator"),
("pypi", ".pypi.pypi_decorator.PyPIStepDecorator"),
("conda", ".pypi.conda_decorator.CondaStepDecorator"),
("debugger", ".debugger_step_decorator.DebuggerStepDecorator"),
]

# Add new flow decorators here
Expand Down
5 changes: 5 additions & 0 deletions metaflow/plugins/aws/batch/batch_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ def echo(msg, stream="stderr", batch_id=None, **kwargs):
if split_vars:
env.update(split_vars)

# Forward debugger env vars to the remote container.
for key, val in os.environ.items():
if key.startswith("METAFLOW_DEBUGPY_"):
env[key] = val

if retry_count:
ctx.obj.echo_always(
"Sleeping %d minutes before the next AWS Batch retry"
Expand Down
Loading
Loading