Skip to content

Commit 9f028e7

Browse files
authored
Fix trigger on finish bugs, and refactor code (#2218)
1 parent e2b93da commit 9f028e7

File tree

1 file changed

+79
-142
lines changed

1 file changed

+79
-142
lines changed

metaflow/plugins/events_decorator.py

+79-142
Original file line numberDiff line numberDiff line change
@@ -398,111 +398,23 @@ def flow_init(
398398
)
399399
elif self.attributes["flow"]:
400400
# flow supports the format @trigger_on_finish(flow='FooFlow')
401-
if is_stringish(self.attributes["flow"]):
402-
self.triggers.append(
403-
{
404-
"fq_name": self.attributes["flow"],
405-
}
406-
)
407-
elif isinstance(self.attributes["flow"], dict):
408-
if "name" not in self.attributes["flow"]:
409-
raise MetaflowException(
410-
"The *flow* attribute for *@trigger_on_finish* is missing the "
411-
"*name* key."
412-
)
413-
flow_name = self.attributes["flow"]["name"]
414-
415-
if not is_stringish(flow_name) or "." in flow_name:
416-
raise MetaflowException(
417-
"The *name* attribute of the *flow* is not a valid string"
418-
)
419-
result = {"fq_name": flow_name}
420-
if "project" in self.attributes["flow"]:
421-
if is_stringish(self.attributes["flow"]["project"]):
422-
result["project"] = self.attributes["flow"]["project"]
423-
else:
424-
raise MetaflowException(
425-
"The *project* attribute of the *flow* is not a string"
426-
)
427-
if "project_branch" in self.attributes["flow"]:
428-
if is_stringish(self.attributes["flow"]["project_branch"]):
429-
result["branch"] = self.attributes["flow"]["project_branch"]
430-
else:
431-
raise MetaflowException(
432-
"The *project_branch* attribute of the *flow* is not a string"
433-
)
434-
self.triggers.append(result)
435-
elif callable(self.attributes["flow"]) and not isinstance(
401+
flow = self.attributes["flow"]
402+
if callable(flow) and not isinstance(
436403
self.attributes["flow"], DeployTimeField
437404
):
438-
trig = DeployTimeField(
439-
"fq_name", [str, dict], None, self.attributes["flow"], False
440-
)
405+
trig = DeployTimeField("fq_name", [str, dict], None, flow, False)
441406
self.triggers.append(trig)
442407
else:
443-
raise MetaflowException(
444-
"Incorrect type for *flow* attribute in *@trigger_on_finish* "
445-
" decorator. Supported type is string or Dict[str, str] - \n"
446-
"@trigger_on_finish(flow='FooFlow') or "
447-
"@trigger_on_finish(flow={'name':'FooFlow', 'project_branch': 'branch'})"
448-
)
408+
self.triggers.extend(self._parse_static_triggers([flow]))
449409
elif self.attributes["flows"]:
450410
# flows attribute supports the following formats -
451411
# 1. flows=['FooFlow', 'BarFlow']
452-
if isinstance(self.attributes["flows"], list):
453-
for flow in self.attributes["flows"]:
454-
if is_stringish(flow):
455-
self.triggers.append(
456-
{
457-
"fq_name": flow,
458-
}
459-
)
460-
elif isinstance(flow, dict):
461-
if "name" not in flow:
462-
raise MetaflowException(
463-
"One or more flows in the *flows* attribute for "
464-
"*@trigger_on_finish* is missing the "
465-
"*name* key."
466-
)
467-
flow_name = flow["name"]
468-
469-
if not is_stringish(flow_name) or "." in flow_name:
470-
raise MetaflowException(
471-
"The *name* attribute '%s' is not a valid string"
472-
% str(flow_name)
473-
)
474-
result = {"fq_name": flow_name}
475-
if "project" in flow:
476-
if is_stringish(flow["project"]):
477-
result["project"] = flow["project"]
478-
else:
479-
raise MetaflowException(
480-
"The *project* attribute of the *flow* '%s' is not "
481-
"a string" % flow_name
482-
)
483-
if "project_branch" in flow:
484-
if is_stringish(flow["project_branch"]):
485-
result["branch"] = flow["project_branch"]
486-
else:
487-
raise MetaflowException(
488-
"The *project_branch* attribute of the *flow* %s "
489-
"is not a string" % flow_name
490-
)
491-
self.triggers.append(result)
492-
else:
493-
raise MetaflowException(
494-
"One or more flows in *flows* attribute in "
495-
"*@trigger_on_finish* decorator have an incorrect type. "
496-
"Supported type is string or Dict[str, str]- \n"
497-
"@trigger_on_finish(flows=['FooFlow', 'BarFlow']"
498-
)
499-
elif callable(self.attributes["flows"]) and not isinstance(
500-
self.attributes["flows"], DeployTimeField
501-
):
502-
trig = DeployTimeField(
503-
"flows", list, None, self.attributes["flows"], False
504-
)
412+
flows = self.attributes["flows"]
413+
if callable(flows) and not isinstance(flows, DeployTimeField):
414+
trig = DeployTimeField("flows", list, None, flows, False)
505415
self.triggers.append(trig)
416+
elif isinstance(flows, list):
417+
self.triggers.extend(self._parse_static_triggers(flows))
506418
else:
507419
raise MetaflowException(
508420
"Incorrect type for *flows* attribute in *@trigger_on_finish* "
@@ -519,26 +431,7 @@ def flow_init(
519431
for trigger in self.triggers:
520432
if isinstance(trigger, DeployTimeField):
521433
continue
522-
if trigger["fq_name"].count(".") == 0:
523-
# fully qualified name is just the flow name
524-
trigger["flow"] = trigger["fq_name"]
525-
elif trigger["fq_name"].count(".") >= 2:
526-
# fully qualified name is of the format - project.branch.flow_name
527-
trigger["project"], tail = trigger["fq_name"].split(".", maxsplit=1)
528-
trigger["branch"], trigger["flow"] = tail.rsplit(".", maxsplit=1)
529-
else:
530-
raise MetaflowException(
531-
"Incorrect format for *flow* in *@trigger_on_finish* "
532-
"decorator. Specify either just the *flow_name* or a fully "
533-
"qualified name like *project_name.branch_name.flow_name*."
534-
)
535-
# TODO: Also sanity check project and branch names
536-
if not re.match(r"^[A-Za-z0-9_]+$", trigger["flow"]):
537-
raise MetaflowException(
538-
"Invalid flow name *%s* in *@trigger_on_finish* "
539-
"decorator. Only alphanumeric characters and "
540-
"underscores(_) are allowed." % trigger["flow"]
541-
)
434+
self._parse_fq_name(trigger)
542435

543436
self.options = self.attributes["options"]
544437

@@ -593,9 +486,67 @@ def flow_init(
593486
run_objs.append(run_obj)
594487
current._update_env({"trigger": Trigger.from_runs(run_objs)})
595488

489+
@staticmethod
490+
def _parse_static_triggers(flows):
491+
results = []
492+
for flow in flows:
493+
if is_stringish(flow):
494+
results.append(
495+
{
496+
"fq_name": flow,
497+
}
498+
)
499+
elif isinstance(flow, dict):
500+
if "name" not in flow:
501+
if len(flows) > 1:
502+
raise MetaflowException(
503+
"One or more flows in the *flows* attribute for "
504+
"*@trigger_on_finish* is missing the "
505+
"*name* key."
506+
)
507+
raise MetaflowException(
508+
"The *flow* attribute for *@trigger_on_finish* is missing the "
509+
"*name* key."
510+
)
511+
flow_name = flow["name"]
512+
513+
if not is_stringish(flow_name) or "." in flow_name:
514+
raise MetaflowException(
515+
f"The *name* attribute of the *flow* {flow_name} is not a valid string"
516+
)
517+
result = {"fq_name": flow_name}
518+
if "project" in flow:
519+
if is_stringish(flow["project"]):
520+
result["project"] = flow["project"]
521+
else:
522+
raise MetaflowException(
523+
f"The *project* attribute of the *flow* {flow_name} is not a string"
524+
)
525+
if "project_branch" in flow:
526+
if is_stringish(flow["project_branch"]):
527+
result["branch"] = flow["project_branch"]
528+
else:
529+
raise MetaflowException(
530+
f"The *project_branch* attribute of the *flow* {flow_name} is not a string"
531+
)
532+
results.append(result)
533+
else:
534+
if len(flows) > 1:
535+
raise MetaflowException(
536+
"One or more flows in the *flows* attribute for "
537+
"*@trigger_on_finish* decorator have an incorrect type. "
538+
"Supported type is string or Dict[str, str]- \n"
539+
"@trigger_on_finish(flows=['FooFlow', 'BarFlow']"
540+
)
541+
raise MetaflowException(
542+
"Incorrect type for *flow* attribute in *@trigger_on_finish* "
543+
" decorator. Supported type is string or Dict[str, str] - \n"
544+
"@trigger_on_finish(flow='FooFlow') or "
545+
"@trigger_on_finish(flow={'name':'FooFlow', 'project_branch': 'branch'})"
546+
)
547+
return results
548+
596549
def _parse_fq_name(self, trigger):
597-
if isinstance(trigger, DeployTimeField):
598-
trigger["fq_name"] = deploy_time_eval(trigger["fq_name"])
599550
if trigger["fq_name"].count(".") == 0:
600551
# fully qualified name is just the flow name
601552
trigger["flow"] = trigger["fq_name"]
@@ -615,32 +566,18 @@ def _parse_fq_name(self, trigger):
615566
"decorator. Only alphanumeric characters and "
616567
"underscores(_) are allowed." % trigger["flow"]
617568
)
618-
return trigger
619569

620570
def format_deploytime_value(self):
621-
for trigger in self.triggers:
622-
# Case were trigger is a function that returns a list
623-
# Need to do this bc we need to iterate over list and process
624-
if isinstance(trigger, DeployTimeField):
625-
deploy_value = deploy_time_eval(trigger)
626-
if isinstance(deploy_value, list):
627-
self.triggers = deploy_value
571+
if len(self.triggers) == 1 and isinstance(self.triggers[0], DeployTimeField):
572+
deploy_value = deploy_time_eval(self.triggers[0])
573+
if isinstance(deploy_value, list):
574+
self.triggers = deploy_value
628575
else:
629-
break
630-
for trigger in self.triggers:
631-
# Entire trigger is a function (returns either string or dict)
632-
old_trig = trigger
633-
if isinstance(trigger, DeployTimeField):
634-
trigger = deploy_time_eval(trigger)
635-
if isinstance(trigger, dict):
636-
trigger["fq_name"] = trigger.get("name")
637-
trigger["project"] = trigger.get("project")
638-
trigger["branch"] = trigger.get("branch")
639-
# We also added this bc it won't be formatted yet
640-
if isinstance(trigger, str):
641-
trigger = {"fq_name": trigger}
642-
trigger = self._parse_fq_name(trigger)
643-
self.triggers[self.triggers.index(old_trig)] = trigger
576+
self.triggers = [deploy_value]
577+
triggers = self._parse_static_triggers(self.triggers)
578+
for trigger in triggers:
579+
self._parse_fq_name(trigger)
580+
self.triggers = triggers
644581

645582
def get_top_level_options(self):
646583
return list(self._option_values.items())

0 commit comments

Comments
 (0)