Skip to content

Commit 1e78768

Browse files
authored
Merge branch 'develop' into aa-slidetag-one
2 parents 514078b + 78d002d commit 1e78768

File tree

2 files changed

+220
-83
lines changed

2 files changed

+220
-83
lines changed

pipeline_versions.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,4 @@ atac 2.7.1 2025-02-25
3737
snm3C 4.0.4 2024-08-06
3838
SmartSeq2SingleSample 5.1.21 2024-09-11
3939
Optimus 7.10.0 2025-02-25
40-
Multiome 5.12.0 2025-02-25
40+
Multiome 5.12.0 2025-02-25

scripts/firecloud_api/firecloud_api.py

Lines changed: 219 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -79,83 +79,183 @@ def get_user_token(self, credentials: credentials):
7979
return credentials.token
8080

8181
def submit_job(self, submission_data_file):
82-
token = self.get_user_token(self.delegated_creds)
83-
headers = self.build_auth_headers(token)
84-
url = f"{self.base_url}/workspaces/{self.namespace}/{quote(self.workspace_name)}/submissions"
85-
response = requests.post(url, json=submission_data_file, headers=headers)
82+
"""
83+
Submits a job to Terra/Firecloud with retry logic for intermittent 500 errors.
8684
87-
# Print status code and response body for debugging
88-
logging.info(f"Response status code for submitting job: {response.status_code}")
89-
logging.info(f"Response body: {response.text}")
85+
:param submission_data_file: The JSON data for the submission
86+
:return: The submission ID if successful, None otherwise
87+
"""
88+
# Set up retry parameters
89+
max_retry_duration = 15 * 60 # 15 minutes in seconds
90+
start_time = time.time()
91+
retry_delay = 5 # Start with a 5-second delay between retries
92+
max_retry_delay = 30 # Maximum retry delay in seconds
93+
max_attempts = 10 # Maximum number of retry attempts
94+
95+
attempts = 0
96+
while attempts < max_attempts:
97+
attempts += 1
98+
99+
# Check if we've exceeded the maximum retry duration
100+
current_time = time.time()
101+
if current_time - start_time > max_retry_duration:
102+
logging.error(f"Exceeded maximum retry duration of {max_retry_duration/60} minutes.")
103+
return None
90104

