Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
57f260c
Adjusted resources for full size dataset
jessicaway Oct 31, 2025
cf7f068
update process tsv step
jessicaway Nov 14, 2025
b070aeb
update process tsv task
jessicaway Nov 18, 2025
3697404
fix syntax error
jessicaway Nov 19, 2025
be721dc
add a wdl to get wgs med coverage
jessicaway Nov 19, 2025
0d3ab74
update syntax issues
jessicaway Nov 19, 2025
3a901a7
change required columns
jessicaway Nov 20, 2025
fbdab74
confirm issue is in freemix percent
jessicaway Nov 21, 2025
38d4ddf
let's try keeping all col and adding freemix
jessicaway Nov 24, 2025
13b12bd
use a docker with google sdk
jessicaway Nov 24, 2025
b5782a7
hopefully fix age issue
jessicaway Nov 25, 2025
9cc3d94
fix unbound variable issue
jessicaway Nov 25, 2025
486eb29
try out parallelization
jessicaway Dec 1, 2025
ebd5489
fix issues
jessicaway Dec 1, 2025
1ba379b
I tested it this time
jessicaway Dec 1, 2025
c554829
Add the median metrics from a sep tsv
jessicaway Dec 2, 2025
95daea0
adjust resources
jessicaway Dec 2, 2025
cf18807
add spark config to hail steps
jessicaway Dec 5, 2025
98371ff
give add annotations access to more mem
jessicaway Dec 10, 2025
b392fd5
swap out code for annotate_coverage
jessicaway Jan 9, 2026
ab31080
add option to skip summary
jessicaway Jan 12, 2026
bf93e1c
update combine vcfs step
jessicaway Jan 13, 2026
cffba9e
New strategy for combine VCFs
jessicaway Jan 16, 2026
fa5d5e0
update docker
jessicaway Jan 16, 2026
caacaa4
another fix
jessicaway Jan 16, 2026
6983b03
more updates for combine vcfs steps
jessicaway Jan 22, 2026
477e76f
try again
jessicaway Jan 22, 2026
ad901d6
more changes again
jessicaway Jan 22, 2026
d79634c
more changes again
jessicaway Jan 22, 2026
228427e
fix indentation issue
jessicaway Jan 23, 2026
9242c81
another update
jessicaway Jan 26, 2026
1b8af77
...
jessicaway Jan 26, 2026
1122ece
please work this time
jessicaway Jan 26, 2026
60bca17
try glob instead of read_lines
jessicaway Jan 26, 2026
07a27c5
fix syntax error
jessicaway Jan 26, 2026
429120f
update again
jessicaway Jan 26, 2026
3745aaf
again
jessicaway Jan 27, 2026
1016145
try making a change myself...
jessicaway Jan 27, 2026
071d9c7
simplify again
jessicaway Jan 27, 2026
4d6cfad
again
jessicaway Jan 27, 2026
0189df1
add debugging print
jessicaway Jan 27, 2026
f0b1d22
why is AI working against me - rawr
jessicaway Jan 27, 2026
117169e
fix shard naming
jessicaway Jan 27, 2026
79a097a
One step forward or two steps back?
jessicaway Jan 28, 2026
343789d
try to go back to the tar approach
jessicaway Jan 28, 2026
cad1e16
go back to tar implementation
jessicaway Jan 29, 2026
7696c85
simplify (I think)
jessicaway Jan 29, 2026
7461f5c
update code to make file name unique and conditionally run later rounds
jessicaway Jan 29, 2026
8c53e01
dumb
jessicaway Jan 29, 2026
15ea8dd
update docker
jessicaway Jan 30, 2026
5f00bae
invalid workflow fix
jessicaway Jan 30, 2026
09d961e
dummy commit
jessicaway Jan 30, 2026
b8156de
supposedly valid WDL
jessicaway Jan 30, 2026
c363afe
was it the comment??
jessicaway Feb 2, 2026
69c4f4d
Is AI really helping??
jessicaway Feb 2, 2026
2711646
make comments accurate
jessicaway Feb 2, 2026
bc48776
update docker again
jessicaway Feb 2, 2026
078ed07
fix call caching and index ordering
jessicaway Feb 9, 2026
cb99f3d
update merge conditions
jessicaway Feb 9, 2026
1bce990
add last step back
jessicaway Feb 9, 2026
6ec26e0
Fix errors
jessicaway Feb 9, 2026
0bd085e
update add annotations
jessicaway Feb 10, 2026
b754d58
wdl fix
jessicaway Feb 10, 2026
1637945
update temp dir
jessicaway Feb 10, 2026
53f5009
pass the right folder
jessicaway Feb 10, 2026
7bcb1b5
another fix
jessicaway Feb 10, 2026
f94e1df
Update resources
jessicaway Feb 19, 2026
096e1ac
more refactors
jessicaway Feb 19, 2026
367ee35
fix error
jessicaway Feb 20, 2026
4c62669
Add the mtSwirl files and run from dockers build off these
jessicaway Feb 20, 2026
82f7c50
Merge branch 'develop' into jw_new_mt_coverage_merge_pipeline
jessicaway Feb 24, 2026
dafd47e
fix import statements
jessicaway Feb 24, 2026
2c6954e
Merge remote-tracking branch 'origin/jw_new_mt_coverage_merge_pipelin…
jessicaway Feb 24, 2026
0ba132c
fix typo
jessicaway Feb 24, 2026
a746b8d
fix typo again
jessicaway Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .dockstore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ workflows:
subclass: WDL
primaryDescriptorPath: /all_of_us/cmrg/FixItFelixAndVariantCall.wdl

