
Commit 53c14c7

feat: submit method upgrade snakemake (#1604)
#### Added

- sbatch script for the snakemake head job
- code to run the snakemake submitter interactively, that is, without submitting it to the cluster
- the slurm-executor-plugin to the install instructions
- more logging and a logfile for the balsamic wrapper
- analysis_status.txt for failed or cancelled jobs, including failed QC metrics

#### Changed

- moved cluster resources from a json file to a cluster_workflow profile
- optimised rule resources based on benchmark metrics (with memory added to each rule)
- renamed some rules to allow for workflow-specific resource allocation
- removed threads from all rules
- updated balsamic to v9.9.0

#### Removed

- mailing functionality
- unused code for benchmark plotting
- code to disable variant callers, as we never use this functionality and BALSAMIC is not targeted towards researchers
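As background for the first two "Added" items: running the submitter interactively means the Snakemake head job executes in the caller's shell instead of being wrapped in sbatch, while rule jobs are still dispatched to SLURM by the executor plugin named in the profile. A minimal sketch of that dispatch, assuming a Snakemake >= 8 style `--workflow-profile` flag and hypothetical wrapper paths (the real wiring lives in the balsamic wrapper and the sbatch script added by this commit):

```python
# Sketch only: illustrates the interactive vs. cluster submission split
# described in the changelog. "headjob.sh" and the profile path are
# hypothetical placeholders, not names from this commit.
import subprocess
from pathlib import Path


def submit_head_job(snakefile: Path, profile_dir: Path, interactive: bool) -> None:
    snakemake_cmd = [
        "snakemake",
        "--snakefile", str(snakefile),
        "--workflow-profile", str(profile_dir),  # the cluster_workflow profile
    ]
    if interactive:
        # Run the Snakemake head job in the current session; rule jobs are
        # still submitted to SLURM by the executor plugin.
        subprocess.run(snakemake_cmd, check=True)
    else:
        # Submit the head job itself to the cluster via an sbatch wrapper.
        subprocess.run(["sbatch", "headjob.sh"], check=True)
```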
Parent commit: 828a627

134 files changed (+2581, −2921 lines)


BALSAMIC/assets/scripts/immediate_submit.py

Lines changed: 0 additions & 78 deletions
This file was deleted.
Lines changed: 171 additions & 0 deletions (new file)
```python
#!/usr/bin/env python3
from __future__ import annotations

import logging
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import click

LOG = logging.getLogger(__name__)


def find_job_logs(log_root: Path) -> Dict[str, Path]:
    """
    Recursively find *.log files whose basename is a numeric jobid.
    Returns {jobid -> log_path}.
    """
    job_logs: Dict[str, Path] = {}
    for p in log_root.rglob("*.log"):
        if p.stem.isdigit():  # e.g. "9727982.log" -> "9727982"
            job_logs[p.stem] = p
        else:
            LOG.debug(f"Skipping non-job log file: {p}")
    LOG.info(f"Discovered {len(job_logs)} job logs under {log_root}")
    return job_logs


def get_job_state(jobid: str) -> Optional[str]:
    """
    Return raw output of `scontrol show job JOBID`, or None if the query fails.
    """
    try:
        LOG.debug(f"Running scontrol show job {jobid}")
        result = subprocess.run(
            ["scontrol", "show", "job", jobid],
            capture_output=True,
            text=True,
            check=True,
        )
        return result.stdout
    except FileNotFoundError:
        LOG.error("scontrol executable not found on PATH")
        return None
    except subprocess.CalledProcessError as e:
        LOG.warning(f"Could not check job {jobid} (may not exist). rc={e.returncode}")
        LOG.debug(f"scontrol stderr for {jobid}: {e.stderr}")
        return None


def parse_state(scontrol_output: str) -> Optional[str]:
    """
    Extract JobState from scontrol text, e.g. 'JobState=FAILED'.
    Returns the state string (e.g. 'FAILED') or None if not found.
    """
    m = re.search(r"JobState=(\S+)", scontrol_output)
    state = m.group(1) if m else None
    if state is None:
        LOG.debug("JobState not found in scontrol output")
    return state


def write_results(
    output_file: Path,
    failed: List[Tuple[str, Path]],
    cancelled: List[Tuple[str, Path]],
    unknown: List[str],
) -> None:
    """
    Append job results to output_file.
    Each run is prefixed with a timestamp header.
    """
    output_file.parent.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    with output_file.open("a") as out_f:
        out_f.write(f"=== Job status check at {timestamp} ===\n")

        if failed:
            out_f.write("Failed jobs:\n")
            for jobid, log_path in failed:
                out_f.write(f"{jobid}\t{log_path}\n")
            out_f.write("\n")

        if cancelled:
            out_f.write("Cancelled jobs:\n")
            for jobid, log_path in cancelled:
                out_f.write(f"{jobid}\t{log_path}\n")
            out_f.write("\n")

        if unknown:
            out_f.write("Unknown status jobs:\n")
            for jobid in unknown:
                out_f.write(f"{jobid}\tNA\n")
            out_f.write("\n")

        if not failed and not cancelled:
            out_f.write("SUCCESSFUL\n\n")

    LOG.info(
        f"Appended results to {output_file} (failed={len(failed)}, cancelled={len(cancelled)}, unknown={len(unknown)})"
    )


@click.command()
@click.argument(
    "log_dir", type=click.Path(exists=True, file_okay=False, path_type=Path)
)
@click.option(
    "--output",
    "-o",
    required=True,
    type=click.Path(writable=True, path_type=Path),
    help="Path to output file for results (FAILED/CANCELLED or SUCCESSFUL).",
)
@click.option(
    "--log-level",
    default="INFO",
    show_default=True,
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], case_sensitive=False
    ),
    help="Logging verbosity.",
)
def check_failed_jobs(log_dir: Path, output: Path, log_level: str) -> None:
    """
    Recursively scan LOG_DIR for SLURM *.log files (stdout+stderr combined),
    extract job IDs from filenames, and check their states via `scontrol show job JOBID`.
    """
    logging.basicConfig(
        level=getattr(logging, log_level.upper(), logging.INFO),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    LOG.info("Scanning logs under: %s", log_dir)
    job_logs = find_job_logs(log_dir)

    failed: List[Tuple[str, Path]] = []
    cancelled: List[Tuple[str, Path]] = []
    unknown: List[str] = []

    if not job_logs:
        LOG.warning("No job logs found (no files matching '*.log')")
        return

    for jobid in sorted(job_logs.keys(), key=int):
        out_text = get_job_state(jobid)
        if not out_text:
            # Can't classify without job info; skip but note it.
            LOG.warning(
                f"Missing scontrol output for job {jobid} -- setting status UNKNOWN"
            )
            unknown.append(jobid)
            continue

        state = parse_state(out_text)
        if state == "FAILED":
            failed.append((jobid, job_logs[jobid]))
        elif state == "CANCELLED":
            cancelled.append((jobid, job_logs[jobid]))
        else:
            LOG.debug(f"Job {jobid} state is {state}")

    write_results(output, failed, cancelled, unknown)


if __name__ == "__main__":
    check_failed_jobs()
```
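A usage sketch for the command above, exercising it with click's test runner; the module name `check_failed_jobs_script` is a hypothetical import path, not one from the commit. On a machine without `scontrol` on PATH, every discovered job lands in the UNKNOWN bucket:

```python
# Sketch only: drives the CLI above via click.testing, assuming the script
# is importable as a module (the module name here is hypothetical).
from pathlib import Path

from click.testing import CliRunner

from check_failed_jobs_script import check_failed_jobs

runner = CliRunner()
with runner.isolated_filesystem():
    Path("logs").mkdir()
    Path("logs/9727982.log").touch()   # numeric stem -> treated as a SLURM jobid
    Path("logs/rule_foo.log").touch()  # non-numeric stem -> skipped
    result = runner.invoke(
        check_failed_jobs, ["logs", "--output", "analysis_status.txt"]
    )
    # Without scontrol available, job 9727982 is reported under
    # "Unknown status jobs" in the appended analysis_status.txt.
    print(result.output)
    print(Path("analysis_status.txt").read_text())
```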

BALSAMIC/commands/config/case.py

Lines changed: 31 additions & 3 deletions
```diff
@@ -40,7 +40,12 @@
     OPTION_TUMOR_SAMPLE_NAME,
     OPTION_UMI_MIN_READS,
 )
-from BALSAMIC.constants.analysis import BIOINFO_TOOL_ENV, AnalysisWorkflow, Gender
+from BALSAMIC.constants.analysis import (
+    BIOINFO_TOOL_ENV,
+    AnalysisWorkflow,
+    Gender,
+    LogFile,
+)
 from BALSAMIC.constants.cache import GenomeVersion
 from BALSAMIC.constants.constants import FileType
 from BALSAMIC.constants.paths import (
@@ -57,9 +62,11 @@
     get_panel_chrom,
     get_sample_list,
     get_gens_references,
+    get_snakefile,
 )
 from BALSAMIC.utils.io import read_json, write_json
 from BALSAMIC.utils.utils import get_absolute_paths_dict
+from BALSAMIC.utils.logging import add_file_logging

 LOG = logging.getLogger(__name__)

@@ -129,6 +136,19 @@ def case_config(
     tumor_sample_name: str,
     umi_min_reads: str | None,
 ):
+    """Configure BALSAMIC workflow based on input arguments."""
+
+    LOG.info(f"Starting configuring analysis case: {case_id}.")
+
+    LOG.info(f"Creating case analysis directory: {analysis_dir}/{case_id}.")
+    Path(analysis_dir, case_id).mkdir(exist_ok=True)
+
+    log_file = Path(analysis_dir, case_id, LogFile.LOGNAME).as_posix()
+    LOG.info(f"Setting BALSAMIC logfile path to: {log_file}.")
+    add_file_logging(log_file, logger_name=__name__)
+
+    LOG.info(f"Running BALSAMIC version {balsamic_version} -- CONFIG CASE")
+
     references_path: Path = Path(balsamic_cache, cache_version, genome_version)
     references: Dict[str, Path] = get_absolute_paths_dict(
         base_path=references_path,
@@ -154,7 +174,6 @@ def case_config(
             if path is not None
         }
     )
-
     variants_observations = {
         "artefact_snv_observations": artefact_snv_observations,
         "clinical_snv_observations": clinical_snv_observations,
@@ -176,6 +195,8 @@ def case_config(
     analysis_fastq_dir: str = get_analysis_fastq_files_directory(
         case_dir=Path(analysis_dir, case_id).as_posix(), fastq_path=fastq_path
     )
+    LOG.info(f"Prepared analysis fastq-dir: {analysis_fastq_dir}")
+
     result_dir: Path = Path(analysis_dir, case_id, "analysis")
     log_dir: Path = Path(analysis_dir, case_id, "logs")
     script_dir: Path = Path(analysis_dir, case_id, "scripts")
@@ -186,6 +207,8 @@ def case_config(
     for directory in [result_dir, log_dir, script_dir, benchmark_dir]:
         directory.mkdir(exist_ok=True)

+    LOG.info("Created analysis and log directories.")
+    LOG.info("Validating configuration data in pydantic model.")
     config_collection_dict = ConfigModel(
         sentieon={
             "sentieon_install_dir": sentieon_install_dir,
@@ -244,5 +267,10 @@ def case_config(
     write_json(json_obj=config_collection_dict, path=config_path)
     LOG.info(f"Config file saved successfully - {config_path}")

-    generate_graph(config_collection_dict, config_path)
+    snakefile = get_snakefile(
+        analysis_type=config_collection_dict["analysis"]["analysis_type"],
+        analysis_workflow=config_collection_dict["analysis"]["analysis_workflow"],
+    )
+
+    generate_graph(config_collection_dict, config_path, snakefile)
     LOG.info(f"BALSAMIC Workflow has been configured successfully!")
```
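One new piece this diff relies on is `add_file_logging` from `BALSAMIC.utils.logging`, whose body is not part of this commit view. A plausible sketch matching the call site above (signature taken from the import, body assumed):

```python
# Sketch only: a plausible shape for BALSAMIC.utils.logging.add_file_logging.
# The signature matches the call site in case.py; the body is an assumption.
import logging


def add_file_logging(log_file: str, logger_name: str) -> None:
    """Attach a FileHandler so the named logger also writes to log_file."""
    handler = logging.FileHandler(log_file)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    logging.getLogger(logger_name).addHandler(handler)
```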

BALSAMIC/commands/config/pon.py

Lines changed: 6 additions & 1 deletion
```diff
@@ -35,6 +35,7 @@
     get_analysis_fastq_files_directory,
     get_bioinfo_tools_version,
     get_pon_sample_list,
+    get_snakefile,
 )
 from BALSAMIC.utils.io import read_json, write_json
 from BALSAMIC.utils.utils import get_absolute_paths_dict
@@ -144,5 +145,9 @@ def pon_config(
     write_json(json_obj=config_collection_dict, path=config_path)
     LOG.info(f"PON config file saved successfully - {config_path}")

-    generate_graph(config_collection_dict, config_path)
+    snakefile = get_snakefile(
+        analysis_type=config_collection_dict["analysis"]["analysis_type"],
+        analysis_workflow=config_collection_dict["analysis"]["analysis_workflow"],
+    )
+    generate_graph(config_collection_dict, config_path, snakefile)
     LOG.info(f"BALSAMIC PON workflow has been configured successfully!")
```
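Both config commands now resolve the Snakefile explicitly and pass it to `generate_graph` instead of letting the graph step infer it. `get_snakefile` itself is not shown in this commit view; a hypothetical sketch of the kind of lookup it performs (the committed implementation and paths may differ):

```python
# Sketch only: the shape of an analysis-type/workflow -> Snakefile lookup.
# The real get_snakefile lives in BALSAMIC's utilities; these paths are
# illustrative assumptions, not the committed implementation.
from pathlib import Path


def get_snakefile(analysis_type: str, analysis_workflow: str) -> str:
    """Return the Snakefile path for an analysis type and workflow."""
    workflows_dir = Path("BALSAMIC") / "workflows"
    if analysis_type == "pon":
        return (workflows_dir / f"PON.{analysis_workflow}.smk").as_posix()
    return (workflows_dir / "balsamic.smk").as_posix()
```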