91-
if response.status_code == 201:
92105
try:
93-
# Parse the response as JSON
94-
response_json = response.json()
95-
96-
# Extract the submissionId
97-
submission_id = response_json.get("submissionId", None)
98-
if submission_id:
99-
logging.info(f"Submission ID extracted: {submission_id}")
100-
return submission_id
101-
else:
102-
logging.error("Error: submissionId not found in the response.")
106+
token = self.get_user_token(self.delegated_creds)
107+
headers = self.build_auth_headers(token)
108+
url = f"{self.base_url}/workspaces/{self.namespace}/{quote(self.workspace_name)}/submissions"
109+
110+
logging.info(f"Submitting job, attempt {attempts}/{max_attempts}")
111+
response = requests.post(url, json=submission_data_file, headers=headers)
112+
113+
# Print status code and response body for debugging
114+
logging.info(f"Response status code for submitting job: {response.status_code}")
115+
116+
# Handle different response codes
117+
if response.status_code == 201: # Success
118+
try:
119+
# Parse the response as JSON
120+
response_json = response.json()
121+
logging.info(f"Response body: {response.text}")
122+
123+
# Extract the submissionId
124+
submission_id = response_json.get("submissionId", None)
125+
if submission_id:
126+
logging.info(f"Submission ID extracted: {submission_id}")
127+
return submission_id
128+
else:
129+
logging.error("Error: submissionId not found in the response.")
130+
return None
131+
except json.JSONDecodeError:
132+
logging.error("Error: Failed to parse JSON response.")
133+
logging.error(f"Response body: {response.text}")
134+
# If we can't parse the JSON but got a 201, we might still want to retry
135+
if attempts < max_attempts:
136+
time.sleep(retry_delay)
137+
retry_delay = min(retry_delay * 1.5, max_retry_delay)
138+
continue
139+
return None
140+
141+
elif response.status_code == 500: # Server error, retry
142+
logging.warning(f"Received 500 error. Retrying in {retry_delay} seconds...")
143+
logging.warning(f"Response body: {response.text}")
144+
time.sleep(retry_delay)
145+
# Implement exponential backoff with a cap
146+
retry_delay = min(retry_delay * 1.5, max_retry_delay)
147+
continue
148+
149+
elif response.status_code >= 400 and response.status_code < 500: # Client error
150+
# For 4xx errors, only retry a few times as they might be temporary auth issues
151+
logging.error(f"Client error (4xx): {response.status_code}")
152+
logging.error(f"Response body: {response.text}")
153+
if response.status_code == 401 or response.status_code == 403:
154+
# Auth errors might be temporary, retry with token refresh
155+
self.delegated_creds.refresh(Request())
156+
if attempts < 3: # Only retry auth errors a few times
157+
time.sleep(retry_delay)
158+
continue
103159
return None
104-
except json.JSONDecodeError:
105-
logging.error("Error: Failed to parse JSON response.")
106-
return None
107-
else:
108-
logging.error(f"Failed to submit job. Status code: {response.status_code}")
109-
logging.error(f"Response body: {response.text}")
110-
return None
160+
161+
else: # Other error codes
162+
logging.error(f"Failed to submit job. Status code: {response.status_code}")
163+
logging.error(f"Response body: {response.text}")
164+
if attempts < max_attempts:
165+
time.sleep(retry_delay)
166+
retry_delay = min(retry_delay * 1.5, max_retry_delay)
167+
continue
168+
return None
169+
170+
except requests.exceptions.RequestException as e:
171+
# Handle network errors
172+
logging.warning(f"Network error occurred: {e}. Retrying in {retry_delay} seconds...")
173+
time.sleep(retry_delay)
174+
# Implement exponential backoff with a cap
175+
retry_delay = min(retry_delay * 1.5, max_retry_delay)
176+
continue
177+
178+
logging.error(f"Failed to submit job after {max_attempts} attempts.")
179+
return None
111180

112181

113182
def create_new_method_config(self, branch_name, pipeline_name):
114183
"""
115184
Creates a new method configuration in the workspace via Firecloud API.
185+
Includes a retry mechanism for 404 errors from Dockstore.
116186
117187
:param branch_name: The branch name
118188
:param pipeline_name: The name of the pipeline
119189
:return: The name of the created method configuration or None if failed
120190
"""
121-
122191
# Create method config name with test type
123192
method_config_name = self.get_method_config_name(pipeline_name, branch_name, args.test_type)
124193

125-
payload = {
126-
"deleted": False,
127-
"inputs": {},
128-
"methodConfigVersion": 0,
129-
"methodRepoMethod": {
130-
"methodUri": f"dockstore://github.com/broadinstitute/warp/{pipeline_name}/{branch_name}",
131-
"sourceRepo": "dockstore",
132-
"methodPath": f"github.com/broadinstitute/warp/{pipeline_name}",
133-
"methodVersion": f"{branch_name}"
134-
},
135-
"name": method_config_name,
136-
"namespace": "warp-pipelines",
137-
"outputs": {},
138-
"prerequisites": {}
139-
}
140-
logging.info(f"Creating new method configuration: {json.dumps(payload, indent=2)}")
194+
# Flag to track if we've already retried for a 404 error
195+
dockstore_404_retried = False
196+
197+
# Function to create the payload
198+
def create_payload():
199+
return {
200+
"deleted": False,
201+
"inputs": {},
202+
"methodConfigVersion": 0,
203+
"methodRepoMethod": {
204+
"methodUri": f"dockstore://github.com/broadinstitute/warp/{pipeline_name}/{branch_name}",
205+
"sourceRepo": "dockstore",
206+
"methodPath": f"github.com/broadinstitute/warp/{pipeline_name}",
207+
"methodVersion": f"{branch_name}"
208+
},
209+
"name": method_config_name,
210+
"namespace": "warp-pipelines",
211+
"outputs": {},
212+
"prerequisites": {}
213+
}
214+
215+
# Attempt to create the method configuration
216+
def attempt_creation():
217+
payload = create_payload()
218+
logging.info(f"Creating new method configuration: {json.dumps(payload, indent=2)}")
219+
220+
# Construct the API endpoint URL for creating a new method configuration
221+
url = f"{self.base_url}/workspaces/{self.namespace}/{quote(self.workspace_name)}/method_configs/{self.namespace}/{method_config_name}"
141222

