Skip to content

Commit 8cd344c

Browse files
Tests for runner (#1859)
* synchronous run with resume and async resume * add nbrun * remove runtime error if not in jupyter env * fix env vars update * suggested changes to nbrun * tests using lower level chaining API * use higher level API * add runner import inside try * set the correct metadata while getting the Run object * pass USER env in tox * reset old env vars when returning a Run object * change name of metadata_pathspec_file to runner_attribute_file * add comments for why reset of env and metadata is needed * minor fix * Update test.yml * formatting * fix --------- Co-authored-by: Madhur Tandon <[email protected]>
1 parent 125a018 commit 8cd344c

9 files changed

+272
-152
lines changed

metaflow/cli.py

+36-36
Original file line numberDiff line numberDiff line change
@@ -3,39 +3,16 @@
33
import traceback
44
from datetime import datetime
55
from functools import wraps
6-
import metaflow.tracing as tracing
76

7+
import metaflow.tracing as tracing
88
from metaflow._vendor import click
99

10-
from . import lint
11-
from . import plugins
12-
from . import parameters
13-
from . import decorators
14-
from . import metaflow_version
15-
from . import namespace
16-
from .metaflow_current import current
10+
from . import decorators, lint, metaflow_version, namespace, parameters, plugins
1711
from .cli_args import cli_args
18-
from .tagging_util import validate_tags
19-
from .util import (
20-
resolve_identity,
21-
decompress_list,
22-
write_latest_run_id,
23-
get_latest_run_id,
24-
)
25-
from .task import MetaflowTask
12+
from .client.core import get_metadata
13+
from .datastore import FlowDataStore, TaskDataStore, TaskDataStoreSet
2614
from .exception import CommandException, MetaflowException
2715
from .graph import FlowGraph
28-
from .datastore import FlowDataStore, TaskDataStoreSet, TaskDataStore
29-
30-
from .runtime import NativeRuntime
31-
from .package import MetaflowPackage
32-
from .plugins import (
33-
DATASTORES,
34-
ENVIRONMENTS,
35-
LOGGING_SIDECARS,
36-
METADATA_PROVIDERS,
37-
MONITOR_SIDECARS,
38-
)
3916
from .metaflow_config import (
4017
DEFAULT_DATASTORE,
4118
DEFAULT_ENVIRONMENT,
@@ -44,12 +21,29 @@
4421
DEFAULT_MONITOR,
4522
DEFAULT_PACKAGE_SUFFIXES,
4623
)
24+
from .metaflow_current import current
4725
from .metaflow_environment import MetaflowEnvironment
26+
from .mflog import LOG_SOURCES, mflog
27+
from .package import MetaflowPackage
28+
from .plugins import (
29+
DATASTORES,
30+
ENVIRONMENTS,
31+
LOGGING_SIDECARS,
32+
METADATA_PROVIDERS,
33+
MONITOR_SIDECARS,
34+
)
4835
from .pylint_wrapper import PyLint
49-
from .R import use_r, metaflow_r_version
50-
from .mflog import mflog, LOG_SOURCES
36+
from .R import metaflow_r_version, use_r
37+
from .runtime import NativeRuntime
38+
from .tagging_util import validate_tags
39+
from .task import MetaflowTask
5140
from .unbounded_foreach import UBF_CONTROL, UBF_TASK
52-
41+
from .util import (
42+
decompress_list,
43+
get_latest_run_id,
44+
resolve_identity,
45+
write_latest_run_id,
46+
)
5347

5448
ERASE_TO_EOL = "\033[K"
5549
HIGHLIGHT = "red"
@@ -558,11 +552,11 @@ def common_run_options(func):
558552
help="Write the ID of this run to the file specified.",
559553
)
560554
@click.option(
561-
"--pathspec-file",
555+
"--runner-attribute-file",
562556
default=None,
563557
show_default=True,
564558
type=str,
565-
help="Write the pathspec of this run to the file specified.",
559+
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
566560
)
567561
@wraps(func)
568562
def wrapper(*args, **kwargs):
@@ -622,7 +616,7 @@ def resume(
622616
decospecs=None,
623617
run_id_file=None,
624618
resume_identifier=None,
625-
pathspec_file=None,
619+
runner_attribute_file=None,
626620
):
627621
before_run(obj, tags, decospecs + obj.environment.decospecs())
628622

@@ -679,10 +673,13 @@ def resume(
679673
resume_identifier=resume_identifier,
680674
)
681675
write_file(run_id_file, runtime.run_id)
682-
write_file(pathspec_file, "/".join((obj.flow.name, runtime.run_id)))
683676
runtime.print_workflow_info()
684677

685678
runtime.persist_constants()
679+
write_file(
680+
runner_attribute_file,
681+
"%s:%s" % (get_metadata(), "/".join((obj.flow.name, runtime.run_id))),
682+
)
686683
if clone_only:
687684
runtime.clone_original_run()
688685
else:
@@ -713,7 +710,7 @@ def run(
713710
max_log_size=None,
714711
decospecs=None,
715712
run_id_file=None,
716-
pathspec_file=None,
713+
runner_attribute_file=None,
717714
user_namespace=None,
718715
**kwargs
719716
):
@@ -738,11 +735,14 @@ def run(
738735
)
739736
write_latest_run_id(obj, runtime.run_id)
740737
write_file(run_id_file, runtime.run_id)
741-
write_file(pathspec_file, "/".join((obj.flow.name, runtime.run_id)))
742738

743739
obj.flow._set_constants(obj.graph, kwargs)
744740
runtime.print_workflow_info()
745741
runtime.persist_constants()
742+
write_file(
743+
runner_attribute_file,
744+
"%s:%s" % (get_metadata(), "/".join((obj.flow.name, runtime.run_id))),
745+
)
746746
runtime.execute()
747747

748748

metaflow/runner/click_api.py

+18-22
Original file line numberDiff line numberDiff line change
@@ -7,38 +7,34 @@
77
"""
88
)
99

10-
import inspect
10+
import datetime
1111
import importlib
12+
import inspect
1213
import itertools
14+
import uuid
1315
from collections import OrderedDict
14-
import uuid, datetime
15-
from typing import (
16-
Optional,
17-
List,
18-
OrderedDict as TOrderedDict,
19-
Any,
20-
Union,
21-
Dict,
22-
Callable,
23-
)
16+
from typing import Any, Callable, Dict, List, Optional
17+
from typing import OrderedDict as TOrderedDict
18+
from typing import Union
19+
2420
from metaflow import FlowSpec, Parameter
25-
from metaflow.cli import start
2621
from metaflow._vendor import click
27-
from metaflow.parameters import JSONTypeClass
28-
from metaflow.includefile import FilePathClass
29-
from metaflow._vendor.typeguard import check_type, TypeCheckError
3022
from metaflow._vendor.click.types import (
31-
StringParamType,
32-
IntParamType,
33-
FloatParamType,
3423
BoolParamType,
35-
UUIDParameterType,
36-
Path,
37-
DateTime,
38-
Tuple,
3924
Choice,
25+
DateTime,
4026
File,
27+
FloatParamType,
28+
IntParamType,
29+
Path,
30+
StringParamType,
31+
Tuple,
32+
UUIDParameterType,
4133
)
34+
from metaflow._vendor.typeguard import TypeCheckError, check_type
35+
from metaflow.cli import start
36+
from metaflow.includefile import FilePathClass
37+
from metaflow.parameters import JSONTypeClass
4238

4339
click_to_python_types = {
4440
StringParamType: str,

metaflow/runner/metaflow_runner.py

+50-20
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
import os
22
import sys
3-
import time
43
import tempfile
4+
import time
55
from typing import Dict, Iterator, Optional, Tuple
6-
from metaflow import Run
7-
from .subprocess_manager import SubprocessManager, CommandManager
6+
7+
from metaflow import Run, metadata
8+
9+
from .subprocess_manager import CommandManager, SubprocessManager
10+
11+
12+
def clear_and_set_os_environ(env: Dict):
13+
os.environ.clear()
14+
os.environ.update(env)
815

916

1017
def read_from_file_when_ready(file_path: str, timeout: float = 5):
@@ -227,7 +234,8 @@ def __init__(
227234
from metaflow.runner.click_api import MetaflowAPI
228235

229236
self.flow_file = flow_file
230-
self.env_vars = os.environ.copy()
237+
self.old_env = os.environ.copy()
238+
self.env_vars = self.old_env.copy()
231239
self.env_vars.update(env or {})
232240
if profile:
233241
self.env_vars["METAFLOW_PROFILE"] = profile
@@ -241,9 +249,21 @@ def __enter__(self) -> "Runner":
241249
async def __aenter__(self) -> "Runner":
242250
return self
243251

244-
def __get_executing_run(self, tfp_pathspec, command_obj):
252+
def __get_executing_run(self, tfp_runner_attribute, command_obj):
253+
# When two 'Runner' executions are done sequentially i.e. one after the other
254+
# the 2nd run kinda uses the 1st run's previously set metadata and
255+
# environment variables.
256+
257+
# It is thus necessary to set them to correct values before we return
258+
# the Run object.
245259
try:
246-
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=10)
260+
# Set the environment variables to what they were before the run executed.
261+
clear_and_set_os_environ(self.old_env)
262+
263+
# Set the correct metadata from the runner_attribute file corresponding to this run.
264+
content = read_from_file_when_ready(tfp_runner_attribute.name, timeout=10)
265+
metadata_for_flow, pathspec = content.split(":", maxsplit=1)
266+
metadata(metadata_for_flow)
247267
run_object = Run(pathspec, _namespace_check=False)
248268
return ExecutingRun(self, command_obj, run_object)
249269
except TimeoutError as e:
@@ -280,17 +300,19 @@ def run(self, show_output: bool = False, **kwargs) -> ExecutingRun:
280300
ExecutingRun object for this run.
281301
"""
282302
with tempfile.TemporaryDirectory() as temp_dir:
283-
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
303+
tfp_runner_attribute = tempfile.NamedTemporaryFile(
304+
dir=temp_dir, delete=False
305+
)
284306
command = self.api(**self.top_level_kwargs).run(
285-
pathspec_file=tfp_pathspec.name, **kwargs
307+
runner_attribute_file=tfp_runner_attribute.name, **kwargs
286308
)
287309

288310
pid = self.spm.run_command(
289311
[sys.executable, *command], env=self.env_vars, show_output=show_output
290312
)
291313
command_obj = self.spm.get(pid)
292314

293-
return self.__get_executing_run(tfp_pathspec, command_obj)
315+
return self.__get_executing_run(tfp_runner_attribute, command_obj)
294316

295317
def resume(self, show_output: bool = False, **kwargs):
296318
"""
@@ -315,17 +337,19 @@ def resume(self, show_output: bool = False, **kwargs):
315337
ExecutingRun object for this resumed run.
316338
"""
317339
with tempfile.TemporaryDirectory() as temp_dir:
318-
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
340+
tfp_runner_attribute = tempfile.NamedTemporaryFile(
341+
dir=temp_dir, delete=False
342+
)
319343
command = self.api(**self.top_level_kwargs).resume(
320-
pathspec_file=tfp_pathspec.name, **kwargs
344+
runner_attribute_file=tfp_runner_attribute.name, **kwargs
321345
)
322346

323347
pid = self.spm.run_command(
324348
[sys.executable, *command], env=self.env_vars, show_output=show_output
325349
)
326350
command_obj = self.spm.get(pid)
327351

328-
return self.__get_executing_run(tfp_pathspec, command_obj)
352+
return self.__get_executing_run(tfp_runner_attribute, command_obj)
329353

330354
async def async_run(self, **kwargs) -> ExecutingRun:
331355
"""
@@ -344,17 +368,20 @@ async def async_run(self, **kwargs) -> ExecutingRun:
344368
ExecutingRun object for this run.
345369
"""
346370
with tempfile.TemporaryDirectory() as temp_dir:
347-
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
371+
tfp_runner_attribute = tempfile.NamedTemporaryFile(
372+
dir=temp_dir, delete=False
373+
)
348374
command = self.api(**self.top_level_kwargs).run(
349-
pathspec_file=tfp_pathspec.name, **kwargs
375+
runner_attribute_file=tfp_runner_attribute.name, **kwargs
350376
)
351377

352378
pid = await self.spm.async_run_command(
353-
[sys.executable, *command], env=self.env_vars
379+
[sys.executable, *command],
380+
env=self.env_vars,
354381
)
355382
command_obj = self.spm.get(pid)
356383

357-
return self.__get_executing_run(tfp_pathspec, command_obj)
384+
return self.__get_executing_run(tfp_runner_attribute, command_obj)
358385

359386
async def async_resume(self, **kwargs):
360387
"""
@@ -373,17 +400,20 @@ async def async_resume(self, **kwargs):
373400
ExecutingRun object for this resumed run.
374401
"""
375402
with tempfile.TemporaryDirectory() as temp_dir:
376-
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
403+
tfp_runner_attribute = tempfile.NamedTemporaryFile(
404+
dir=temp_dir, delete=False
405+
)
377406
command = self.api(**self.top_level_kwargs).resume(
378-
pathspec_file=tfp_pathspec.name, **kwargs
407+
runner_attribute_file=tfp_runner_attribute.name, **kwargs
379408
)
380409

381410
pid = await self.spm.async_run_command(
382-
[sys.executable, *command], env=self.env_vars
411+
[sys.executable, *command],
412+
env=self.env_vars,
383413
)
384414
command_obj = self.spm.get(pid)
385415

386-
return self.__get_executing_run(tfp_pathspec, command_obj)
416+
return self.__get_executing_run(tfp_runner_attribute, command_obj)
387417

388418
def __exit__(self, exc_type, exc_value, traceback):
389419
self.spm.cleanup()

metaflow/runner/nbrun.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
import os
21
import ast
2+
import os
33
import shutil
44
import tempfile
5-
from typing import Optional, Dict
5+
from typing import Dict, Optional
6+
67
from metaflow import Runner
78

89
try:
@@ -50,7 +51,7 @@ def __init__(
5051
profile: Optional[str] = None,
5152
env: Optional[Dict] = None,
5253
base_dir: Optional[str] = None,
53-
**kwargs
54+
**kwargs,
5455
):
5556
self.cell = get_current_cell()
5657
self.flow = flow
@@ -88,7 +89,7 @@ def __init__(
8889
flow_file=self.tmp_flow_file.name,
8990
profile=profile,
9091
env=self.env_vars,
91-
**kwargs
92+
**kwargs,
9293
)
9394

9495
def nbrun(self, **kwargs):

metaflow/runner/subprocess_manager.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
import asyncio
12
import os
2-
import sys
3-
import time
4-
import signal
53
import shutil
6-
import asyncio
4+
import signal
5+
import subprocess
6+
import sys
77
import tempfile
88
import threading
9-
import subprocess
10-
from typing import List, Dict, Optional, Callable, Iterator, Tuple
9+
import time
10+
from typing import Callable, Dict, Iterator, List, Optional, Tuple
1111

1212

1313
def kill_process_and_descendants(pid, termination_timeout):

0 commit comments

Comments
 (0)