- name: get_wgs_median_coverage
subclass: WDL
primaryDescriptorPath: /all_of_us/mitochondria/get_wgs_median_coverage.wdl

- name: GvsAoUReblockGvcf
subclass: WDL
primaryDescriptorPath: /all_of_us/cmrg/GvsAoUReblockGvcf.wdl
Expand Down
188 changes: 188 additions & 0 deletions all_of_us/mitochondria/get_wgs_median_coverage.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
version 1.0

# Scatter-gather workflow: shard a (research_id, metrics_path) TSV, pull the
# WGS median coverage for each shard in parallel, then merge the results.
workflow get_wgs_median_coverage {

  input {
    File wgs_metrics_tsv
    String output_tsv_name = "wgs_median_coverage.tsv"
    Int num_shards = 64
  }

  # Shard the input TSV so extraction can run in parallel.
  call split_wgs_tsv as shard_input {
    input:
      wgs_metrics_tsv = wgs_metrics_tsv,
      num_shards = num_shards
  }

  # One extraction task per shard.
  scatter (shard in shard_input.parts) {
    call get_wgs_median_coverage_chunk as per_shard {
      input:
        wgs_metrics_tsv = shard
    }
  }

  # Concatenate the per-shard results into a single TSV with one header.
  call merge_tsvs as combine {
    input:
      part_tsvs = per_shard.median_coverage_tsv,
      output_tsv_name = output_tsv_name
  }

  output {
    File median_coverage_tsv = combine.merged_tsv
  }
}

# Split a 2-column TSV (research_id, metrics_path) into N shards, preserving header.
task split_wgs_tsv {
  input {
    File wgs_metrics_tsv
    Int num_shards = 64

    # Runtime parameters
    Int memory_gb = 4
    Int cpu = 1
    Int disk_gb = 20
  }

  command <<<
    set -euxo pipefail

    mkdir -p parts
    HEADER=$(head -n 1 "~{wgs_metrics_tsv}")
    TOTAL_LINES=$(wc -l < "~{wgs_metrics_tsv}")
    if [ "$TOTAL_LINES" -lt 2 ]; then
      echo "Input TSV has no data rows" >&2
      exit 1
    fi

    # Compute data rows and desired shard count
    DATA_LINES=$((TOTAL_LINES - 1))
    NUM=~{num_shards}
    # Cap the shard count at the number of data rows: GNU split with
    # `-n l/NUM` creates empty chunks when NUM exceeds the line count,
    # which would turn into header-only shards that waste scatter tasks.
    if [ "$NUM" -gt "$DATA_LINES" ]; then
      NUM=$DATA_LINES
    fi
    echo "TOTAL_LINES=$TOTAL_LINES DATA_LINES=$DATA_LINES NUM=$NUM" >&2

    # Split data rows into NUM balanced chunks without breaking lines.
    # Note: -n l/NUM requires a regular file (not stdin), so write the
    # data rows to a temp file first.
    tail -n +2 "~{wgs_metrics_tsv}" > data_rows.tsv
    split -d -n l/"$NUM" data_rows.tsv parts/shard_
    echo "Produced $(ls parts/shard_* 2>/dev/null | wc -l | tr -d ' ') raw parts" >&2

    # Prepend the original header to every shard so each one is a
    # self-contained TSV for the downstream chunk task.
    for f in parts/shard_*; do
      mv "$f" "$f.body"
      printf '%s\n' "$HEADER" > "$f.tsv"
      cat "$f.body" >> "$f.tsv"
      rm -f "$f.body"
    done
  >>>

  output {
    Array[File] parts = glob("parts/shard_*.tsv")
  }

  runtime {
    docker: "us.gcr.io/broad-gotc-prod/warp-tools:2.6.1"
    memory: memory_gb + " GB"
    cpu: cpu
    disks: "local-disk " + disk_gb + " HDD"
  }
}

# Extract WGS median coverage for a shard of the input TSV.
task get_wgs_median_coverage_chunk {
  input {
    File wgs_metrics_tsv
    String output_tsv_name = "wgs_median_coverage.tsv"

    # Runtime parameters (adjust as needed)
    Int memory_gb = 4
    Int cpu = 1
    Int disk_gb = 20
  }

  command <<<
    set -euxo pipefail

    python3 <<'EOF'
import sys
import csv
import subprocess
import shlex

in_path = "~{wgs_metrics_tsv}"
out_path = "~{output_tsv_name}"

# Stream TSV and process two columns per row: research_id, metrics_file_path
with open(in_path, newline='') as f_in, open(out_path, 'w', newline='') as f_out:
    reader = csv.reader(f_in, delimiter='\t')
    writer = csv.writer(f_out, delimiter='\t')
    header = next(reader, None)
    # Write output header
    writer.writerow(["research_id", "median_coverage"])

    # Start at 2 because line 1 of the shard is the header.
    for line_num, row in enumerate(reader, start=2):
        if not row:
            continue
        # Fail loudly on malformed rows instead of raising a bare IndexError.
        if len(row) < 2:
            print(f"ERROR: line {line_num}: expected 2 columns (research_id, metrics_path), got {len(row)}", file=sys.stderr)
            sys.exit(1)
        research_id = row[0]
        metrics_file_path = row[1]
        try:
            # Print column 4 of the first line that starts with a digit;
            # assumes this is the metrics data line of the Picard
            # CollectWgsMetrics output — TODO confirm column layout.
            cmd = f"gsutil cat {shlex.quote(metrics_file_path)} | awk '/^[0-9]/ {{ print $4; exit }}'"
            result = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
            median_coverage = result.stdout.strip()
        except Exception as e:
            print(f"ERROR: Failed to extract median coverage for {research_id} from {metrics_file_path}: {e}", file=sys.stderr)
            raise
        # An empty result means no numeric line matched; do not silently
        # emit an empty median value.
        if not median_coverage:
            print(f"ERROR: no numeric metrics line found in {metrics_file_path} for {research_id}", file=sys.stderr)
            sys.exit(1)
        writer.writerow([research_id, median_coverage])
print(f"Wrote {out_path}")
EOF
  >>>

  output {
    File median_coverage_tsv = output_tsv_name
  }

  runtime {
    docker: "us.gcr.io/broad-gotc-prod/warp-tools:2.6.1"
    memory: memory_gb + " GB"
    cpu: cpu
    disks: "local-disk " + disk_gb + " HDD"
  }
}

# Merge many per-shard TSVs into a single TSV, keeping a single header.
task merge_tsvs {
  input {
    Array[File] part_tsvs
    String output_tsv_name = "wgs_median_coverage.tsv"

    Int memory_gb = 2
    Int cpu = 1
    Int disk_gb = 10
  }

  command <<<
    set -euxo pipefail

    # Build the sorted file list via write_lines(), which emits one path
    # per line; the previous shell-interpolated loop word-split any path
    # containing whitespace. Sorting keeps the merge order reproducible.
    sort "~{write_lines(part_tsvs)}" > filelist.txt

    # Keep the header from the first file only; append data rows from the rest.
    first=true
    while IFS= read -r f; do
      if $first; then
        cat "$f" > "~{output_tsv_name}"
        first=false
      else
        tail -n +2 "$f" >> "~{output_tsv_name}"
      fi
    done < filelist.txt
    ls -lh "~{output_tsv_name}"
  >>>

  output {
    File merged_tsv = output_tsv_name
  }

  runtime {
    docker: "us.gcr.io/broad-gotc-prod/warp-tools:2.6.1"
    memory: memory_gb + " GB"
    cpu: cpu
    disks: "local-disk " + disk_gb + " HDD"
  }
}
7 changes: 7 additions & 0 deletions all_of_us/mitochondria/mitochondria_pipeline.changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# aou_9.0.0
2026-02-24

