Skip to content

Commit eaada37

Browse files
carze authored and claude committed
feat: update join_read_classifications to accept TSV and tarball inputs
- Accept TSV (and tar.zst/tar.gz tarballs) for kallisto_summary, kraken2_reads, and vnp_reads instead of Parquet-only - Add _resolve_file() helper to extract single-file tarballs (zstd and other compression formats) to a temp dir before loading - Add _duckdb_reader() helper to dispatch read_parquet vs read_csv based on file extension - Kallisto input is now single-sample TSV; remove SAMPLE_ID filter - Update parameter_meta patterns to reflect accepted formats Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent e8cee77 commit eaada37

1 file changed

Lines changed: 57 additions & 18 deletions

File tree

pipes/WDL/tasks/tasks_metagenomics.wdl

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2506,18 +2506,18 @@ task join_read_classifications {
25062506

25072507
parameter_meta {
25082508
kallisto_summary: {
2509-
description: "Kallisto summary in Parquet format. Multi-sample file filtered by sample_id at runtime. Skipped if not provided or empty.",
2510-
patterns: ["*.parquet"],
2509+
description: "Kallisto summary TSV for a single sample, or a tar.zst/tar.gz tarball containing one such file. Skipped if not provided or empty.",
2510+
patterns: ["*.tsv", "*.tar.zst", "*.tar.gz"],
25112511
category: "common"
25122512
}
25132513
kraken2_reads: {
2514-
description: "Kraken2 per-read classifications in Parquet format. Filtered by sample_id at runtime. Skipped if not provided or empty.",
2515-
patterns: ["*.parquet"],
2514+
description: "Kraken2 per-read classifications in Parquet or TSV format, or a tar.zst/tar.gz tarball containing one such file. Filtered by sample_id at runtime. Skipped if not provided or empty.",
2515+
patterns: ["*.parquet", "*.tsv", "*.tar.zst", "*.tar.gz"],
25162516
category: "common"
25172517
}
25182518
vnp_reads: {
2519-
description: "VirNucPro per-read classifications in Parquet format. All rows used (not filtered by sample_id). Skipped if not provided or empty.",
2520-
patterns: ["*.parquet"],
2519+
description: "VirNucPro per-read classifications in Parquet or TSV format, or a tar.zst/tar.gz tarball containing one such file. All rows used (not filtered by sample_id). Skipped if not provided or empty.",
2520+
patterns: ["*.parquet", "*.tsv", "*.tar.zst", "*.tar.gz"],
25212521
category: "common"
25222522
}
25232523
genomad_virus_summary: {
@@ -2535,19 +2535,52 @@ task join_read_classifications {
25352535

25362536
command <<<
25372537
set -e
2538-
pip install duckdb --quiet --no-cache-dir
2538+
pip install duckdb zstandard --quiet --no-cache-dir
25392539
python3<<CODE
25402540
import os
25412541
import sys
2542+
import tarfile
2543+
import tempfile
25422544
25432545
import duckdb
2546+
import zstandard as zstd
25442547
25452548
25462549
def _file_is_usable(path):
25472550
"""Return True if path is provided, exists, and is non-empty."""
25482551
return bool(path) and os.path.isfile(path) and os.path.getsize(path) > 0
25492552
25502553
2554+
def _resolve_file(path):
2555+
"""If path is a tarball, extract it to a temp dir and return the single inner file path.
2556+
Supports .tar.zst, .tar.gz, .tar.bz2, and .tar. Otherwise returns path unchanged."""
2557+
if not any(path.endswith(ext) for ext in ('.tar.zst', '.tar.gz', '.tar.bz2', '.tar')):
2558+
return path
2559+
extract_dir = tempfile.mkdtemp()
2560+
if path.endswith('.tar.zst'):
2561+
dctx = zstd.ZstdDecompressor()
2562+
with open(path, 'rb') as fh:
2563+
with dctx.stream_reader(fh) as reader:
2564+
with tarfile.open(fileobj=reader, mode='r|') as tar:
2565+
tar.extractall(path=extract_dir)
2566+
else:
2567+
with tarfile.open(path, 'r:*') as tar:
2568+
tar.extractall(path=extract_dir)
2569+
files = [os.path.join(extract_dir, f) for f in os.listdir(extract_dir)
2570+
if os.path.isfile(os.path.join(extract_dir, f))]
2571+
if len(files) != 1:
2572+
print(f"ERROR: expected exactly 1 file in tarball {path}, found {len(files)}", file=sys.stderr)
2573+
sys.exit(1)
2574+
return files[0]
2575+
2576+
2577+
def _duckdb_reader(path):
2578+
"""Return the appropriate DuckDB reader function call string for a given file path."""
2579+
if path.endswith('.parquet'):
2580+
return f"read_parquet('{path}')"
2581+
return f"read_csv('{path}', delim='\\t', header=true, auto_detect=true)"
2582+
2583+
25512584
summary = "~{default='__NONE__' kallisto_summary}"
25522585
vnp = "~{default='__NONE__' vnp_reads}"
25532586
kraken2 = "~{default='__NONE__' kraken2_reads}"
@@ -2560,6 +2593,13 @@ has_vnp = _file_is_usable(vnp)
25602593
has_kraken2 = _file_is_usable(kraken2)
25612594
has_genomad = _file_is_usable(virus_summary)
25622595
2596+
if has_kallisto:
2597+
summary = _resolve_file(summary)
2598+
if has_vnp:
2599+
vnp = _resolve_file(vnp)
2600+
if has_kraken2:
2601+
kraken2 = _resolve_file(kraken2)
2602+
25632603
if not any([has_kallisto, has_vnp, has_kraken2, has_genomad]):
25642604
print("ERROR: All input files are missing or empty — nothing to join.", file=sys.stderr)
25652605
sys.exit(1)
@@ -2569,7 +2609,7 @@ con = duckdb.connect()
25692609
# ── 1. Kallisto summary ──────────────────────────────────────────────
25702610
if has_kallisto:
25712611
print(f"Reading Kallisto summary: {summary}", file=sys.stderr)
2572-
con.execute("""
2612+
con.execute(f"""
25732613
CREATE TABLE kallisto AS
25742614
SELECT
25752615
SAMPLE_ID,
@@ -2578,9 +2618,8 @@ if has_kallisto:
25782618
TAXONOMY_LINEAGE AS KALLISTO_TAXONOMY_LINEAGE,
25792619
TAXONOMY_NAME AS KALLISTO_TAXONOMY_NAME,
25802620
SEQUENCE_LENGTH AS KALLISTO_SEQUENCE_LENGTH
2581-
FROM read_parquet($1)
2582-
WHERE SAMPLE_ID = $2
2583-
""", [summary, sample_id])
2621+
FROM {_duckdb_reader(summary)}
2622+
""")
25842623
n_kallisto = con.execute("SELECT count(*) FROM kallisto").fetchone()[0]
25852624
print(f" {n_kallisto:,} Kallisto reads loaded.", file=sys.stderr)
25862625
else:
@@ -2596,7 +2635,7 @@ else:
25962635
# ── 2. VirNucPro reads ───────────────────────────────────────────────
25972636
if has_vnp:
25982637
print(f"Reading VirNucPro reads: {vnp}", file=sys.stderr)
2599-
con.execute("""
2638+
con.execute(f"""
26002639
CREATE TABLE vnp AS
26012640
SELECT
26022641
read_id AS READ_ID,
@@ -2617,8 +2656,8 @@ if has_vnp:
26172656
n_ambiguous AS VNP_N_AMBIGUOUS,
26182657
viral_proportion AS VNP_VIRAL_PROPORTION,
26192658
nonviral_proportion AS VNP_NONVIRAL_PROPORTION
2620-
FROM read_parquet($1)
2621-
""", [vnp])
2659+
FROM {_duckdb_reader(vnp)}
2660+
""")
26222661
n_vnp = con.execute("SELECT count(*) FROM vnp").fetchone()[0]
26232662
print(f" {n_vnp:,} VNP reads loaded.", file=sys.stderr)
26242663
else:
@@ -2674,16 +2713,16 @@ print(f" {n_kv:,} rows after Kallisto + VNP full outer join.", file=sys.stderr)
26742713
# the Kallisto/VNP side when joining.
26752714
if has_kraken2:
26762715
print(f"Reading Kraken2 reads: {kraken2}", file=sys.stderr)
2677-
con.execute("""
2716+
con.execute(f"""
26782717
CREATE TABLE k2 AS
26792718
SELECT
26802719
READ_ID,
26812720
TAXONOMY_ID AS K2_TAXONOMY_ID,
26822721
TAX_NAME AS K2_TAX_NAME,
26832722
KINGDOM AS K2_KINGDOM
2684-
FROM read_parquet($1)
2685-
WHERE SAMPLE_ID = $2
2686-
""", [kraken2, sample_id])
2723+
FROM {_duckdb_reader(kraken2)}
2724+
WHERE SAMPLE_ID = $1
2725+
""", [sample_id])
26872726
n_k2 = con.execute("SELECT count(*) FROM k2").fetchone()[0]
26882727
print(f" {n_k2:,} Kraken2 reads loaded.", file=sys.stderr)
26892728
else:

0 commit comments

Comments
 (0)