-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsample_pipe.py
42 lines (33 loc) · 916 Bytes
/
sample_pipe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
"""Sample pipe to view DAG and artifacts."""
import pandas as pd
from metaflow import FlowSpec, step
class SampleFlow(FlowSpec):
    """Sample Metaflow flow with one fan-out and one join.

    Step graph: ``start`` -> (``p1``, ``p2``) -> ``join`` -> ``end``.
    ``start`` stores the artifact ``x`` and ``p1`` stores the artifact
    ``hi``; ``join`` merges them so they stay attached to the later steps.
    """

    @step
    def start(self):
        """Store the ``x`` artifact and fan out to ``p1`` and ``p2``."""
        self.x = 1
        print("MF pipe is starting.")
        self.next(self.p1, self.p2)

    @step
    def p1(self):
        """First parallel step: store a one-row DataFrame as ``hi``."""
        self.hi = pd.DataFrame([{"hello": "there"}])
        print("Executing first parallel step.")
        self.next(self.join)

    @step
    def p2(self):
        """Second parallel step: stores no artifacts of its own."""
        print("Executing second parallel step.")
        self.next(self.join)

    @step
    def join(self, artifacts):
        """Join the two branches and carry their artifacts forward.

        Args:
            artifacts: The join inputs supplied by Metaflow, one entry per
                incoming branch (``p1`` and ``p2``).
        """
        print("Executing join step.")
        # A join step does not inherit artifacts automatically: without an
        # explicit merge, ``x`` and ``hi`` would be missing from this step
        # and from ``end``.
        self.merge_artifacts(artifacts)
        self.next(self.end)

    @step
    def end(self):
        """End step: print a completion message."""
        print("MF pipe is all done.")
if __name__ == "__main__":
    # Run the flow only when this file is executed as a script, not when it
    # is imported as a module.
    SampleFlow()