diff --git a/README.md b/README.md index e18f50da..bd3bdae7 100644 --- a/README.md +++ b/README.md @@ -28,21 +28,11 @@ Follow these instructions to run the ncov-ingest pipeline _without_ all the bell ### GISAID -To pull sequences directly from GISAID, you are required to set two environment variables: +To add new GISAID sequences: -- `GISAID_API_ENDPOINT` -- `GISAID_USERNAME_AND_PASSWORD` - -Then run the ncov-ingest pipeline with the nextstrain CLI: - -```sh -nextstrain build \ - --image nextstrain/ncov-ingest \ - --env GISAID_API_ENDPOINT \ - --env GISAID_USERNAME_AND_PASSWORD \ - . \ - --configfile config/local_gisaid.yaml -``` +1. Download them from GISAID using the "Input for the Augur pipeline" option. This results in a file of the form `gisaid_auspice_input_hcov-19_2026_01_07_11.tar` (exact date will vary). +2. Upload this file to s3: `aws s3 cp /path/to/downloads/gisaid_auspice_input_hcov-19_2026_01_07_11.tar s3://nextstrain-ncov-private/gisaid-tars/unprocessed/` +3. Trigger the ncov-ingest GISAID action. After successful processing, tars are compressed with zstd and moved to `s3://nextstrain-ncov-private/gisaid-tars/processed/` (as `.tar.zst` files). ### GenBank diff --git a/bin/process-unprocessed-tars b/bin/process-unprocessed-tars new file mode 100755 index 00000000..d360ce83 --- /dev/null +++ b/bin/process-unprocessed-tars @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +""" +Download all unprocessed GISAID tar files from S3, process them to NDJSON, +and output a manifest of successfully processed tars. +""" +import io +import json +import shlex +import subprocess +import sys +import tarfile +import tempfile +from pathlib import Path + +from augur.io.json import load_ndjson + + +def process_tar_file(tar_path, output_file): + """ + Extract and process a single tar file, streaming records to output_file. + Returns the number of records processed, or None if processing failed. + """ + with tempfile.TemporaryDirectory() as extract_dir: + extract_path = Path(extract_dir) + + # Extract tar (strip leading slashes from absolute paths) + with tarfile.open(tar_path, 'r') as tar: + # Strip leading slashes to avoid extracting to root filesystem + members = tar.getmembers() + for member in members: + if member.name.startswith('/'): + member.name = member.name.lstrip('/') + # Use filter='data' to safely extract archives (Python 3.12+) + # Falls back to no filter for older Python versions + try: + tar.extractall(extract_path, members=members, filter='data') + except TypeError: + # Python < 3.12 doesn't support filter parameter + tar.extractall(extract_path, members=members) + + # Find metadata and sequences + metadata_files = list(extract_path.glob("**/*.metadata.tsv")) + sequence_files = list(extract_path.glob("**/*.sequences.fasta")) + + if not metadata_files or not sequence_files: + print(f" WARNING: Missing metadata or sequences in {tar_path.name}", file=sys.stderr) + return None + + if len(metadata_files) > 1 or len(sequence_files) > 1: + print(f" WARNING: Multiple metadata or sequence files found in {tar_path.name}, using first", file=sys.stderr) + + metadata_file = metadata_files[0] + sequence_file = sequence_files[0] + + # Process through augur curate + transform pipeline + cmd = ( + f'augur curate passthru --metadata {shlex.quote(str(metadata_file))} ' + f'--fasta {shlex.quote(str(sequence_file))} ' + f'--seq-id-column strain --seq-field sequence | ./bin/transform-to-gisaid-cache' + ) + result = subprocess.run(cmd, shell=True, capture_output=True, text=False) + + if result.returncode != 0: + print(f" WARNING: Transform failed for {tar_path.name}: {result.stderr.decode()}", file=sys.stderr) + return None + + # Stream records directly to output file + record_count = 0 + for record in load_ndjson(io.BytesIO(result.stdout)): + json.dump(record, output_file) + output_file.write('\n') + record_count += 1 + + return record_count + + +def main(): + source_path = sys.argv[1] # e.g., s3://nextstrain-ncov-private/gisaid-tars/unprocessed/ OR local path + output_ndjson = sys.argv[2] # e.g., data/gisaid/tar-combined.ndjson + manifest_file = sys.argv[3] # e.g., data/gisaid/tar-processed-manifest.txt + + # Check if source is S3 or local path + if source_path.startswith("s3://"): + # Download all unprocessed tar files from S3 + tmp_dir = Path("tmp/gisaid-tars-unprocessed") + tmp_dir.mkdir(parents=True, exist_ok=True) + + print(f"Downloading tar files from {source_path}...", file=sys.stderr) + subprocess.run([ + "aws", "s3", "cp", source_path, str(tmp_dir), + "--recursive", "--exclude", "*", "--include", "*.tar", + "--no-progress" + ], check=True) + else: + # Use local path directly + tmp_dir = Path(source_path) + print(f"Using local tar files from {source_path}...", file=sys.stderr) + if not tmp_dir.exists(): + print(f"ERROR: Local path does not exist: {source_path}", file=sys.stderr) + sys.exit(1) + + tar_files = list(tmp_dir.glob("*.tar")) + if not tar_files: + print("No tar files found", file=sys.stderr) + # Create empty outputs so Snakemake doesn't fail + Path(output_ndjson).touch() + Path(manifest_file).write_text("") + return + + print(f"Found {len(tar_files)} tar files to process:", file=sys.stderr) + for tar in sorted(tar_files): + print(f" - {tar.name}", file=sys.stderr) + print("", file=sys.stderr) + + processed_tars = [] + failed_tars = [] + total_records = 0 + + # Open output file for streaming + with open(output_ndjson, 'w') as output_file: + # Sort by filename to ensure consistent ordering + # Process newest first so dedup-by-gisaid-id keeps the newest record (it keeps first) + for tar_path in sorted(tar_files, reverse=True): + print(f"Processing {tar_path.name}...", file=sys.stderr) + try: + record_count = process_tar_file(tar_path, output_file) + if record_count is not None: + total_records += record_count + processed_tars.append(tar_path.name) + print(f" Processed {record_count} records", file=sys.stderr) + else: + failed_tars.append(tar_path.name) + except Exception as e: + print(f" ERROR processing {tar_path.name}: {e}", file=sys.stderr) + failed_tars.append(tar_path.name) + continue + + print(f"\nWrote {total_records} total records to {output_ndjson}", file=sys.stderr) + + # Write manifest + with open(manifest_file, 'w') as f: + for tar_name in processed_tars: + f.write(f"{tar_name}\n") + + print(f"\nSuccessfully processed {len(processed_tars)}/{len(tar_files)} tar files", file=sys.stderr) + if processed_tars: + print("Processed tars:", file=sys.stderr) + for tar_name in processed_tars: + print(f" ✓ {tar_name}", file=sys.stderr) + + if failed_tars: + print(f"\nWarning: {len(failed_tars)} tar files failed to process:", file=sys.stderr) + for tar_name in failed_tars: + print(f" ✗ {tar_name}", file=sys.stderr) + +if __name__ == "__main__": + main() diff --git a/manual-upload/README.md b/manual-upload/README.md deleted file mode 100644 index 0960b599..00000000 --- a/manual-upload/README.md +++ /dev/null @@ -1,106 +0,0 @@ -# Nextstrain manual upload - -> [!NOTE] -> External users can ignore this directory! -> This build is tailored for the internal Nextstrain team to -> run manually to upload GISAID files to our private AWS S3 bucket. - -This workflow uploads files which have been manually downloaded from GISAID to S3, storing them as "unprocessed" pairs of files. - -## Run the workflow - -This workflow is expected to be run manually after downloading files from GISAID. -The GISAID files are expected to be saved as - -- `-metadata.tsv` -- `-sequences.fasta` - -`` is the date the files were downloaded from GISAID. -`` is the number of the download since GISAID limits the number of records per download. - -For example, if you had to split the data between two downloads on 2025-04-11, -then save the files as -- `2025-04-11-1-metadata.tsv` -- `2025-04-11-1-sequences.fasta` -- `2025-04-11-2-metadata.tsv` -- `2025-04-11-2-sequences.fasta` - -The directory in which you save the GISAID files depends on which command you are -using to run the workflow. - -### With `nextstrain run` - -When running with `nextstrain run`, you can save the GISAID files in the `data` -directory within any arbitrary analysis directory. However, you must also create -a `config.yaml` within the analysis directory to specify the `gisaid_pairs` to upload. - -Continuing the example above, your analysis directory should look like -``` - -├── config.yaml -└── data - ├── 2025-04-11-1-metadata.tsv - ├── 2025-04-11-1-sequences.fasta - ├── 2025-04-11-2-metadata.tsv - └── 2025-04-11-2-sequences.fasta -``` - -With the `config.yaml` specifying the `gisaid_pairs` you want to upload - -```yaml -gisaid_pairs: - - 2025-04-11-1 - - 2025-04-11-2 -``` - -Make sure you have the latest ncov-ingest pathogen setup. - -```shell -$ nextstrain update ncov-ingest@master -Checking for newer versions of Nextstrain CLI… - -nextstrain-cli is up to date! - -Updating ncov-ingest@master pathogen version… -'ncov-ingest@master' already up-to-date. - -Updated ncov-ingest@master pathogen version! - -All updates successful! -``` - -Then run the workflow -``` -nextstrain run \ - --env AWS_ACCESS_KEY_ID \ - --env AWS_SECRET_ACCESS_KEY \ - ncov-ingest@master \ - manual-upload \ - -``` - -### With `nextstrain build` - -When running with `nextstrain build` the files must be saved _within_ the -ncov-ingest repo. - -Save the downloaded GISAID metadata and sequences as: -- `manual-upload/data/-metadata.tsv` -- `manual-upload/data/-sequences.fasta` - -The workflow can be run from the top level pathogen repo directory with: -``` -nextstrain build \ - --env AWS_ACCESS_KEY_ID \ - --env AWS_SECRET_ACCESS_KEY \ - manual-upload \ - --config gisaid_pairs=["2025-04-11-1", "2025-04-11-2"] -``` - -### Required environment variables - -You need to have AWS credentials with permissions to upload to the private -AWS S3 bucket `nextstrain-ncov-private` - -- `AWS_ACCESS_KEY_ID` -- `AWS_SECRET_ACCESS_KEY` diff --git a/manual-upload/Snakefile b/manual-upload/Snakefile deleted file mode 100644 index db322918..00000000 --- a/manual-upload/Snakefile +++ /dev/null @@ -1,59 +0,0 @@ -""" -This handles uploads of files downloaded from GISAID to AWS S3. -""" -import os.path - - -# Use default configuration values. Extend with Snakemake's --configfile/--config options. -configfile: os.path.join(workflow.basedir, "config.yaml") - -# Use custom configuration from analysis directory (i.e. working dir), if any. -if os.path.exists("config.yaml"): - configfile: "config.yaml" - - -wildcard_constraints: - # Constrain GISAID pair names to YYYY-MM-DD-N - gisaid_pair = r'\d{4}-\d{2}-\d{2}(-\d+)?' - - -rule upload_gisaid_pairs: - input: - upload_flags=expand([ - "data/{gisaid_pair}-metadata.upload", - "data/{gisaid_pair}-sequences.upload", - ], gisaid_pair=config["gisaid_pairs"]), - - -rule upload_gisaid_metadata: - input: - metadata="data/{gisaid_pair}-metadata.tsv", - output: - flag="data/{gisaid_pair}-metadata.upload", - params: - s3_dst=config["s3_dst"], - shell: - r""" - {workflow.basedir}/../vendored/upload-to-s3 \ - --quiet \ - {input.metadata:q} \ - {params.s3_dst:q}/{wildcards.gisaid_pair}-metadata.tsv.zst \ - 2>&1 | tee {output.flag:q} - """ - - -rule upload_gisaid_sequences: - input: - sequences="data/{gisaid_pair}-sequences.fasta", - output: - flag="data/{gisaid_pair}-sequences.upload", - params: - s3_dst=config["s3_dst"], - shell: - r""" - {workflow.basedir}/../vendored/upload-to-s3 \ - --quiet \ - {input.sequences:q} \ - {params.s3_dst:q}/{wildcards.gisaid_pair}-sequences.fasta.zst \ - 2>&1 | tee {output.flag:q} - """ diff --git a/manual-upload/config.yaml b/manual-upload/config.yaml deleted file mode 100644 index 77fab1a9..00000000 --- a/manual-upload/config.yaml +++ /dev/null @@ -1,2 +0,0 @@ -# AWS S3 destination for the downloaded GISAID files -s3_dst: "s3://nextstrain-ncov-private/gisaid-downloads/unprocessed" diff --git a/workflow/snakemake_rules/fetch_sequences.smk b/workflow/snakemake_rules/fetch_sequences.smk index 1561068f..245b0ca8 100644 --- a/workflow/snakemake_rules/fetch_sequences.smk +++ b/workflow/snakemake_rules/fetch_sequences.smk @@ -18,9 +18,7 @@ Produces different final outputs for GISAID vs GenBank/RKI: rki_ndjson = "data/rki.ndjson" """ -wildcard_constraints: - # Constrain GISAID pair names to "gisaid_cache" or YYYY-MM-DD-N - gisaid_pair = r'gisaid_cache|\d{4}-\d{2}-\d{2}(-\d+)?' +# No wildcard constraints needed - processing all tars in bulk if config.get("s3_src"): @@ -39,92 +37,34 @@ if config.get("s3_src"): ./vendored/download-from-s3 {params.s3_file:q} {output.ndjson:q} """ - checkpoint fetch_unprocessed_files: + rule process_all_unprocessed_tars: """ - Fetch unprocessed GISAID files. - These are pairs of metadata.tsv.zst and sequences.fasta.zst files. - - This is a checkpoint because the DAG needs to be re-evaluated to determine - which `gisaid_pair` need to be processed. + Download all unprocessed tar files from S3 and process them to a single NDJSON. + Outputs a manifest of successfully processed tars. """ output: - directory("data/unprocessed-gisaid-downloads/"), + ndjson=temp("data/gisaid/tar-combined.ndjson"), + manifest="data/gisaid/tar-processed-manifest.txt", params: - s3_prefix=f"{config['s3_src']}/gisaid-downloads/unprocessed/" - shell: - r""" - aws s3 cp {params.s3_prefix:q} {output:q} \ - --recursive \ - --exclude "*" \ - --include "*-metadata.tsv.zst" \ - --include "*-sequences.fasta.zst" - """ - - rule decompress_unprocessed_files: - input: - metadata="data/unprocessed-gisaid-downloads/{gisaid_pair}-metadata.tsv.zst", - sequences="data/unprocessed-gisaid-downloads/{gisaid_pair}-sequences.fasta.zst", - output: - metadata=temp("data/gisaid/{gisaid_pair}-metadata.tsv"), - sequences=temp("data/gisaid/{gisaid_pair}-sequences.fasta"), + s3_unprocessed=f"{config['s3_src']}/gisaid-tars/unprocessed/" + log: "logs/process_all_unprocessed_tars.txt" shell: r""" - zstd --decompress --stdout {input.metadata:q} > {output.metadata:q} - zstd --decompress --stdout {input.sequences:q} > {output.sequences:q} + ./bin/process-unprocessed-tars \ + {params.s3_unprocessed:q} \ + {output.ndjson:q} \ + {output.manifest:q} \ + 2>&1 | tee {log:q} >&2 """ - -rule link_gisaid_metadata_and_fasta: - input: - metadata="data/gisaid/{gisaid_pair}-metadata.tsv", - sequences="data/gisaid/{gisaid_pair}-sequences.fasta", - output: - ndjson=temp("data/gisaid/{gisaid_pair}.ndjson"), - params: - seq_id_column="strain", - seq_field="sequence", - log: "logs/link_gisaid_metadata_and_fasta/{gisaid_pair}.txt" - shell: - r""" - augur curate passthru \ - --metadata {input.metadata:q} \ - --fasta {input.sequences:q} \ - --seq-id-column {params.seq_id_column:q} \ - --seq-field {params.seq_field:q} \ - | ./bin/transform-to-gisaid-cache \ - > {output.ndjson:q} \ - 2> {log:q} - """ - -def aggregate_gisaid_ndjsons(wildcards): +rule concatenate_gisaid_ndjsons: """ - Input function for rule concatenate_gisaid_ndjsons to check which - GISAID pairs to include the output. + Concatenate the cached GISAID NDJSON with newly processed tar files, + then deduplicate to keep the newest record for each GISAID ID. """ - if len(config.get("gisaid_pairs", [])): - GISAID_PAIRS = config["gisaid_pairs"] - elif config.get('s3_src') and hasattr(checkpoints, "fetch_unprocessed_files"): - # Use checkpoint for the Nextstrain automation - checkpoint_output = checkpoints.fetch_unprocessed_files.get(**wildcards).output[0] - GISAID_PAIRS, = glob_wildcards(os.path.join(checkpoint_output, "{gisaid_pair}-metadata.tsv.zst")) - # Reverse sort to list latest downloads first - GISAID_PAIRS.sort(reverse=True) - # Add the GISAID cache last to prioritize the latest downloads - GISAID_PAIRS.append("gisaid_cache") - else: - # Create wildcards for pairs of GISAID downloads - GISAID_PAIRS, = glob_wildcards("data/gisaid/{gisaid_pair}-metadata.tsv") - # Reverse sort to list latest downloads first - GISAID_PAIRS.sort(reverse=True) - - assert len(GISAID_PAIRS), "No GISAID metadata and sequences inputs were found" - - return expand("data/gisaid/{gisaid_pair}.ndjson", gisaid_pair=GISAID_PAIRS) - - -rule concatenate_gisaid_ndjsons: input: - ndjsons=aggregate_gisaid_ndjsons, + gisaid_cache="data/gisaid/gisaid_cache.ndjson" if config.get('s3_src') else [], + tar_records="data/gisaid/tar-combined.ndjson" if config.get('s3_src') else [], output: ndjson=temp("data/gisaid.ndjson"), params: @@ -132,10 +72,16 @@ rule concatenate_gisaid_ndjsons: log: "logs/concatenate_gisaid_ndjsons.txt" shell: r""" - (cat {input.ndjsons:q} \ - | ./bin/dedup-by-gisaid-id \ - --id-field {params.gisaid_id_field:q} \ - > {output.ndjson:q}) 2> {log:q} + tar_count=$(if [[ -s {input.tar_records:q} ]]; then wc -l < {input.tar_records:q}; else echo 0; fi) + cache_count=$(if [[ -s {input.gisaid_cache:q} ]]; then wc -l < {input.gisaid_cache:q}; else echo 0; fi) + echo "Concatenating: tar_records=$tar_count lines, gisaid_cache=$cache_count lines" >&2 + + cat {input.tar_records:q} {input.gisaid_cache:q} \ + | ./bin/dedup-by-gisaid-id --id-field {params.gisaid_id_field:q} \ + > {output.ndjson:q} 2> {log:q} + + output_count=$(wc -l < {output.ndjson:q}) + echo "Deduplication complete: output=$output_count lines (removed $((tar_count + cache_count - output_count)) duplicates)" >&2 """ rule fetch_ncbi_dataset_package: diff --git a/workflow/snakemake_rules/upload.smk b/workflow/snakemake_rules/upload.smk index 9523b66a..e8fd9d92 100644 --- a/workflow/snakemake_rules/upload.smk +++ b/workflow/snakemake_rules/upload.smk @@ -124,49 +124,36 @@ rule remove_rerun_touchfile: """ -def all_processed_gisaid_pairs(wildcards): +rule mv_all_processed_tars: """ - Check which unprocessed files were fetched so that we only move the - `gisaid_pairs` that were processed in this run of the workflow. - - Only returns the unprocessed files if `s3_src` is the same as `s3_dst` so - that trials runs don't accidentally mark files as processed. - """ - if (hasattr(checkpoints, "fetch_unprocessed_files") and - config["s3_src"] == config["s3_dst"]): - checkpoint_output = checkpoints.fetch_unprocessed_files.get(**wildcards).output[0] - return expand( - "data/mv-processed/{gisaid_pair}.done", - gisaid_pair=glob_wildcards(os.path.join(checkpoint_output, "{gisaid_pair}-metadata.tsv.zst")).gisaid_pair - ) - else: - return [] - -rule mv_processed_gisaid_pair: - """ - Move the processed gisaid_pair from /unprocessed to /processed on AWS S3 - so that they are not reprocessed in the next run of the automated ingest. - - The records in the processed gisaid_pair should be included in the cached - gisaid.ndjson, so only move it after the gisaid.ndjson was successfully - uploaded to S3. + Compress and move all processed tars from /unprocessed to /processed on S3. + Compresses with zstd before moving to save storage costs. + Only runs after successful upload, reads manifest to know which files to move. + Skips if this is a trial run (s3_src != s3_dst). """ input: ndjson_flag="data/gisaid/gisaid.ndjson.zst.upload", + manifest="data/gisaid/tar-processed-manifest.txt", output: - flag=touch("data/mv-processed/{gisaid_pair}.done") + flag=touch("data/mv-processed/all-tars.done") params: - s3_dst=f"{config['s3_dst']}/gisaid-downloads/processed/", - s3_src=f"{config['s3_src']}/gisaid-downloads/unprocessed/", + s3_dst=f"{config['s3_dst']}/gisaid-tars/processed/", + s3_src=f"{config['s3_src']}/gisaid-tars/unprocessed/", shell: r""" - aws s3 mv \ - {params.s3_src:q} \ - {params.s3_dst:q} \ - --recursive \ - --exclude "*" \ - --include "{wildcards.gisaid_pair}-metadata.tsv.zst" \ - --include "{wildcards.gisaid_pair}-sequences.fasta.zst" + if [[ ! -s {input.manifest:q} ]]; then + echo "No tars to move (empty manifest)" + exit 0 + fi + + while IFS= read -r tar_name; do + echo "Compressing and moving $tar_name to processed/" + aws s3 cp -- "{params.s3_src}$tar_name" - \ + | zstd \ + | aws s3 cp - "{params.s3_dst}$tar_name.zst" \ + && aws s3 rm -- "{params.s3_src}$tar_name" \ + && echo " Moved $tar_name -> $tar_name.zst" + done < {input.manifest:q} """ @@ -183,7 +170,7 @@ rule upload: "nextclade_21L.tsv.zst", ] ], - mv_processed=all_processed_gisaid_pairs, + mv_processed=["data/mv-processed/all-tars.done"] if (config.get("s3_src") and config.get("s3_dst") and config["s3_src"] == config["s3_dst"]) else [], output: touch(f"data/{database}/upload.done") benchmark: