-
Notifications
You must be signed in to change notification settings - Fork 115
Expand file tree
/
Copy pathget_wgs_median_coverage.wdl
More file actions
188 lines (159 loc) · 5.34 KB
/
get_wgs_median_coverage.wdl
File metadata and controls
188 lines (159 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
version 1.0
workflow get_wgs_median_coverage {
    meta {
        description: "Look up the WGS median coverage for each sample listed in a 2-column manifest (research_id, metrics_path), fanning the work out over shards."
    }

    input {
        File wgs_metrics_tsv
        String output_tsv_name = "wgs_median_coverage.tsv"
        Int num_shards = 64
    }

    # Shard the manifest so extraction can run in parallel.
    call split_wgs_tsv as shard_manifest {
        input:
            wgs_metrics_tsv = wgs_metrics_tsv,
            num_shards = num_shards
    }

    # Extract median coverage independently per shard.
    scatter (shard in shard_manifest.parts) {
        call get_wgs_median_coverage_chunk as extract {
            input:
                wgs_metrics_tsv = shard
        }
    }

    # Stitch the per-shard tables back into one TSV with a single header.
    call merge_tsvs as combine {
        input:
            part_tsvs = extract.median_coverage_tsv,
            output_tsv_name = output_tsv_name
    }

    output {
        File median_coverage_tsv = combine.merged_tsv
    }
}
# Split a 2-column TSV (research_id, metrics_path) into N shards, preserving header.
task split_wgs_tsv {
    input {
        File wgs_metrics_tsv
        Int num_shards = 64
        # Runtime parameters
        Int memory_gb = 4
        Int cpu = 1
        Int disk_gb = 20
    }

    command <<<
        set -euxo pipefail
        mkdir -p parts

        header=$(head -n 1 "~{wgs_metrics_tsv}")
        total=$(wc -l < "~{wgs_metrics_tsv}")
        if [ "$total" -lt 2 ]; then
            echo "Input TSV has no data rows" >&2
            exit 1
        fi

        # Compute data rows and desired shard count
        data=$((total - 1))
        shards=~{num_shards}
        echo "TOTAL_LINES=$total DATA_LINES=$data NUM=$shards" >&2

        # Split data rows into ~NUM balanced chunks using GNU split lines mode.
        # Note: -n l/NUM requires a regular file (not stdin), so write data rows
        # to a temp file first.
        tail -n +2 "~{wgs_metrics_tsv}" > data_rows.tsv
        split -d -n l/$shards data_rows.tsv parts/shard_
        echo "Produced $(ls parts/shard_* 2>/dev/null | wc -l | tr -d ' ') raw parts" >&2

        # Re-attach the header to each raw shard under a .tsv name.
        # (The glob is expanded once, up front, so the .tsv files created
        # inside the loop are never themselves iterated.)
        for raw in parts/shard_*; do
            { printf '%s\n' "$header"; cat "$raw"; } > "$raw.tsv"
            rm -f "$raw"
        done
    >>>

    output {
        Array[File] parts = glob("parts/shard_*.tsv")
    }

    runtime {
        docker: "us.gcr.io/broad-gotc-prod/warp-tools:2.6.1"
        memory: memory_gb + " GB"
        cpu: cpu
        disks: "local-disk " + disk_gb + " HDD"
    }
}
# Extract WGS median coverage for a shard of the input TSV.
#
# Reads a 2-column TSV (research_id, metrics_file_path), streams each metrics
# file via gsutil, and records the 4th field of the first line beginning with
# a digit. NOTE(review): this presumably targets the MEDIAN_COVERAGE column of
# a Picard CollectWgsMetrics histogram/metrics line — confirm the column index
# against the Picard version that produced the inputs.
task get_wgs_median_coverage_chunk {
    input {
        File wgs_metrics_tsv
        String output_tsv_name = "wgs_median_coverage.tsv"
        # Runtime parameters (adjust as needed)
        Int memory_gb = 4
        Int cpu = 1
        Int disk_gb = 20
    }

    command <<<
        set -euxo pipefail
        python3 <<'EOF'
import sys
import csv
import subprocess
import shlex

in_path = "~{wgs_metrics_tsv}"
out_path = "~{output_tsv_name}"

# Stream TSV and process two columns per row: research_id, metrics_file_path
with open(in_path, newline='') as f_in, open(out_path, 'w', newline='') as f_out:
    reader = csv.reader(f_in, delimiter='\t')
    writer = csv.writer(f_out, delimiter='\t')
    header = next(reader, None)
    # Write output header
    writer.writerow(["research_id", "median_coverage"])
    for line_num, row in enumerate(reader, start=2):
        if not row:
            continue
        # Fail loudly on malformed rows instead of a bare IndexError.
        if len(row) < 2:
            raise ValueError(f"Line {line_num} of {in_path} has fewer than 2 columns: {row!r}")
        research_id = row[0]
        metrics_file_path = row[1]
        cmd = f"gsutil cat {shlex.quote(metrics_file_path)} | awk '/^[0-9]/ {{ print $4; exit }}'"
        try:
            # Run through bash with pipefail: with plain shell=True, /bin/sh
            # reports only awk's exit status, so a gsutil failure would be
            # silently masked and an empty value written.
            result = subprocess.run(
                ["bash", "-o", "pipefail", "-c", cmd],
                check=True, capture_output=True, text=True,
            )
        except Exception as e:
            print(f"ERROR: Failed to extract median coverage for {research_id} from {metrics_file_path}: {e}", file=sys.stderr)
            if isinstance(e, subprocess.CalledProcessError) and e.stderr:
                print(e.stderr, file=sys.stderr)
            raise
        median_coverage = result.stdout.strip()
        # An empty result means awk matched nothing; don't emit a blank cell.
        if not median_coverage:
            raise ValueError(f"No median coverage value found in {metrics_file_path} for {research_id}")
        writer.writerow([research_id, median_coverage])

print(f"Wrote {out_path}")
EOF
    >>>

    output {
        File median_coverage_tsv = output_tsv_name
    }

    runtime {
        docker: "us.gcr.io/broad-gotc-prod/warp-tools:2.6.1"
        memory: memory_gb + " GB"
        cpu: cpu
        disks: "local-disk " + disk_gb + " HDD"
    }
}
# Merge many per-shard TSVs into a single TSV, keeping a single header.
task merge_tsvs {
    input {
        Array[File] part_tsvs
        String output_tsv_name = "wgs_median_coverage.tsv"
        Int memory_gb = 2
        Int cpu = 1
        Int disk_gb = 10
    }

    command <<<
        set -euxo pipefail
        # Sort file list for reproducibility (avoid executing paths as commands)
        printf '%s\n' ~{sep=' ' part_tsvs} | sort > filelist.txt

        # First file is copied whole; every later file contributes only its
        # data rows, so the merged TSV carries exactly one header.
        is_first=true
        while IFS= read -r part; do
            if [ "$is_first" = true ]; then
                cat "$part" > "~{output_tsv_name}"
                is_first=false
            else
                tail -n +2 "$part" >> "~{output_tsv_name}"
            fi
        done < filelist.txt

        ls -lh "~{output_tsv_name}"
    >>>

    output {
        File merged_tsv = output_tsv_name
    }

    runtime {
        docker: "us.gcr.io/broad-gotc-prod/warp-tools:2.6.1"
        memory: memory_gb + " GB"
        cpu: cpu
        disks: "local-disk " + disk_gb + " HDD"
    }
}