forked from parallelworks/test-workflow-action
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path run_workflow.py
More file actions
executable file
·113 lines (98 loc) · 3.77 KB
/
run_workflow.py
File metadata and controls
executable file
·113 lines (98 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from client import Client
import sys, json
import traceback
import time
# FIXME: Won't be able to stop the resource if it was just started!
from time import sleep
from client_functions import *
def parse_resource_names(arg):
    """Split a ``---``-separated list of resource names.

    Empty entries (e.g. from a trailing or doubled ``---``) and repeated names
    are dropped, keeping first-seen order. Either kind of entry would otherwise
    be counted as "still waiting to start" forever by the wait loop below.

    Args:
        arg: Raw command-line value, e.g. ``"pool-a---pool-b"``.

    Returns:
        List of distinct, non-empty resource names.
    """
    return list(dict.fromkeys(name for name in arg.split("---") if name))


# Script entry point: start the requested resources, wait until each reports a
# master node, run the workflow, then stop the resources this script started.
if __name__ == "__main__":
    pw_user_host = sys.argv[1]  # beluga.parallel.works
    pw_api_key = sys.argv[2]  # echo ${PW_API_KEY}
    user = sys.argv[3]  # echo ${PW_USER}
    # NOTE(review): originally annotated "Not case sensitive", but every name
    # comparison below is an exact string match -- confirm against the API.
    resource_names = parse_resource_names(sys.argv[4])
    wf_name = sys.argv[5]
    wf_xml_args = json.loads(sys.argv[6])

    c = Client("https://" + pw_user_host, pw_api_key)

    # add startCmd to wf_xml_args
    wf_xml_args["startCmd"] = get_cmd(wf_name, c)

    # Error messages collected along the way; any entry fails the script.
    errors = []
    run_workflow = True

    # Starting resources. Status is None when start_resource raised;
    # "started" and "not-found" are the only return values inspected here.
    resource_status = {}
    for rname in resource_names:
        try:
            resource_status[rname] = start_resource(rname, c)
        except Exception:
            msg = "ERROR: Unexpected error when starting resource " + rname
            printd(msg)
            traceback.print_exc()
            resource_status[rname] = None
            run_workflow = False
            errors.append(msg)

    # Make sure we get to stopping the resources, even if polling or the
    # workflow raises, or the run is interrupted.
    try:
        # Only wait for resources that exist and started without error; the
        # others never report "on", so waiting on them would never end.
        pending = {
            rname
            for rname, rstatus in resource_status.items()
            if rstatus is not None and rstatus != "not-found"
        }
        last_state = {}
        cluster_hosts = []  # "<name> <masterNode>" lines, one per ready cluster
        printd("\nWaiting for", len(pending), "cluster(s) to start...")
        # NOTE(review): no timeout -- a cluster that never reports a
        # masterNode keeps this loop polling indefinitely.
        while pending:
            for cluster in c.get_resources():
                name = cluster["name"]
                if name not in pending or cluster["status"] != "on":
                    continue
                state = cluster["state"]
                # Log the state only when it first appears or changes.
                if name not in last_state or last_state[name] != state:
                    printd(name, state)
                    last_state[name] = state
                if "masterNode" in state and state["masterNode"] is not None:
                    entry = " ".join([name, state["masterNode"]])
                    print(entry)
                    cluster_hosts.append(entry)
                    pending.discard(name)
            if pending:
                time.sleep(5)
        if cluster_hosts:
            print("\nStarted all clusters")

        if "not-found" in resource_status.values():
            msg = "ERROR: Some resources were not found"
            printd(msg)
            run_workflow = False
            errors.append(msg)

        # Running workflow
        if run_workflow:
            try:
                launch_workflow(wf_name, wf_xml_args, user, c)
                # Waiting for workflow to complete
                final_state = wait_workflow(wf_name, c)
            except Exception:
                msg = "Workflow launch failed unexpectedly"
                printd(msg)
                traceback.print_exc()
                errors.append(msg)
            else:
                if final_state != "completed":
                    # str() so a non-string state is reported, not a TypeError.
                    msg = "Workflow final state is " + str(final_state)
                    printd(msg)
                    errors.append(msg)
        else:
            msg = "Aborting workflow launch"
            printd(msg)
            errors.append(msg)
    finally:
        # Stopping resources.
        # NOTE(review): the 5 s pause before stopping is kept from the
        # original; its purpose is not visible in this file.
        time.sleep(5)
        for rname, rstatus in resource_status.items():
            printd(rname, "status", rstatus)
            # Do not stop the pool if it was already started!
            # FIXME: Even with this precaution a pool with ongoing work could be stopped
            if rstatus != "started":
                continue
            # Keep going on failure so one bad stop doesn't leave the rest on.
            try:
                stop_resource(rname, c)
            except Exception:
                msg = "ERROR: Unexpected error when stopping resource " + rname
                printd(msg)
                traceback.print_exc()
                errors.append(msg)

    if errors:
        raise Exception("\n".join(errors))