
Commit f03482a

Add timeout configurations for network and SLURM commands
1 parent 7b7cfdf commit f03482a

File tree: config.py, main.py

2 files changed: 118 additions, 47 deletions


config.py

Lines changed: 4 additions & 0 deletions
@@ -2,6 +2,10 @@
 GITHUB_REPO_URL = 'https://github.com/WATonomous/infra-config'
 ALLOCATE_RUNNER_SCRIPT_PATH = "apptainer.sh" # relative path from '/allocation_script'
 
+# Timeout configurations
+NETWORK_TIMEOUT = 30 # seconds for HTTP requests (GitHub API calls)
+SLURM_COMMAND_TIMEOUT = 60 # seconds for SLURM commands (sbatch, sacct, etc.)
+THREAD_SLEEP_TIMEOUT = 5 # seconds between polling cycles for threads
 
 REPOS_TO_MONITOR = [
     {
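For reference, a minimal illustrative sketch (not taken from the repository) of how these constants are consumed downstream, assuming the requests- and subprocess-based call sites that main.py uses; the helper names here are invented:

    import requests
    import subprocess

    from config import NETWORK_TIMEOUT, SLURM_COMMAND_TIMEOUT

    def fetch_json(url, headers=None):
        # A single timeout value is applied by requests to both the connect
        # and the read phase; requests.exceptions.Timeout is raised on expiry.
        resp = requests.get(url, headers=headers, timeout=NETWORK_TIMEOUT)
        resp.raise_for_status()
        return resp.json()

    def run_slurm(cmd):
        # subprocess.run kills the child and raises subprocess.TimeoutExpired
        # if the command has not finished within SLURM_COMMAND_TIMEOUT seconds.
        return subprocess.run(cmd, capture_output=True, text=True,
                              timeout=SLURM_COMMAND_TIMEOUT)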

main.py

Lines changed: 114 additions & 47 deletions
@@ -9,7 +9,13 @@
 from dotenv import load_dotenv
 
 from runner_size_config import get_runner_resources
-from config import ALLOCATE_RUNNER_SCRIPT_PATH,REPOS_TO_MONITOR
+from config import (
+    ALLOCATE_RUNNER_SCRIPT_PATH,
+    REPOS_TO_MONITOR,
+    NETWORK_TIMEOUT,
+    SLURM_COMMAND_TIMEOUT,
+    THREAD_SLEEP_TIMEOUT
+)
 from RunningJob import RunningJob
 from KubernetesLogFormatter import KubernetesLogFormatter
 
@@ -62,7 +68,7 @@ def get_gh_api(url, token, etag=None):
         if etag:
             headers['If-None-Match'] = etag
 
-        response = requests.get(url, headers=headers)
+        response = requests.get(url, headers=headers, timeout=NETWORK_TIMEOUT)
 
         response.raise_for_status()
 
@@ -82,17 +88,25 @@ def get_gh_api(url, token, etag=None):
         else:
             logger.error(f"Unexpected status code: {response.status_code}")
             return None, etag
+    except requests.exceptions.Timeout:
+        logger.error(f"GitHub API request timed out after {NETWORK_TIMEOUT} seconds: {url}")
+        return None, etag
+    except requests.exceptions.ConnectionError as e:
+        logger.error(f"Connection error while calling GitHub API: {e}")
+        return None, etag
     except requests.exceptions.RequestException as e:
         logger.error(f"Exception occurred while calling GitHub API: {e}")
         return None, etag
 
-def poll_github_actions_and_allocate_runners(token, sleep_time=5):
+def poll_github_actions_and_allocate_runners(token, sleep_time=THREAD_SLEEP_TIMEOUT):
     """
     Polls each repository in REPOS_TO_MONITOR for queued workflows, then tries
     to allocate ephemeral runners.
     """
     global POLLED_WITHOUT_ALLOCATING
 
+    logger.info(f"Starting GitHub Actions polling thread with {sleep_time}s intervals")
+
     while True:
         try:
             something_allocated = False
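Two details worth noting about the get_gh_api changes: requests applies a single timeout value separately to the connect and the read phase (a (connect, read) tuple is also accepted when the two limits should differ), and both Timeout and ConnectionError are subclasses of RequestException, so the new handlers only fire because they are listed before the generic one. A minimal illustrative sketch of the same pattern (the fetch() wrapper and the tuple suggestion are not from the repository):

    import requests

    NETWORK_TIMEOUT = 30  # mirrors config.NETWORK_TIMEOUT

    def fetch(url):
        try:
            # A (connect, read) tuple such as (5, NETWORK_TIMEOUT) would also work here.
            resp = requests.get(url, timeout=NETWORK_TIMEOUT)
            resp.raise_for_status()
            return resp
        except requests.exceptions.Timeout:
            print("timed out")               # subclass of RequestException: listed first
        except requests.exceptions.ConnectionError as e:
            print(f"connection error: {e}")  # also a RequestException subclass
        except requests.exceptions.RequestException as e:
            print(f"request failed: {e}")    # generic fallback for other requests errors
        return None

    if __name__ == "__main__":
        fetch("https://api.github.com")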
@@ -204,19 +218,38 @@ def allocate_actions_runner(job_id, token, repo_api_base_url, repo_url, repo_nam
         reg_url = f'{repo_api_base_url}/actions/runners/registration-token'
         remove_url = f'{repo_api_base_url}/actions/runners/remove-token'
 
-        reg_resp = requests.post(reg_url, headers=headers)
-        reg_resp.raise_for_status()
-        reg_data = reg_resp.json()
-        registration_token = reg_data["token"]
+        try:
+            reg_resp = requests.post(reg_url, headers=headers, timeout=NETWORK_TIMEOUT)
+            reg_resp.raise_for_status()
+            reg_data = reg_resp.json()
+            registration_token = reg_data["token"]
+            logger.debug("Successfully obtained registration token")
+        except requests.exceptions.Timeout:
+            logger.error(f"Registration token request timed out after {NETWORK_TIMEOUT} seconds")
+            del allocated_jobs[(repo_name, job_id)]
+            return False
+        except requests.exceptions.RequestException as e:
+            logger.error(f"Failed to get registration token: {e}")
+            del allocated_jobs[(repo_name, job_id)]
+            return False
 
         # recommended small delay https://docs.github.com/en/rest/using-the-rest-api/best-practices-for-using-the-rest-api?apiVersion=2022-11-28#pause-between-mutative-requests
         time.sleep(1)
 
         # Get removal token
-        remove_resp = requests.post(remove_url, headers=headers)
-        remove_resp.raise_for_status()
-        remove_data = remove_resp.json()
-        removal_token = remove_data["token"]
+        try:
+            remove_resp = requests.post(remove_url, headers=headers, timeout=NETWORK_TIMEOUT)
+            remove_resp.raise_for_status()
+            remove_data = remove_resp.json()
+            removal_token = remove_data["token"]
+        except requests.exceptions.Timeout:
+            logger.error(f"Removal token request timed out after {NETWORK_TIMEOUT} seconds")
+            del allocated_jobs[(repo_name, job_id)]
+            return False
+        except requests.exceptions.RequestException as e:
+            logger.error(f"Failed to get removal token: {e}")
+            del allocated_jobs[(repo_name, job_id)]
+            return False
 
         # Get job details to see labels
         job_api_url = f"{repo_api_base_url}/actions/jobs/{job_id}"
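The registration-token and removal-token blocks follow the same shape: POST with NETWORK_TIMEOUT, drop the job from allocated_jobs, and return False on any failure. If the duplication ever becomes a burden, it could be folded into a small helper; the sketch below is hypothetical and not part of this commit (the post_for_token name is invented):

    import logging
    import requests

    NETWORK_TIMEOUT = 30  # mirrors config.NETWORK_TIMEOUT
    logger = logging.getLogger(__name__)

    def post_for_token(url, headers, what):
        # Hypothetical helper: returns the token string, or None on any failure.
        try:
            resp = requests.post(url, headers=headers, timeout=NETWORK_TIMEOUT)
            resp.raise_for_status()
            return resp.json()["token"]
        except requests.exceptions.Timeout:
            logger.error(f"{what} token request timed out after {NETWORK_TIMEOUT} seconds")
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to get {what} token: {e}")
        return None

The caller would then perform the cleanup (del allocated_jobs[...]; return False) once, whenever the helper returns None.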
@@ -272,36 +305,45 @@ def allocate_actions_runner(job_id, token, repo_api_base_url, repo_url, repo_nam
         ]
 
         logger.info(f"Running command: {' '.join(command)}")
-        result = subprocess.run(command, capture_output=True, text=True)
-        output = result.stdout.strip()
-        error_output = result.stderr.strip()
-        logger.info(f"Command stdout: {output}")
-        if error_output:
-            logger.error(f"Command stderr: {error_output}")
-
-        # Attempt to parse the SLURM job ID from output (e.g. "Submitted batch job 3828")
-        if result.returncode == 0:
-            try:
-                slurm_job_id = int(output.split()[-1])
-                # Store the SLURM job ID in allocated_jobs
-                allocated_jobs[(repo_name, job_id)] = RunningJob(
-                    repo=repo_name,
-                    job_id=job_id,
-                    slurm_job_id=slurm_job_id,
-                    workflow_name=job_data['workflow_name'],
-                    job_name=job_data['name'],
-                    labels=labels
-                )
-                logger.info(f"Allocated runner for job {job_id} in {repo_name} with SLURM job ID {slurm_job_id}.")
-                return True
-            except (IndexError, ValueError) as parse_err:
-                logger.error(f"Failed to parse SLURM job ID from: {output}. Error: {parse_err}")
-        else:
-            logger.error(f"sbatch command failed with return code {result.returncode}")
-
-        # If we get here, something failed, so remove from tracking and consider retry
-        del allocated_jobs[(repo_name, job_id)]
-        return False
+        try:
+            result = subprocess.run(command, capture_output=True, text=True, timeout=SLURM_COMMAND_TIMEOUT)
+            output = result.stdout.strip()
+            error_output = result.stderr.strip()
+            logger.info(f"Command stdout: {output}")
+            if error_output:
+                logger.error(f"Command stderr: {error_output}")
+
+            # Attempt to parse the SLURM job ID from output (e.g. "Submitted batch job 3828")
+            if result.returncode == 0:
+                try:
+                    slurm_job_id = int(output.split()[-1])
+                    # Store the SLURM job ID in allocated_jobs
+                    allocated_jobs[(repo_name, job_id)] = RunningJob(
+                        repo=repo_name,
+                        job_id=job_id,
+                        slurm_job_id=slurm_job_id,
+                        workflow_name=job_data['workflow_name'],
+                        job_name=job_data['name'],
+                        labels=labels
+                    )
+                    logger.info(f"Allocated runner for job {job_id} in {repo_name} with SLURM job ID {slurm_job_id}.")
+                    return True
+                except (IndexError, ValueError) as parse_err:
+                    logger.error(f"Failed to parse SLURM job ID from: {output}. Error: {parse_err}")
+            else:
+                logger.error(f"sbatch command failed with return code {result.returncode}")
+
+            # If we get here, something failed, so remove from tracking and consider retry
+            del allocated_jobs[(repo_name, job_id)]
+            return False
+        except subprocess.TimeoutExpired:
+            logger.error(f"SLURM command timed out after {SLURM_COMMAND_TIMEOUT} seconds: {' '.join(command)}")
+            del allocated_jobs[(repo_name, job_id)]
+            return False
+        except subprocess.SubprocessError as e:
+            logger.error(f"Subprocess error running SLURM command: {e}")
+            del allocated_jobs[(repo_name, job_id)]
+            return False
 
     except Exception as e:
         logger.error(f"Exception in allocate_actions_runner for job_id {job_id}: {e}")
@@ -333,7 +375,8 @@ def check_slurm_status():
         ]
 
         try:
-            sacct_result = subprocess.run(sacct_cmd, capture_output=True, text=True)
+            logger.debug(f"Checking SLURM job status for job ID: {running_job.slurm_job_id}")
+            sacct_result = subprocess.run(sacct_cmd, capture_output=True, text=True, timeout=SLURM_COMMAND_TIMEOUT)
             if sacct_result.returncode != 0:
                 logger.error(f"sacct command failed with return code {sacct_result.returncode}")
                 if sacct_result.stderr:
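sacct goes through Slurm's accounting backend, which can be slow under load, so bounding it with SLURM_COMMAND_TIMEOUT keeps the status-polling loop responsive. The exact sacct_cmd built in main.py is outside this hunk; the flags below are an illustrative example of such a query, not the repository's command:

    import subprocess

    SLURM_COMMAND_TIMEOUT = 60  # mirrors config.SLURM_COMMAND_TIMEOUT

    def slurm_job_state(slurm_job_id):
        # Illustrative sacct invocation: job ID, state and elapsed time,
        # machine-readable output with no header row.
        cmd = ["sacct", "-j", str(slurm_job_id),
               "--format=JobID,State,Elapsed", "--parsable2", "--noheader"]
        result = subprocess.run(cmd, capture_output=True, text=True,
                                timeout=SLURM_COMMAND_TIMEOUT)
        return result.stdout.strip()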
@@ -375,17 +418,23 @@ def check_slurm_status():
                 logger.info(f"Slurm job {job_component} {status} in {duration}. Running Job Info: {str(running_job)}")
                 to_remove.append(job_id)
 
+        except subprocess.TimeoutExpired:
+            logger.error(f"SLURM status check timed out after {SLURM_COMMAND_TIMEOUT} seconds for job ID {running_job.slurm_job_id}")
+        except subprocess.SubprocessError as e:
+            logger.error(f"Subprocess error checking SLURM job status for job ID {running_job.slurm_job_id}: {e}")
         except Exception as e:
             logger.error(f"Error querying SLURM job details for job ID {running_job.slurm_job_id}: {e}")
 
     # Remove completed/failed jobs
     for key in to_remove:
         del allocated_jobs[key]
 
-def poll_slurm_statuses(sleep_time=5):
+def poll_slurm_statuses(sleep_time=THREAD_SLEEP_TIMEOUT):
     """
     Wrapper function to poll check_slurm_status.
     """
+    logger.info(f"Starting SLURM status polling thread with {sleep_time}s intervals")
+
     while True:
         try:
             check_slurm_status()
@@ -394,20 +443,38 @@ def poll_slurm_statuses(sleep_time=5):
         time.sleep(sleep_time)
 
 if __name__ == "__main__":
+    logger.info("Starting SLURM GitHub Actions runner with timeout configurations:")
+    logger.info(f"  Network timeout: {NETWORK_TIMEOUT}s")
+    logger.info(f"  SLURM command timeout: {SLURM_COMMAND_TIMEOUT}s")
+    logger.info(f"  Thread sleep timeout: {THREAD_SLEEP_TIMEOUT}s")
+
     # Thread to poll GitHub for new queued workflows
     github_thread = threading.Thread(
         target=poll_github_actions_and_allocate_runners,
-        args=(GITHUB_ACCESS_TOKEN, 2)
+        args=(GITHUB_ACCESS_TOKEN, 2),
+        name="GitHub-Poller"
     )
 
     # Thread to poll SLURM job statuses
     slurm_thread = threading.Thread(
         target=poll_slurm_statuses,
-        kwargs={'sleep_time': 5}
+        kwargs={'sleep_time': THREAD_SLEEP_TIMEOUT},
+        name="SLURM-Status-Poller"
    )
 
+    # Set threads as daemon so they exit when main thread exits
+    github_thread.daemon = True
+    slurm_thread.daemon = True
+
     github_thread.start()
     slurm_thread.start()
 
-    github_thread.join()
-    slurm_thread.join()
+    try:
+        github_thread.join()
+        slurm_thread.join()
+    except KeyboardInterrupt:
+        logger.info("Received interrupt signal, shutting down gracefully...")
+    except Exception as e:
+        logger.error(f"Unexpected error in main thread: {e}")
+    finally:
+        logger.info("SLURM GitHub Actions runner shutdown complete.")
