Skip to content

Commit b98195e

Browse files
committed
fix: stream zstd writes and raise memory in parse_kraken2_reads
OOM kills were occurring because all read rows were accumulated in a list before writing, adding gigabytes of Python tuple overhead on top of the ~2.8 GB already used by the taxonomy dicts. Rows are now written to the zstd compressor immediately as each line is parsed. This commit also raises the default machine_mem_gb from 8 to 16 to provide adequate headroom for the taxonomy dict overhead (~2.8M entries across 4 dicts).
1 parent 9f3d1d9 commit b98195e

File tree

1 file changed

+40
-51
lines changed

1 file changed

+40
-51
lines changed

pipes/WDL/tasks/tasks_metagenomics.wdl

Lines changed: 40 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1986,7 +1986,7 @@ task parse_kraken2_reads {
19861986
else sub(basename(kraken2_reads_output), "\\.kraken2\\.reads\\.txt(\\.gz)?$", "")
19871987
Boolean resolve_strains = false
19881988

1989-
Int machine_mem_gb = 8
1989+
Int machine_mem_gb = 16
19901990
String docker = "quay.io/broadinstitute/py3-bio:0.1.5"
19911991
}
19921992

@@ -2130,70 +2130,59 @@ task parse_kraken2_reads {
21302130
classified_count = 0
21312131
unclassified_count = 0
21322132
2133-
# Collect rows
2134-
rows = []
2135-
2136-
try:
2137-
for line in f:
2138-
# Skip empty lines
2139-
line = line.strip()
2140-
if not line:
2141-
continue
2142-
2143-
# Parse Kraken2 output format
2144-
# Format: C/U <read_id> <taxid> <length> <kmer_info>
2145-
parts = line.split('\t')
2133+
cctx = zstd.ZstdCompressor()
2134+
with open(output_file, 'wb') as raw_f:
2135+
with cctx.stream_writer(raw_f) as compressor:
2136+
compressor.write(b'SAMPLE_ID\tREAD_ID\tTAXONOMY_ID\tTAX_NAME\tKINGDOM\tTAX_RANK\n')
2137+
try:
2138+
for line in f:
2139+
# Skip empty lines
2140+
line = line.strip()
2141+
if not line:
2142+
continue
21462143
2147-
if len(parts) < 3:
2148-
print(f"Warning: Skipping malformed line: {line[:100]}", file=sys.stderr)
2149-
continue
2144+
# Parse Kraken2 output format
2145+
# Format: C/U <read_id> <taxid> <length> <kmer_info>
2146+
parts = line.split('\t')
21502147
2151-
classification = parts[0].strip() # C or U
2152-
read_id = parts[1].strip()
2153-
taxid_str = parts[2].strip()
2148+
if len(parts) < 3:
2149+
print(f"Warning: Skipping malformed line: {line[:100]}", file=sys.stderr)
2150+
continue
21542151
2155-
# Handle unclassified reads (taxid = 0)
2156-
try:
2157-
taxid = int(taxid_str)
2158-
except ValueError:
2159-
print(f"Warning: Invalid taxid '{taxid_str}' for read {read_id}", file=sys.stderr)
2160-
continue
2152+
classification = parts[0].strip() # C or U
2153+
read_id = parts[1].strip()
2154+
taxid_str = parts[2].strip()
21612155
2162-
if classification == 'U':
2163-
unclassified_count += 1
2164-
tax_name = 'Unclassified'
2165-
kingdom = 'Unclassified'
2166-
tax_rank = 'unclassified'
2167-
else:
2168-
classified_count += 1
2169-
tax_name = tax_db.get_name(taxid)
2170-
kingdom = tax_db.get_kingdom(taxid)
2171-
tax_rank = tax_db.get_rank(taxid, resolve_strains=resolve_strains)
2156+
# Handle unclassified reads (taxid = 0)
2157+
try:
2158+
taxid = int(taxid_str)
2159+
except ValueError:
2160+
print(f"Warning: Invalid taxid '{taxid_str}' for read {read_id}", file=sys.stderr)
2161+
continue
21722162
2173-
rows.append((sample_id, read_id, taxid, tax_name, kingdom, tax_rank))
2163+
if classification == 'U':
2164+
unclassified_count += 1
2165+
tax_name = 'Unclassified'
2166+
kingdom = 'Unclassified'
2167+
tax_rank = 'unclassified'
2168+
else:
2169+
classified_count += 1
2170+
tax_name = tax_db.get_name(taxid)
2171+
kingdom = tax_db.get_kingdom(taxid)
2172+
tax_rank = tax_db.get_rank(taxid, resolve_strains=resolve_strains)
21742173
2175-
finally:
2176-
f.close()
2174+
row = (sample_id, read_id, taxid, tax_name, kingdom, tax_rank)
2175+
compressor.write(('\t'.join(str(v) for v in row) + '\n').encode('utf-8'))
21772176
2178-
# Write output as TSV
2179-
_write_tsv(rows, output_file)
2177+
finally:
2178+
f.close()
21802179
21812180
print(f"\nProcessing complete:", file=sys.stderr)
21822181
print(f" Classified reads: {classified_count}", file=sys.stderr)
21832182
print(f" Unclassified reads: {unclassified_count}", file=sys.stderr)
21842183
print(f" Total reads: {classified_count + unclassified_count}", file=sys.stderr)
21852184
21862185
2187-
def _write_tsv(rows, output_file):
2188-
"""Write rows as zstd-compressed TSV."""
2189-
cctx = zstd.ZstdCompressor()
2190-
with open(output_file, 'wb') as raw_f:
2191-
with cctx.stream_writer(raw_f) as compressor:
2192-
compressor.write(b'SAMPLE_ID\tREAD_ID\tTAXONOMY_ID\tTAX_NAME\tKINGDOM\tTAX_RANK\n')
2193-
for row in rows:
2194-
compressor.write(('\t'.join(str(v) for v in row) + '\n').encode('utf-8'))
2195-
2196-
21972186
tax_db = DuckDBTaxonomyDatabase("~{taxonomy_db}", resolve_strains=~{true="True" false="False" resolve_strains})
21982187
parse_kraken2_output("~{kraken2_reads_output}", tax_db, "~{out_compressed}", "~{sample_id}")
21992188
CODE

0 commit comments

Comments
 (0)