* minor refactor to rename the directory where subworkflows and tasks live. This has no effect on pipeline outputs

* Version used to process v9 data

# beta release
2025-08-07 (Date of Last Commit)

Expand Down
11 changes: 5 additions & 6 deletions all_of_us/mitochondria/mitochondria_pipeline.wdl
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
version 1.0

import "../../all_of_us/mitochondria/subworkflows_and_tasks/AlignAndCallR1_v2_5_Single.wdl" as AlignAndCallR1_Single
import "../../all_of_us/mitochondria/subworkflows_and_tasks/AlignAndCallR2_v2_5_Single.wdl" as AlignAndCallR2_Single
import "../../all_of_us/mitochondria/subworkflows_and_tasks/LiftoverTools_v2_5_Single.wdl" as LiftoverTools_Single
import "../../all_of_us/mitochondria/subworkflows_and_tasks/ProduceSelfReferenceFiles_v2_5_Single.wdl" as ProduceSelfReferenceFiles_Single
import "../../all_of_us/mitochondria/subworkflows_and_tasks/MongoTasks_v2_5_Single.wdl" as MongoTasks_Single
import "../../all_of_us/mitochondria/single_sample_subworkflows_and_tasks/AlignAndCallR1_v2_5_Single.wdl" as AlignAndCallR1_Single
import "../../all_of_us/mitochondria/single_sample_subworkflows_and_tasks/AlignAndCallR2_v2_5_Single.wdl" as AlignAndCallR2_Single
import "../../all_of_us/mitochondria/single_sample_subworkflows_and_tasks/LiftoverTools_v2_5_Single.wdl" as LiftoverTools_Single
import "../../all_of_us/mitochondria/single_sample_subworkflows_and_tasks/ProduceSelfReferenceFiles_v2_5_Single.wdl" as ProduceSelfReferenceFiles_Single
import "../../all_of_us/mitochondria/single_sample_subworkflows_and_tasks/MongoTasks_v2_5_Single.wdl" as MongoTasks_Single

