Skip to content

Commit 7e265ac

Browse files
authored
retrieve entire graph then clone (#1728)
add more print profile time and threads workers with clone only command; include origin_ds_set and s3 batch write; clean up pr for review; remove wait-only flag because it is no longer used; address comments; fix OSS resume
1 parent 98f3f2a commit 7e265ac

File tree

4 files changed

+166
-55
lines changed

4 files changed

+166
-55
lines changed

metaflow/cli.py

+6-12
Original file line numberDiff line numberDiff line change
@@ -473,13 +473,6 @@ def echo_unicode(line, **kwargs):
473473
help="Pathspec of the origin task for this task to clone. Do "
474474
"not execute anything.",
475475
)
476-
@click.option(
477-
"--clone-wait-only/--no-clone-wait-only",
478-
default=False,
479-
show_default=True,
480-
help="If specified, waits for an external process to clone the task",
481-
hidden=True,
482-
)
483476
@click.option(
484477
"--clone-run-id",
485478
default=None,
@@ -519,7 +512,6 @@ def step(
519512
retry_count=None,
520513
max_user_code_retries=None,
521514
clone_only=None,
522-
clone_wait_only=False,
523515
clone_run_id=None,
524516
decospecs=None,
525517
ubf_context="none",
@@ -575,7 +567,6 @@ def step(
575567
task_id,
576568
clone_only,
577569
retry_count,
578-
wait_only=clone_wait_only,
579570
)
580571
else:
581572
task.run_step(
@@ -802,7 +793,10 @@ def resume(
802793
write_run_id(run_id_file, runtime.run_id)
803794
runtime.print_workflow_info()
804795
runtime.persist_constants()
805-
runtime.execute()
796+
if clone_only:
797+
runtime.clone_original_run()
798+
else:
799+
runtime.execute()
806800

807801

808802
@tracing.cli_entrypoint("cli/run")
@@ -830,7 +824,7 @@ def run(
830824
decospecs=None,
831825
run_id_file=None,
832826
user_namespace=None,
833-
**kwargs
827+
**kwargs,
834828
):
835829
if user_namespace is not None:
836830
namespace(user_namespace or None)
@@ -985,7 +979,7 @@ def start(
985979
pylint=None,
986980
event_logger=None,
987981
monitor=None,
988-
**deco_options
982+
**deco_options,
989983
):
990984
global echo
991985
if quiet:

metaflow/clone_util.py

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import time
2+
from .metadata import MetaDatum
3+
4+
5+
def clone_task_helper(
    flow_name,
    clone_run_id,
    run_id,
    step_name,
    clone_task_id,
    task_id,
    flow_datastore,
    metadata_service,
    origin_ds_set=None,
    attempt_id=0,
):
    """Clone a single task of an origin run into a task of the current run.

    The destination task datastore is opened in write mode, populated from
    the origin task datastore via ``clone``, registered with the metadata
    service together with three provenance records, and then finalized.

    Parameters
    ----------
    flow_name
        Name of the flow. Accepted for interface compatibility; this
        function does not read it.
    clone_run_id
        Run id of the origin run being cloned from.
    run_id
        Run id of the destination run.
    step_name
        Step name, used for both the origin and the destination task.
    clone_task_id
        Task id of the origin task.
    task_id
        Task id of the destination task.
    flow_datastore
        Object providing ``get_task_datastore``.
    metadata_service
        Object providing ``register_task_id`` and ``register_metadata``.
    origin_ds_set
        Optional pre-fetched collection of origin task datastores. When
        truthy, the origin datastore is looked up in it by pathspec instead
        of being fetched through ``flow_datastore``.
    attempt_id
        Attempt number the destination datastore is written under and that
        is recorded in the metadata.
    """
    # Destination: a writable, initialized datastore for this attempt.
    target_ds = flow_datastore.get_task_datastore(
        run_id, step_name, task_id, attempt=attempt_id, mode="w"
    )
    target_ds.init_task()

    # Source: prefer the pre-fetched set when one was supplied.
    if origin_ds_set:
        origin_pathspec = "{}/{}/{}".format(clone_run_id, step_name, clone_task_id)
        source_ds = origin_ds_set.get_with_pathspec(origin_pathspec)
    else:
        source_ds = flow_datastore.get_task_datastore(
            clone_run_id, step_name, clone_task_id
        )

    attempt_tags = ["attempt_id:{0}".format(attempt_id)]
    target_ds.clone(source_ds)

    metadata_service.register_task_id(run_id, step_name, task_id, attempt_id)

    # Each record uses the same string for its field name and its type.
    provenance = (
        ("origin-task-id", clone_task_id),
        ("origin-run-id", clone_run_id),
        ("attempt", attempt_id),
    )
    metadata_service.register_metadata(
        run_id,
        step_name,
        task_id,
        [
            MetaDatum(field=key, value=str(raw), type=key, tags=attempt_tags)
            for key, raw in provenance
        ],
    )
    target_ds.done()

metaflow/runtime.py

+62-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from datetime import datetime
1414
from io import BytesIO
1515
from functools import partial
16+
from concurrent import futures
1617

1718
from metaflow.datastore.exceptions import DataException
1819

@@ -30,6 +31,7 @@
3031
from .decorators import flow_decorators
3132
from .mflog import mflog, RUNTIME_LOG_SOURCE
3233
from .util import to_unicode, compress_list, unicode_type
34+
from .clone_util import clone_task_helper
3335
from .unbounded_foreach import (
3436
CONTROL_TASK_TAG,
3537
UBF_CONTROL,
@@ -188,7 +190,7 @@ def _new_task(self, step, input_paths=None, **kwargs):
188190
decos=decos,
189191
logger=self._logger,
190192
resume_identifier=self._resume_identifier,
191-
**kwargs
193+
**kwargs,
192194
)
193195

194196
@property
@@ -234,6 +236,65 @@ def _should_skip_clone_only_execution(self):
234236
)
235237
return False, None
236238

239+
def clone_task(self, step_name, task_id):
    """Clone one task of the origin run into this run under the same ids.

    Logs a system message naming the origin and destination pathspecs, then
    delegates the copy to ``clone_task_helper``. The same ``task_id`` is used
    for the origin task and for the destination task.
    """
    flow_name = self._flow.name
    origin_parts = (flow_name, self._clone_run_id, step_name, task_id)
    target_parts = (flow_name, self._run_id, step_name, task_id)
    self._logger(
        "Cloning task from {}/{}/{}/{} to {}/{}/{}/{}".format(
            *(origin_parts + target_parts)
        ),
        system_msg=True,
    )
    clone_task_helper(
        flow_name,
        self._clone_run_id,
        self._run_id,
        step_name,
        task_id,  # task id in the origin run
        task_id,  # task id in this run
        self._flow_datastore,
        self._metadata,
        origin_ds_set=self._origin_ds_set,
    )
264+
265+
def clone_original_run(self):
    """Clone every successful task of the origin run into this run.

    If ``_should_skip_clone_only_execution`` says to skip, the reason is
    logged and nothing else happens. Otherwise the run heartbeat is started,
    every datastore in ``self._origin_ds_set`` whose ``_task_ok`` is truthy
    and whose step is not ``_parameters`` is cloned through
    ``self.clone_task`` on a thread pool of ``self._max_workers`` workers,
    and the resume is marked as done.

    Raises
    ------
    Exception
        The first exception raised by a ``clone_task`` call, after all
        submitted clones have finished. Every failure is logged first. The
        resume is NOT marked as done in that case, so a partially cloned run
        is never reported as complete.
    """
    should_skip, skip_reason = self._should_skip_clone_only_execution()
    if should_skip:
        self._logger(skip_reason, system_msg=True)
        return

    self._metadata.start_run_heartbeat(self._flow.name, self._run_id)
    try:
        self._logger(
            "Start cloning original run: {}/{}".format(
                self._flow.name, self._clone_run_id
            ),
            system_msg=True,
        )

        inputs = []
        for task_ds in self._origin_ds_set:
            _, step_name, task_id = task_ds.pathspec.split("/")
            if task_ds["_task_ok"] and step_name != "_parameters":
                inputs.append((step_name, task_id))

        with futures.ThreadPoolExecutor(max_workers=self._max_workers) as executor:
            submitted = [
                (step_name, task_id, executor.submit(self.clone_task, step_name, task_id))
                for (step_name, task_id) in inputs
            ]
            futures.wait([future for (_, _, future) in submitted])

        # A Future stores any exception raised by its callable; unless it is
        # read back here, a failed clone would go completely unnoticed.
        first_error = None
        for step_name, task_id, future in submitted:
            error = future.exception()
            if error is None:
                continue
            self._logger(
                "Cloning task {}/{} failed: {}".format(step_name, task_id, error),
                system_msg=True,
            )
            if first_error is None:
                first_error = error
        if first_error is not None:
            raise first_error

        self._logger("Cloning original run is done", system_msg=True)
        self._params_task.mark_resume_done()
    finally:
        # Always stop the heartbeat started above, even when cloning fails.
        self._metadata.stop_heartbeat()
297+
237298
def execute(self):
238299
(
239300
should_skip_clone_only_execution,
@@ -1361,8 +1422,6 @@ def _launch(self):
13611422
# disabling sidecars for cloned tasks due to perf reasons
13621423
args.top_level_options["event-logger"] = "nullSidecarLogger"
13631424
args.top_level_options["monitor"] = "nullSidecarMonitor"
1364-
if self.task.should_skip_cloning:
1365-
args.command_options["clone-wait-only"] = True
13661425
else:
13671426
# decorators may modify the CLIArgs object in-place
13681427
for deco in self.task.decos:

metaflow/task.py

+27-40
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from .unbounded_foreach import UBF_CONTROL
2222
from .util import all_equal, get_username, resolve_identity, unicode_type
23+
from .clone_util import clone_task_helper
2324
from .metaflow_current import current
2425
from metaflow.tracing import get_trace_id
2526
from metaflow.util import namedtuple_with_defaults
@@ -284,57 +285,43 @@ def clone_only(
284285
task_id,
285286
clone_origin_task,
286287
retry_count,
287-
wait_only=False,
288288
):
289289
if not clone_origin_task:
290290
raise MetaflowInternalError(
291291
"task.clone_only needs a valid clone_origin_task value."
292292
)
293-
if wait_only:
294-
print("Not cloning anything in wait_only mode.")
295-
return
293+
origin_run_id, _, origin_task_id = clone_origin_task.split("/")
296294

295+
msg = {
296+
"task_id": task_id,
297+
"msg": "Cloning task from {}/{}/{}/{} to {}/{}/{}/{}".format(
298+
self.flow.name,
299+
origin_run_id,
300+
step_name,
301+
origin_task_id,
302+
self.flow.name,
303+
run_id,
304+
step_name,
305+
task_id,
306+
),
307+
"step_name": step_name,
308+
"run_id": run_id,
309+
"flow_name": self.flow.name,
310+
"ts": round(time.time()),
311+
}
312+
self.event_logger.log(msg)
297313
# If we actually have to do the clone ourselves, proceed...
298-
# 1. initialize output datastore
299-
output = self.flow_datastore.get_task_datastore(
300-
run_id, step_name, task_id, attempt=0, mode="w"
301-
)
302-
303-
output.init_task()
304-
305-
origin_run_id, origin_step_name, origin_task_id = clone_origin_task.split("/")
306-
# 2. initialize origin datastore
307-
origin = self.flow_datastore.get_task_datastore(
308-
origin_run_id, origin_step_name, origin_task_id
309-
)
310-
metadata_tags = ["attempt_id:{0}".format(retry_count)]
311-
output.clone(origin)
312-
self.metadata.register_metadata(
314+
clone_task_helper(
315+
self.flow.name,
316+
origin_run_id,
313317
run_id,
314318
step_name,
319+
origin_task_id,
315320
task_id,
316-
[
317-
MetaDatum(
318-
field="origin-task-id",
319-
value=str(origin_task_id),
320-
type="origin-task-id",
321-
tags=metadata_tags,
322-
),
323-
MetaDatum(
324-
field="origin-run-id",
325-
value=str(origin_run_id),
326-
type="origin-run-id",
327-
tags=metadata_tags,
328-
),
329-
MetaDatum(
330-
field="attempt",
331-
value=str(retry_count),
332-
type="attempt",
333-
tags=metadata_tags,
334-
),
335-
],
321+
self.flow_datastore,
322+
self.metadata,
323+
attempt_id=retry_count,
336324
)
337-
output.done()
338325

339326
def _finalize_control_task(self):
340327
# Update `_transition` which is expected by the NativeRuntime.

0 commit comments

Comments
 (0)