Commit 2eb8d3a

Merge pull request #1638 from Clinical-Genomics/release_v18.0.1
Changed:
^^^^^^^^
* moved default resource allocation to snakemake command #1632
* increased memory of samtools fixmate #1632
* increased runtime for rule all #1632
* no rerun for rule all #1632
* increased head-job runtime to 7 days #1632
* improved information on failed job status #1636
* scontrol replaced with sacct in jobstatus script #1636
* add attempt mem bump to vep_somatic_research_sv #1632

Removed:
^^^^^^^^
* exome argument panel bed callback function #1632
* removed -l flag in head-job sbatch script #1632
2 parents bf54401 + 0797ddf commit 2eb8d3a

File tree

17 files changed: +315 −148 lines

BALSAMIC/assets/scripts/scan_finished_jobid_status.py

Lines changed: 165 additions & 47 deletions
@@ -2,7 +2,6 @@
 from __future__ import annotations
 
 import logging
-import re
 import subprocess
 from pathlib import Path
 from typing import Dict, List, Tuple, Optional
@@ -12,6 +11,9 @@
 
 LOG = logging.getLogger(__name__)
 
+FAILURE_LIKE_STATES = {"FAILED", "CANCELLED", "TIMEOUT", "OUT_OF_MEMORY"}
+SUCCESS_STATES = {"COMPLETED"}
+
 
 def find_job_logs(log_root: Path) -> Dict[str, Path]:
     """
@@ -30,35 +32,57 @@ def find_job_logs(log_root: Path) -> Dict[str, Path]:
 
 def get_job_state(jobid: str) -> Optional[str]:
     """
-    Return raw output of `scontrol show job JOBID`, or None if the query fails.
+    Look up the final job state via `sacct`.
+
+    We prefer the top-level job record (e.g. '10683002')
+    over step records (e.g. '10683002.batch', '10683002.0').
+
+    Returns a normalized state string like 'COMPLETED', 'FAILED',
+    'CANCELLED', etc., or None if not found.
     """
+    cmd = [
+        "sacct",
+        "-j",
+        jobid,
+        "--noheader",  # no column headers
+        "--parsable2",  # '|' separator, stable columns
+        "-o",
+        "JobID,State",
+    ]
+
     try:
-        LOG.debug(f"Running show job scontrol {jobid}")
+        LOG.debug("Running: %s", " ".join(cmd))
         result = subprocess.run(
-            ["scontrol", "show", "job", jobid],
+            cmd,
             capture_output=True,
             text=True,
             check=True,
         )
-        return result.stdout
     except FileNotFoundError:
-        LOG.error("scontrol executable not found: scontrol")
+        LOG.error("sacct executable not found: sacct")
        return None
     except subprocess.CalledProcessError as e:
-        LOG.warning(f"Could not check job {jobid} (may not exist). rc={e.returncode}")
-        LOG.debug(f"scontrol stderr for {jobid} {e.stderr}")
+        LOG.warning("Could not check job %s via sacct (rc=%s)", jobid, e.returncode)
+        LOG.debug("sacct stderr for %s: %s", jobid, e.stderr)
         return None
 
+    lines = [ln.strip() for ln in result.stdout.splitlines() if ln.strip()]
+    if not lines:
+        LOG.debug("No sacct records returned for job %s", jobid)
+        return None
+
+    # Each line looks like: "10683002|FAILED" or "10683002.0|CANCELLED+"
+    records = [ln.split("|") for ln in lines]
+
+    # Prefer the exact jobid (no step suffix)
+    parent_record = next((r for r in records if r[0] == jobid), None)
+    chosen = parent_record or records[0]
+
+    raw_state = chosen[1]
+    # Normalize things like "CANCELLED+" or "FAILED node_fail"
+    state = raw_state.split()[0].rstrip("+")
+    LOG.debug("Job %s sacct raw state=%r -> normalized=%r", jobid, raw_state, state)
 
-def parse_state(scontrol_output: str) -> Optional[str]:
-    """
-    Extract JobState from scontrol text, e.g. 'JobState=FAILED'.
-    Returns the state string (e.g. 'FAILED') or None if not found.
-    """
-    m = re.search(r"JobState=(\S+)", scontrol_output)
-    state = m.group(1) if m else None
-    if state is None:
-        LOG.debug("JobState not found in scontrol output")
     return state
 
 
@@ -67,44 +91,153 @@ def write_results(
     failed: List[Tuple[str, Path]],
     cancelled: List[Tuple[str, Path]],
     unknown: List[str],
+    resolved_failures: List[Tuple[str, Path, str, str]],
 ) -> None:
     """
     Append job results to output_file.
     Each run is prefixed with a timestamp header.
+
+    resolved_failures items are (jobid, log_path, state, rule_key).
     """
     output_file.parent.mkdir(parents=True, exist_ok=True)
 
     timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 
     with output_file.open("a") as out_f:
-        out_f.write(f"=== Job status check at {timestamp} ===\n")
+        out_f.write(f"=== Job status check at {timestamp} ===\n\n")
+
+        # If there are no *unresolved* failures/cancellations, consider run successful.
+        if not failed and not cancelled:
+            out_f.write("SUCCESSFUL\n\n")
 
         if failed:
-            out_f.write("Failed jobs:\n")
+            out_f.write("FAILED JOBS (no successful retry):\n")
             for jobid, log_path in failed:
                 out_f.write(f"{jobid}\t{log_path}\n")
             out_f.write("\n")
 
         if cancelled:
-            out_f.write("Cancelled jobs:\n")
+            out_f.write("CANCELLED JOBS (no successful retry):\n")
             for jobid, log_path in cancelled:
                 out_f.write(f"{jobid}\t{log_path}\n")
             out_f.write("\n")
 
+        if resolved_failures:
+            out_f.write("\n")
+            out_f.write("NOTE:\n")
+            out_f.write("Some jobs failed but succeeded on retry:\n")
+            for jobid, log_path, state, rule_key in resolved_failures:
+                out_f.write(f"{jobid}\t{log_path}\t{state}\n")
+            out_f.write("\n")
+
         if unknown:
             out_f.write("Unknown status jobs:\n")
             for jobid in unknown:
                 out_f.write(f"{jobid}\tNA\n")
             out_f.write("\n")
 
-        if not failed and not cancelled:
-            out_f.write("SUCCESSFUL\n\n")
-
     LOG.info(
-        f"Appended results to {output_file} (failed={len(failed)}, cancelled={len(cancelled)} unknown={len(unknown)})"
+        "Appended results to %s (failed=%d, cancelled=%d, resolved_failures=%d, unknown=%d)",
+        output_file,
+        len(failed),
+        len(cancelled),
+        len(resolved_failures),
+        len(unknown),
     )
 
 
+def derive_rule_key(log_root: Path, log_path: Path) -> str:
+    """
+    Derive the unique log-directory path for the rule + any wildcard as a key.
+
+    Prefer path relative to log_root; fall back to absolute parent directory.
+    """
+    try:
+        return str(log_path.parent.relative_to(log_root))
+    except ValueError:
+        return str(log_path.parent)
+
+
+def group_jobs_by_rule(
+    log_dir: Path, job_logs: Dict[str, Path]
+) -> Tuple[Dict[str, List[Tuple[str, Path, Optional[str]]]], List[str]]:
+    """
+    Query sacct for each job and group them per rule directory.
+
+    Returns:
+        rule_to_jobs: {rule_key -> [(jobid, log_path, state), ...]}
+        unknown: list of jobids with missing sacct info
+    """
+    rule_to_jobs: Dict[str, List[Tuple[str, Path, Optional[str]]]] = {}
+    unknown: List[str] = []
+
+    for jobid in sorted(job_logs.keys(), key=int):
+        log_path = job_logs[jobid]
+
+        state = get_job_state(jobid)
+        if state is None:
+            LOG.warning(
+                "Missing sacct state for job %s -- setting status UNKNOWN", jobid
+            )
+            unknown.append(jobid)
+            continue
+        rule_key = derive_rule_key(log_dir, log_path)
+
+        LOG.debug("Job %s in rule dir %s has state %s", jobid, rule_key, state)
+
+        rule_to_jobs.setdefault(rule_key, []).append((jobid, log_path, state))
+
+    return rule_to_jobs, unknown
+
+
+def classify_jobs(
+    rule_to_jobs: Dict[str, List[Tuple[str, Path, Optional[str]]]]
+) -> Tuple[
+    List[Tuple[str, Path]],
+    List[Tuple[str, Path]],
+    List[Tuple[str, Path, str, str]],
+]:
+    """
+    Classify jobs into:
+    - failed (no successful retry for that rule) -> **only the latest attempt**
+    - cancelled (no successful retry for that rule) -> **only the latest attempt**
+    - resolved_failures (failed/cancelled but with a successful retry)
+    """
+    failed: List[Tuple[str, Path]] = []
+    cancelled: List[Tuple[str, Path]] = []
+    resolved_failures: List[Tuple[str, Path, str, str]] = []
+
+    for rule_key, jobs in rule_to_jobs.items():
+        # jobs for this rule are in ascending jobid order (because of sorted(...) in group_jobs_by_rule)
+        has_success = any(state in SUCCESS_STATES for _, _, state in jobs)
+
+        # All failure-like attempts for this rule
+        failure_like_jobs = [
+            (jobid, log_path, state)
+            for jobid, log_path, state in jobs
+            if state in FAILURE_LIKE_STATES
+        ]
+
+        if not failure_like_jobs:
+            # Nothing to do for this rule
+            continue
+
+        if has_success:
+            # Rule eventually succeeded; treat *all* failure-like attempts as resolved.
+            for jobid, log_path, state in failure_like_jobs:
+                resolved_failures.append((jobid, log_path, state, rule_key))
+        else:
+            # Rule never succeeded: only report the **latest** failure-like attempt.
+            latest_jobid, latest_log_path, latest_state = failure_like_jobs[-1]
+            if latest_state == "FAILED":
+                failed.append((latest_jobid, latest_log_path))
+            else:
+                # CANCELLED, TIMEOUT, OUT_OF_MEMORY, ...
+                cancelled.append((latest_jobid, latest_log_path))
+
+    return failed, cancelled, resolved_failures
+
+
 @click.command()
 @click.argument(
     "log_dir", type=click.Path(exists=True, file_okay=False, path_type=Path)
@@ -128,7 +261,11 @@ def write_results(
 def check_failed_jobs(log_dir: Path, output: Path, log_level: str) -> None:
     """
     Recursively scan LOG_DIR for SLURM *.log files (stdout+stderr combined),
-    extract job IDs from filenames, and check their states via `scontrol show job JOBID`.
+    extract job IDs from filenames, and check their states via `sacct`.
+
+    If multiple jobs share the same rule log directory and at least one of them
+    completes successfully, earlier failures in that directory are reported under
+    a separate heading as "Failed jobs with successful retry".
     """
     logging.basicConfig(
         level=getattr(logging, log_level.upper(), logging.INFO),
@@ -138,33 +275,14 @@ def check_failed_jobs(log_dir: Path, output: Path, log_level: str) -> None:
     LOG.info("Scanning logs under: %s", log_dir)
     job_logs = find_job_logs(log_dir)
 
-    failed: List[Tuple[str, Path]] = []
-    cancelled: List[Tuple[str, Path]] = []
-    unknown: List[str] = []
-
     if not job_logs:
         LOG.warning("No job logs found (no files matching '*.log')")
         return
 
-    for jobid in sorted(job_logs.keys(), key=int):
-        out_text = get_job_state(jobid)
-        if not out_text:
-            # Can't classify without job info; skip but note it.
-            LOG.warning(
-                f"Missing scontrol output for job {jobid} -- setting status UNKNOWN"
-            )
-            unknown.append(jobid)
-            continue
-
-        state = parse_state(out_text)
-        if state == "FAILED":
-            failed.append((jobid, job_logs[jobid]))
-        elif state in ["CANCELLED", "TIMEOUT", "OUT_OF_MEMORY"]:
-            cancelled.append((jobid, job_logs[jobid]))
-        else:
-            LOG.debug(f"Job {jobid} state is {state}")
+    rule_to_jobs, unknown = group_jobs_by_rule(log_dir, job_logs)
+    failed, cancelled, resolved_failures = classify_jobs(rule_to_jobs)
 
-    write_results(output, failed, cancelled, unknown)
+    write_results(output, failed, cancelled, unknown, resolved_failures)
 
 
 if __name__ == "__main__":
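The new `sacct` parsing can be exercised without a live SLURM cluster. A minimal sketch against canned `--parsable2` output, replaying the same record-selection and normalization steps as `get_job_state` above (the job ID and states are made-up illustration values, not from this PR):

```python
# Canned `sacct --noheader --parsable2 -o JobID,State` output: one top-level
# record plus per-step records.
sacct_stdout = """\
10683002|CANCELLED+
10683002.batch|CANCELLED
10683002.extern|COMPLETED
"""

jobid = "10683002"
lines = [ln.strip() for ln in sacct_stdout.splitlines() if ln.strip()]
records = [ln.split("|") for ln in lines]

# Prefer the exact jobid over step records such as '10683002.batch'.
parent_record = next((r for r in records if r[0] == jobid), None)
chosen = parent_record or records[0]

# Normalize suffixed states ('CANCELLED+') and trailing reason text
# ('FAILED node_fail') down to a bare state keyword.
state = chosen[1].split()[0].rstrip("+")
print(state)  # -> CANCELLED
```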

BALSAMIC/commands/config/case.py

Lines changed: 6 additions & 0 deletions
@@ -151,6 +151,12 @@ def case_config(
 
     LOG.info(f"Starting configuring analysis case: {case_id}.")
 
+    if exome and not panel_bed:
+        raise click.BadParameter(
+            "If --exome is provided, --panel-bed must also be provided.",
+            param_hint=["--panel-bed"],
+        )
+
     LOG.info(f"Creating case analysis directory: {analysis_dir}/{case_id}.")
     Path(analysis_dir, case_id).mkdir(exist_ok=True)
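Together with the options.py change below, this moves the exome/panel-bed check out of a click callback and into the command body. A standalone sketch of the same guard, using a hypothetical `demo` command rather than BALSAMIC's real `case_config`:

```python
from typing import Optional

import click


@click.command()
@click.option("--exome", is_flag=True, default=False)
@click.option("--panel-bed", default=None)
def demo(exome: bool, panel_bed: Optional[str]) -> None:
    # Same guard as in case_config: --exome is only meaningful with a panel.
    if exome and not panel_bed:
        raise click.BadParameter(
            "If --exome is provided, --panel-bed must also be provided.",
            param_hint=["--panel-bed"],
        )
    click.echo("config ok")


if __name__ == "__main__":
    demo()  # `demo --exome` exits with a usage error; add --panel-bed to pass
```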

BALSAMIC/commands/options.py

Lines changed: 0 additions & 2 deletions
@@ -23,7 +23,6 @@
 from BALSAMIC.constants.paths import WORKFLOW_PROFILE, CACHE_PROFILE
 from BALSAMIC.utils.cli import (
     validate_cache_version,
-    validate_exome_option,
     validate_umi_min_reads,
 )
 
@@ -222,7 +221,6 @@
     is_flag=True,
     default=False,
     help="Assign exome parameters to TGA workflow",
-    callback=validate_exome_option,
 )
 
 OPTION_FASTQ_PATH = click.option(

BALSAMIC/constants/analysis.py

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ class LogFile:
 class SubmitSnakemake:
     """Constants for sbatch script running snakemake on cluster"""
 
-    MAX_RUN_HOURS: int = 120
+    MAX_RUN_HOURS: int = 168
 
 
 class SnakemakeDAG:
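168 hours is 7 days, matching the changelog's "increased head-job runtime to 7 days". A rough sketch of how this cap might surface as an sbatch time limit — the exact formatting used by the submitter is not shown in this diff, so treat the rendering below as an assumption:

```python
MAX_RUN_HOURS = 168  # 7 days * 24 h

# Hypothetical rendering; the real submitter may format --time differently.
print(f"#SBATCH --time={MAX_RUN_HOURS}:00:00")  # -> #SBATCH --time=168:00:00
```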

BALSAMIC/constants/workflow_profile/config.yaml

Lines changed: 4 additions & 6 deletions
@@ -6,12 +6,10 @@ max-status-checks-per-second: 1
 max-jobs-per-second: 1
 restart-times: 2
 
-default-resources:
-  runtime: 120
-  mem_mb: 4000
-  slurm_partition: "core"
-
 set-resources:
+  all:
+    runtime: 60
+    mem_mb: 4000
   post_process_tnscope_info_fields_wgs:
     runtime: 60
     mem_mb: min(230000, 5000 * attempt)
@@ -471,7 +469,7 @@ set-resources:
     mem_mb: min(230000, 25000 * attempt)
     runtime: 1440
   samtools_fixmate:
-    mem_mb: min(230000, 50000 * attempt)
+    mem_mb: min(230000, 80000 * attempt)
     runtime: 560
   bam_compress_tumor:
     mem_mb: min(230000, 8000 * attempt)
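The `min(230000, N * attempt)` expressions rely on Snakemake re-evaluating resources with an incremented `attempt` on each retry; with `restart-times: 2`, a rule gets up to three attempts. Plain arithmetic shows the progression for the bumped `samtools_fixmate` value (the 230000 MB cap presumably reflects the largest usable node memory):

```python
# attempt is 1 on the first run, then 2 and 3 on retries (restart-times: 2).
for attempt in (1, 2, 3):
    print(attempt, min(230000, 80000 * attempt))
# 1 80000
# 2 160000
# 3 230000  <- hits the 230000 MB cap on the third attempt
```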

BALSAMIC/models/sbatchsubmitter.py

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ def _build_sbatch_header(self) -> str:
     - optional partition if `self.headjob_partition` is set
     """
     lines = [
-        "#!/bin/bash -l",
+        "#!/bin/bash",
         f"#SBATCH --account={self.account}",
         f"#SBATCH --job-name=BALSAMIC_snakemake_submit.{self.case_id}.%j",
         f"#SBATCH --output={self.log_path}/BALSAMIC_snakemake_submit.{self.case_id}.%j.out",

BALSAMIC/models/snakemake.py

Lines changed: 1 addition & 1 deletion
@@ -137,7 +137,7 @@ def get_command(self) -> str:
 
     def get_slurm_job_arguments(self) -> str:
         return (
-            f'--default-resources slurm_extra="--qos={self.qos}" '
+            f'--default-resources slurm_extra="--qos={self.qos}" runtime=120 mem_mb=4000 '
             f"slurm_partition={self.workflow_partition} slurm_account={self.account}"
         )
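This is where the profile's removed `default-resources` block resurfaces (see config.yaml above): every job now inherits `runtime=120` and `mem_mb=4000` from the snakemake command line unless `set-resources` overrides them. A quick render of the resulting string, with placeholder values for qos, partition, and account:

```python
# Placeholder values standing in for the model's fields.
qos, workflow_partition, account = "low", "core", "development"

args = (
    f'--default-resources slurm_extra="--qos={qos}" runtime=120 mem_mb=4000 '
    f"slurm_partition={workflow_partition} slurm_account={account}"
)
print(args)
# --default-resources slurm_extra="--qos=low" runtime=120 mem_mb=4000 slurm_partition=core slurm_account=development
```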

BALSAMIC/snakemake_rules/annotation/somatic_sv_annotation.rule

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ rule vep_somatic_research_sv:
     benchmark:
         Path(benchmark_dir, "vep_somatic_research_sv." + config["analysis"]["case_id"] + ".svdb.tsv").as_posix()
     resources:
-        mem_mb = lambda wc: (140000 if config_model.analysis.sequencing_type == SequencingType.WGS else 35000)
+        mem_mb = lambda wc, attempt: min(230000, (140000 if config_model.analysis.sequencing_type == SequencingType.WGS else 35000) * attempt)
     singularity:
         Path(singularity_image, config["bioinfo_tools"].get("ensembl-vep") + ".sif").as_posix()
     params:
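Adding `attempt` to the lambda gives VEP SV annotation the same retry-scaled memory as the profile rules, capped at 230000 MB. The progression for both sequencing types, as plain arithmetic:

```python
# Base memory: 140000 MB for WGS, 35000 MB for targeted (TGA) analyses.
for label, base in (("WGS", 140000), ("TGA", 35000)):
    print(label, [min(230000, base * attempt) for attempt in (1, 2, 3)])
# WGS [140000, 230000, 230000]  <- capped from the second attempt onward
# TGA [35000, 70000, 105000]
```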