workflow MitochondriaPipeline {

Expand Down
50 changes: 50 additions & 0 deletions all_of_us/mitochondria/mtSwirl_refactor/Terra/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Docker image for Step 3 (combine VCFs + homref imputation from coverage.h5)
#
# This image MUST include:
#   - Hail + Spark runtime (we deliberately inherit from the current AoU mito image)
#   - mtSwirl source code at /opt/mtSwirl so WDL can call:
#       python3 /opt/mtSwirl/generate_mtdna_call_mt/Terra/combine_vcfs_and_homref_from_covdb.py
#
# Build context should be the mtSwirl repo root:
#   docker build -f generate_mtdna_call_mt/Terra/Dockerfile -t aou-mitochondrial-combine-vcfs-covdb:dev .
#
# NOTE(review): the instruction above says the context is the repo root, but the
# COPY note near the bottom says the context is the mtSwirl_refactor directory —
# these conflict; with a repo-root context the COPY below would nest the whole
# repo under generate_mtdna_call_mt/. Confirm which context is intended.

FROM us.gcr.io/broad-gotc-prod/aou-mitochondrial-annotate-coverage:1.0.0

# Unbuffered output for timely container logs; PYTHONPATH exposes mtSwirl modules.
ENV PYTHONUNBUFFERED=1 \
    PYTHONPATH=/opt/mtSwirl

WORKDIR /opt/mtSwirl

# Add Google Cloud SDK tooling so WDL tasks can manually download gs:// inputs
# (e.g. shard MT tarballs) at runtime without relying on Cromwell localization.
# We install the minimal packages needed for `gcloud storage cp` and `gsutil cp`.
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        ca-certificates \
        curl \
        gnupg \
        apt-transport-https \
    && echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" \
        > /etc/apt/sources.list.d/google-cloud-sdk.list \
    && curl -fsSL https://packages.cloud.google.com/apt/doc/apt-key.gpg \
        | gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg \
    && apt-get update \
    && apt-get install -y --no-install-recommends google-cloud-cli \
    && rm -rf /var/lib/apt/lists/*

# Step 3 needs to read the v2 coverage DB (`coverage.h5`). The base image does not
# necessarily include h5py, so install it explicitly.
#
# Notes:
#   - We keep this minimal to avoid bloating the Hail image.
#   - h5py provides manylinux wheels for linux/amd64.
RUN python3 -m pip install --no-cache-dir \
    h5py \
    gcsfs

# Layer in the mtSwirl code. We copy the full package to avoid missing module imports.
# NOTE: build context should be the mtSwirl_refactor directory (see conflict note above).
COPY ./ /opt/mtSwirl/generate_mtdna_call_mt/

# Keep entrypoint neutral.
ENTRYPOINT ["python3"]
Empty file.
102 changes: 102 additions & 0 deletions all_of_us/mitochondria/mtSwirl_refactor/Terra/build_vcf_shard_mt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""Build a multi-sample Hail MatrixTable shard from a shard manifest TSV.

Inputs
------
A shard manifest TSV with columns:
- s (sample ID)
- vcf (path, typically gs://...)

Outputs
-------
Writes a Hail MatrixTable directory at --out-mt.

Design goals
------------
* Keep the same schema/formatting as the existing combine Step 3 output so that
downstream merge stages are stable.
* Only do VCF import + basic canonicalization here.
* Do NOT apply covdb hom-ref imputation here (that happens once on the final MT).

"""

from __future__ import annotations

import argparse
import logging
import os

import hail as hl

from generate_mtdna_call_mt.merging_utils import vcf_merging


def _parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Import one VCF shard manifest into a Hail MT")
p.add_argument("--shard-tsv", required=True, help="Shard manifest TSV (columns: s, vcf)")
p.add_argument("--out-mt", required=True, help="Output MT path (directory ending in .mt recommended)")
p.add_argument("--temp-dir", required=True, help="Temp directory for Hail/Spark scratch")
p.add_argument("--chunk-size", type=int, default=100, help="Chunk size for hierarchical merge (default: 100)")
p.add_argument(
"--include-extra-v2-fields",
action="store_true",
help="Include v2.1 extra entry fields (AD/OriginalSelfRefAlleles/SwappedFieldIDs)",
)
p.add_argument(
"--n-final-partitions",
type=int,
default=128,
help="Target partitions for the shard MT (default: 128)",
)
p.add_argument("--overwrite", action="store_true")
return p.parse_args()


def main(args: argparse.Namespace) -> None:
    """Import one shard manifest, merge its VCFs, and write the shard MT.

    Reads the (s, vcf) manifest, filters unusable rows, merges the per-sample
    VCFs via ``vcf_merging``, and checkpoints the result at ``args.out_mt``.
    If the output already has a ``_SUCCESS`` marker and ``--overwrite`` was
    not given, the existing MT is read and the work is skipped.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s: %(asctime)s: %(message)s",
        datefmt="%m/%d/%Y %I:%M:%S %p",
    )
    log = logging.getLogger("build_vcf_shard_mt")

    hl.init(tmp_dir=args.temp_dir)

    log.info("Reading shard manifest: %s", args.shard_tsv)
    manifest = hl.import_table(args.shard_tsv, types={"s": hl.tstr, "vcf": hl.tstr}, impute=False)
    # Keep only rows where both the sample ID and a non-empty VCF path exist.
    manifest = manifest.filter(
        hl.is_defined(manifest.s) & hl.is_defined(manifest.vcf) & (hl.len(manifest.vcf) > 0)
    )

    selected = manifest.select("s", "vcf")
    # Collect to driver: shard sizes are intentionally small (e.g., 2500)
    collected = selected.aggregate(hl.agg.collect(hl.struct(s=selected.s, vcf=selected.vcf)))
    vcf_paths = {rec.s: rec.vcf for rec in collected}

    log.info("Shard contains %d samples", len(vcf_paths))

    if not vcf_paths:
        raise ValueError("Shard manifest produced 0 usable (s, vcf) rows")

    out = args.out_mt
    # Idempotency: a finished write leaves a _SUCCESS marker; reuse it.
    if hl.hadoop_exists(f"{out}/_SUCCESS") and not args.overwrite:
        log.info("Output exists, reading: %s", out)
        _ = hl.read_matrix_table(out)
        return

    log.info("Importing and merging VCFs for shard...")
    shard_mt, _meta = vcf_merging(
        vcf_paths=vcf_paths,
        temp_dir=args.temp_dir,
        logger=log,
        n_final_partitions=args.n_final_partitions,
        chunk_size=args.chunk_size,
        include_extra_v2_fields=args.include_extra_v2_fields,
        num_merges=1,
        single_sample=True,
    )

    log.info("Checkpointing shard MT: %s", out)
    shard_mt = shard_mt.naive_coalesce(args.n_final_partitions).checkpoint(out, overwrite=args.overwrite)


# Script entry point: parse CLI options and build the shard MatrixTable.
if __name__ == "__main__":
    main(_parse_args())
Loading