@@ -2393,7 +2393,7 @@ task parse_centrifuger_reads {
   input {
     File centrifuger_output
     File taxonomy_db
-    String sample_id = sub(basename(centrifuger_output), "\\.centrifuger\\.tsv$", "")
+    String? sample_id_override
     Boolean resolve_strains = false
 
     Int machine_mem_gb = 16
@@ -2411,8 +2411,8 @@ task parse_centrifuger_reads {
       patterns: ["*.duckdb"],
       category: "required"
     }
-    sample_id: {
-      description: "Sample identifier. Defaults to the filename stem after stripping .centrifuger.tsv.",
+    sample_id_override: {
+      description: "Optional sample identifier. Defaults to the filename stem after stripping .centrifuger.tsv.",
       category: "common"
     }
     resolve_strains: {
@@ -2421,7 +2421,9 @@ task parse_centrifuger_reads {
     }
   }
 
-  String out_compressed = sample_id + ".centrifuger_reads_classified.tsv.zst"
+  String derived_sample_id = sub(basename(centrifuger_output), "\\.centrifuger\\.tsv$", "")
+  String sample_id = select_first([sample_id_override, derived_sample_id])
+  String out_compressed = sample_id + ".centrifuger_reads_classified.tsv.zst"
 
   command <<<
     set -e
@@ -3071,3 +3073,127 @@ CODE
     maxRetries: 2
   }
 }
+
+task filter_viral_reads {
+  meta {
+    description: "Filter a joined read-classification parquet to retain reads with any viral signal (OR semantics across K2 kingdom, Kallisto hit, VNP viral call, and geNomad viral taxonomy). Emits a filtered parquet and a plain-text list of READ_IDs."
+  }
+
+  input {
+    File joined_parquet
+    String? sample_id_override
+
+    Int machine_mem_gb = 16
+    String docker = "quay.io/broadinstitute/py3-bio:0.1.5"
+  }
+
+  parameter_meta {
+    joined_parquet: {
+      description: "Joined read-classification parquet (output of join_read_classifications). Must contain READ_ID, K2_KINGDOM, KALLISTO_DB_ID, VNP_CALL, and GENOMAD_TAXONOMY columns.",
+      patterns: ["*.parquet"],
+      category: "required"
+    }
+    sample_id_override: {
+      description: "Optional sample identifier. Defaults to the filename stem after stripping .read_classifications.parquet.",
+      category: "common"
+    }
+  }
+
+  String derived_sample_id = sub(basename(joined_parquet), "\\.read_classifications\\.parquet$", "")
+  String sample_id = select_first([sample_id_override, derived_sample_id])
+  String out_parquet = sample_id + ".viral_reads.parquet"
+  String out_read_ids = sample_id + ".viral_read_ids.txt"
+
+  command <<<
+    set -e
+    pip install duckdb --quiet --no-cache-dir
+    python3 << CODE
+import sys
+import duckdb
+
+INPUT = "~{joined_parquet}"
+OUT_PARQUET = "~{out_parquet}"
+OUT_IDS = "~{out_read_ids}"
+
+REQUIRED_COLUMNS = {
+    "READ_ID", "K2_KINGDOM", "KALLISTO_DB_ID", "VNP_CALL", "GENOMAD_TAXONOMY",
+}
+
+def _qsql(s):
+    return s.replace("'", "''")
+
+con = duckdb.connect()
+con.execute(f"CREATE VIEW input_reads AS SELECT * FROM read_parquet('{_qsql(INPUT)}')")
+
+# -- Schema check ---------------------------------------------------------
+cols = con.execute("DESCRIBE SELECT * FROM input_reads").fetchall()
+col_names = {c[0] for c in cols}
+missing = REQUIRED_COLUMNS - col_names
+if missing:
+    print(f"ERROR: Missing required columns: {sorted(missing)}", file=sys.stderr)
+    sys.exit(1)
+
+# -- Input stats ----------------------------------------------------------
+n_input = con.execute("SELECT count(*) FROM input_reads").fetchone()[0]
+print(f"Input: {n_input:,} rows", file=sys.stderr)
+
+# -- Per-rule diagnostic counts --------------------------------------------
+rule_counts = con.execute("""
+    SELECT
+        count(*) FILTER (WHERE K2_KINGDOM = 'Viruses') AS r1,
+        count(*) FILTER (WHERE KALLISTO_DB_ID IS NOT NULL AND KALLISTO_DB_ID != '') AS r2,
+        count(*) FILTER (WHERE VNP_CALL IN ('Viral', 'Unclassified', 'Ambiguous', 'Multi-mapped')) AS r3,
+        count(*) FILTER (WHERE starts_with(GENOMAD_TAXONOMY, 'Viruses')) AS r4
+    FROM input_reads
+""").fetchone()
+
+print(f"  Rule 1 - K2_KINGDOM = 'Viruses':       {rule_counts[0]:>10,}", file=sys.stderr)
+print(f"  Rule 2 - KALLISTO_DB_ID not empty:     {rule_counts[1]:>10,}", file=sys.stderr)
+print(f"  Rule 3 - VNP_CALL in viral whitelist:  {rule_counts[2]:>10,}", file=sys.stderr)
+print(f"  Rule 4 - GENOMAD_TAXONOMY ~ 'Viruses': {rule_counts[3]:>10,}", file=sys.stderr)
+
+# -- Filter (OR semantics) -------------------------------------------------
+con.execute("""
+    CREATE TABLE filtered AS
+    SELECT *
+    FROM input_reads
+    WHERE K2_KINGDOM = 'Viruses'
+       OR (KALLISTO_DB_ID IS NOT NULL AND KALLISTO_DB_ID != '')
+       OR VNP_CALL IN ('Viral', 'Unclassified', 'Ambiguous', 'Multi-mapped')
+       OR starts_with(GENOMAD_TAXONOMY, 'Viruses')
+""")
+
+n_filtered = con.execute("SELECT count(*) FROM filtered").fetchone()[0]
+pct = (n_filtered / n_input * 100) if n_input else 0.0
+print(f"\nFiltered: {n_filtered:,} rows ({pct:.1f}% of input)", file=sys.stderr)
+
+# -- Write outputs --------------------------------------------------------
+con.execute(f"COPY filtered TO '{_qsql(OUT_PARQUET)}' (FORMAT PARQUET, COMPRESSION ZSTD)")
+print(f"Parquet: {OUT_PARQUET}", file=sys.stderr)
+
+con.execute(f"""
+    COPY (SELECT READ_ID FROM filtered ORDER BY READ_ID)
+    TO '{_qsql(OUT_IDS)}' (FORMAT CSV, DELIMITER '\t', HEADER FALSE)
+""")
+print(f"Read IDs: {OUT_IDS}", file=sys.stderr)
+
+con.close()
+CODE
+  >>>
+
+  output {
+    File viral_reads_parquet = "~{out_parquet}"
+    File viral_read_ids = "~{out_read_ids}"
+  }
+
+  runtime {
+    docker: docker
+    memory: "~{machine_mem_gb} GB"
+    cpu: 2
+    disks: "local-disk ~{ceil(size(joined_parquet, 'GB') * 3 + 20)} HDD"
+    disk: "~{ceil(size(joined_parquet, 'GB') * 3 + 20)} GB"
+    dx_instance_type: "mem1_ssd1_v2_x2"
+    preemptible: 2
+    maxRetries: 2
+  }
+}
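
For reference, a minimal sketch of how the new task could be wired into a workflow; the workflow name, the workflow-level input, and the idea of feeding it the parquet produced upstream by join_read_classifications are illustrative assumptions, not part of this change:

# Sketch only: workflow and input names are hypothetical.
workflow filter_viral_reads_example {
  input {
    File joined_parquet   # e.g. the joined parquet emitted by join_read_classifications
  }

  call filter_viral_reads {
    input:
      joined_parquet = joined_parquet
      # sample_id_override is optional; the task otherwise derives the sample id
      # from the filename stem after stripping .read_classifications.parquet
  }

  output {
    File viral_reads_parquet = filter_viral_reads.viral_reads_parquet
    File viral_read_ids = filter_viral_reads.viral_read_ids
  }
}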