142-
# Construct the API endpoint URL for creating a new method configuration
143-
url = f"{self.base_url}/workspaces/{self.namespace}/{quote(self.workspace_name)}/method_configs/{self.namespace}/{method_config_name}"
223+
token = self.get_user_token(self.delegated_creds)
224+
headers = self.build_auth_headers(token)
144225

145-
token = self.get_user_token(self.delegated_creds)
146-
headers = self.build_auth_headers(token)
226+
# Create the new method configuration in the workspace
227+
response = requests.put(url, headers=headers, json=payload)
228+
229+
return response
147230

148-
# Create the new method configuration in the workspace
149-
response = requests.put(url, headers=headers, json=payload)
231+
# First attempt
232+
response = attempt_creation()
150233

151-
# Check if the method configuration was created successfully
234+
# Check if we got a 404 error (likely from Dockstore)
235+
if response.status_code == 404 and not dockstore_404_retried:
236+
error_message = response.text
237+
logging.warning(f"Received 404 error, possibly from Dockstore: {error_message}")
238+
logging.info(f"Waiting 5 minutes before retrying...")
239+
240+
# Wait for 5 minutes (300 seconds)
241+
time.sleep(300)
242+
243+
# Mark that we've retried for this error
244+
dockstore_404_retried = True
245+
246+
# Retry the creation
247+
logging.info("Retrying method configuration creation after 5-minute wait")
248+
response = attempt_creation()
249+
250+
# Final check if the method configuration was created successfully
152251
if response.status_code == 200:
153252
logging.info(f"Method configuration {method_config_name} created successfully.")
154253
return method_config_name
155254
else:
156255
logging.error(f"Failed to create method configuration. Status code: {response.status_code}")
157256
logging.error(f"Response body: {response.text}")
158-
raise Exception(f"Failed to create method configuration for {pipeline_name} on branch {branch_name}")
257+
raise Exception(f"Failed to create method configuration for {pipeline_name} on the branch {branch_name}")
258+
159259

160260
def upload_test_inputs(self, pipeline_name, test_inputs, branch_name, test_type):
161261
"""
@@ -228,6 +328,7 @@ def upload_test_inputs(self, pipeline_name, test_inputs, branch_name, test_type)
228328
def poll_job_status(self, submission_id):
229329
"""
230330
Polls the status of a submission until it is complete and returns a dictionary of workflow IDs and their statuses.
331+
Includes a retry mechanism for handling intermittent 500 errors.
231332
232333
:param submission_id: The ID of the submission to poll
233334
:return: Dictionary with workflow IDs as keys and their statuses as values
@@ -236,44 +337,80 @@ def poll_job_status(self, submission_id):
236337
status_url = f"{self.base_url}/workspaces/{self.namespace}/{self.workspace_name}/submissions/{submission_id}"
237338
workflow_status_map = {}
238339

340+
# Set up retry parameters
341+
max_retry_duration = 15 * 60 # 15 minutes in seconds
342+
start_time = time.time()
343+
retry_delay = 5 # Start with a 5-second delay between retries
344+
max_retry_delay = 30 # Maximum retry delay in seconds
345+
239346
# Continuously poll the status of the submission until completion
240347
while True:
241-
# Get the token and headers
242-
token = self.get_user_token(self.delegated_creds)
243-
headers = self.build_auth_headers(token)
244-
status_response = requests.get(status_url, headers=headers)
348+
# Check if we've exceeded the maximum retry duration
349+
current_time = time.time()
350+
if current_time - start_time > max_retry_duration:
351+
logging.error(f"Exceeded maximum retry duration of {max_retry_duration/60} minutes.")
352+
return workflow_status_map
245353

246-
# Check if the response status code is successful (200)
247-
if status_response.status_code != 200:
248-
logging.error(f"Error: Received status code {status_response.status_code}")
249-
logging.info(f"Response content: {status_response.text}")
250-
return {}
251354
try:
252-
# Parse the response as JSON
253-
status_data = status_response.json()
254-
except json.JSONDecodeError:
255-
logging.error("Error decoding JSON response.")
256-
logging.info(f"Response content: {status_response.text}")
257-
return {}
258-
259-
# Retrieve workflows and their statuses
260-
workflows = status_data.get("workflows", [])
261-
for workflow in workflows:
262-
workflow_id = workflow.get("workflowId")
263-
workflow_status = workflow.get("status")
264-
if workflow_id and workflow_status:
265-
workflow_status_map[workflow_id] = workflow_status
266-
267-
# Check if the submission is complete
268-
submission_status = status_data.get("status", "")
269-
if submission_status == "Done":
270-
break
271-
272-
# Wait for 20 seconds before polling again
273-
time.sleep(20)
355+
# Get the token and headers
356+
token = self.get_user_token(self.delegated_creds)
357+
headers = self.build_auth_headers(token)
358+
status_response = requests.get(status_url, headers=headers)
359+
360+
# Check for 500 errors and retry if necessary
361+
if status_response.status_code == 500:
362+
logging.warning(f"Received 500 error. Retrying in {retry_delay} seconds...")
363+
time.sleep(retry_delay)
364+
# Implement exponential backoff with a cap
365+
retry_delay = min(retry_delay * 1.5, max_retry_delay)
366+
continue
367+
368+
# Check if the response status code is successful (200)
369+
if status_response.status_code != 200:
370+
logging.error(f"Error: Received status code {status_response.status_code}")
371+
logging.info(f"Response content: {status_response.text}")
372+
# For non-500 errors, wait and retry a few times
373+
if time.time() - start_time <= 60: # Only retry for the first minute for non-500 errors
374+
logging.warning(f"Retrying in {retry_delay} seconds...")
375+
time.sleep(retry_delay)
376+
continue
377+
return {}
378+
379+
try:
380+
# Parse the response as JSON
381+
status_data = status_response.json()
382+
# Reset retry delay after successful request
383+
retry_delay = 5
384+
except json.JSONDecodeError:
385+
logging.error("Error decoding JSON response.")
386+
logging.info(f"Response content: {status_response.text}")
387+
time.sleep(retry_delay)
388+
continue
389+
390+
# Retrieve workflows and their statuses
391+
workflows = status_data.get("workflows", [])
392+
for workflow in workflows:
393+
workflow_id = workflow.get("workflowId")
394+
workflow_status = workflow.get("status")
395+
if workflow_id and workflow_status:
396+
workflow_status_map[workflow_id] = workflow_status
397+
398+
# Check if the submission is complete
399+
submission_status = status_data.get("status", "")
400+
if submission_status == "Done":
401+
break
402+
403+
# Wait for 20 seconds before polling again
404+
time.sleep(20)
405+
406+
except requests.exceptions.RequestException as e:
407+
# Handle network errors
408+
logging.warning(f"Network error occurred: {e}. Retrying in {retry_delay} seconds...")
409+
time.sleep(retry_delay)
410+
# Implement exponential backoff with a cap
411+
retry_delay = min(retry_delay * 1.5, max_retry_delay)
274412

275413
return workflow_status_map
276-
277414
def quote_values(self, inputs_json):
278415
"""
279416
Format JSON values with proper handling of nested structures

0 commit comments

Comments
 (0)