Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 4 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,11 @@ Follow these instructions to run the ncov-ingest pipeline _without_ all the bell

### GISAID

To pull sequences directly from GISAID, you are required to set two environment variables:
To add new GISAID sequences:

- `GISAID_API_ENDPOINT`
- `GISAID_USERNAME_AND_PASSWORD`

Then run the ncov-ingest pipeline with the nextstrain CLI:

```sh
nextstrain build \
--image nextstrain/ncov-ingest \
--env GISAID_API_ENDPOINT \
--env GISAID_USERNAME_AND_PASSWORD \
. \
--configfile config/local_gisaid.yaml
```
1. Download them from GISAID using the "Input for the Augur pipeline" option. This results in a file of the form `gisaid_auspice_input_hcov-19_2026_01_07_11.tar` (exact date will vary).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd push to rename the file during download to our own standardized name, e.g. `2026-01-07-01.tar`, so that we are not at the mercy of the default name that GISAID produces. This ensures that the downloaded `*.tar` files are sorted as expected for the deduplication process.

2. Upload this file to s3: `aws s3 cp /path/to/downloads/gisaid_auspice_input_hcov-19_2026_01_07_11.tar s3://nextstrain-ncov-private/gisaid-downloads/unprocessed/`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we are using the .tar, I'd still want to compress these before/during upload so that they don't bloat our S3 bucket.

3. Trigger the ncov-ingest GISAID action.

### GenBank

Expand Down
144 changes: 144 additions & 0 deletions bin/process-unprocessed-tars
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""
Download all unprocessed GISAID tar files from S3, process them to NDJSON,
and output a manifest of successfully processed tars.
"""
import io
import json
import os
import shutil
import subprocess
import sys
import tarfile
import tempfile
from pathlib import Path

from augur.io.json import dump_ndjson, load_ndjson

def _locate_tars(source_path):
    """Return (directory containing *.tar files, cleanup_needed).

    An ``s3://`` source is mirrored into a local scratch directory which
    the caller should delete afterwards (cleanup_needed=True); a local
    source directory is used in place. Exits with status 1 if a local
    source path does not exist.
    """
    if source_path.startswith("s3://"):
        tar_dir = Path("tmp/unprocessed-tars")
        tar_dir.mkdir(parents=True, exist_ok=True)
        print(f"Downloading tar files from {source_path}...", file=sys.stderr)
        subprocess.run([
            "aws", "s3", "cp", source_path, str(tar_dir),
            "--recursive", "--exclude", "*", "--include", "*.tar",
            "--no-progress"
        ], check=True)
        return tar_dir, True

    print(f"Using local tar files from {source_path}...", file=sys.stderr)
    tar_dir = Path(source_path)
    if not tar_dir.exists():
        print(f"ERROR: Local path does not exist: {source_path}", file=sys.stderr)
        sys.exit(1)
    return tar_dir, False


def _process_tar(tar_path):
    """Extract one GISAID tar and run it through curate/transform.

    Returns the transform's raw NDJSON output as bytes, or None when the
    tar lacks the expected metadata/sequences files or the pipeline
    fails. May raise (e.g. tarfile.ReadError) on a corrupt archive.
    """
    with tempfile.TemporaryDirectory() as extract_dir:
        extract_path = Path(extract_dir)

        with tarfile.open(tar_path, 'r') as tar:
            # Strip leading slashes so absolute member names cannot
            # escape to the root filesystem during extraction.
            members = tar.getmembers()
            for member in members:
                if member.name.startswith('/'):
                    member.name = member.name.lstrip('/')
            tar.extractall(extract_path, members=members)

        metadata_files = list(extract_path.glob("**/*.metadata.tsv"))
        sequence_files = list(extract_path.glob("**/*.sequences.fasta"))

        if not metadata_files or not sequence_files:
            print(f"  WARNING: Missing metadata or sequences in {tar_path.name}", file=sys.stderr)
            return None
        if len(metadata_files) > 1 or len(sequence_files) > 1:
            # Ambiguous archive contents; flag it rather than silently
            # picking one of several candidate files.
            print(f"  WARNING: Multiple metadata/sequence files in {tar_path.name}; using the first of each", file=sys.stderr)

        # Pipeline: augur curate passthru ... | ./bin/transform-to-gisaid-cache
        # curate's stderr is inherited (not PIPEd) because nothing reads
        # it -- a full, unread PIPE would deadlock the pipeline.
        curate = subprocess.Popen([
            "augur", "curate", "passthru",
            "--metadata", str(metadata_files[0]),
            "--fasta", str(sequence_files[0]),
            "--seq-id-column", "strain",
            "--seq-field", "sequence"
        ], stdout=subprocess.PIPE)

        transform = subprocess.Popen([
            "./bin/transform-to-gisaid-cache"
        ], stdin=curate.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        curate.stdout.close()  # let curate see SIGPIPE if transform exits early
        stdout, stderr = transform.communicate()
        curate.wait()

        if curate.returncode != 0 or transform.returncode != 0:
            print(f"  WARNING: Transform failed for {tar_path.name}: {stderr.decode()}", file=sys.stderr)
            return None
        return stdout


def main():
    """Collect unprocessed GISAID tars, convert each to NDJSON, stream
    the combined records to the output file, and write a manifest of the
    tars that were processed successfully.

    argv: SOURCE_PATH (s3:// prefix or local dir), OUTPUT_NDJSON, MANIFEST_FILE.
    """
    source_path = sys.argv[1]    # e.g., s3://nextstrain-ncov-private/gisaid-downloads/unprocessed/ OR local path
    output_ndjson = sys.argv[2]  # e.g., data/gisaid/unprocessed-combined.ndjson
    manifest_file = sys.argv[3]  # e.g., data/gisaid/processed-manifest.txt

    tar_dir, cleanup_needed = _locate_tars(source_path)
    try:
        # Newest first: the downstream dedup-by-gisaid-id step keeps the
        # FIRST record it sees for a given id, so newer downloads must
        # appear earlier in the combined output to win deduplication.
        tar_files = sorted(tar_dir.glob("*.tar"), reverse=True)
        if not tar_files:
            print("No tar files found", file=sys.stderr)
            # Create empty outputs so Snakemake doesn't fail
            Path(output_ndjson).touch()
            Path(manifest_file).write_text("")
            return

        print(f"Found {len(tar_files)} tar files to process:", file=sys.stderr)
        for tar_path in tar_files:
            print(f"  - {tar_path.name}", file=sys.stderr)
        print("", file=sys.stderr)

        processed_tars = []
        failed_tars = []
        lines_written = 0

        # Stream records straight to the output file as each tar is
        # processed instead of holding every record in memory at once.
        with open(output_ndjson, 'w') as out:
            for tar_path in tar_files:
                print(f"Processing {tar_path.name}...", file=sys.stderr)
                try:
                    ndjson = _process_tar(tar_path)
                except Exception as e:
                    print(f"  ERROR processing {tar_path.name}: {e}", file=sys.stderr)
                    failed_tars.append(tar_path.name)
                    continue
                if ndjson is None:
                    failed_tars.append(tar_path.name)
                    continue

                record_count = 0
                for line in io.BytesIO(ndjson):
                    if not line.strip():
                        continue
                    # Round-trip through json to validate each record
                    # before it lands in the combined output.
                    json.dump(json.loads(line), out)
                    out.write('\n')
                    record_count += 1
                lines_written += record_count
                processed_tars.append(tar_path.name)
                print(f"  Processed {record_count} records", file=sys.stderr)
    finally:
        if cleanup_needed:
            # Remove the scratch download directory created by _locate_tars.
            shutil.rmtree(tar_dir, ignore_errors=True)

    print(f"\nWrote {lines_written} lines to {output_ndjson}", file=sys.stderr)

    # Write manifest
    with open(manifest_file, 'w') as f:
        for tar_name in processed_tars:
            f.write(f"{tar_name}\n")

    print(f"\nSuccessfully processed {len(processed_tars)}/{len(tar_files)} tar files", file=sys.stderr)
    if processed_tars:
        print("Processed tars:", file=sys.stderr)
        for tar_name in processed_tars:
            print(f"  ✓ {tar_name}", file=sys.stderr)

    if failed_tars:
        # Name each failure to make debugging a partial run easier.
        print(f"\nWarning: {len(failed_tars)} tar files failed to process:", file=sys.stderr)
        for tar_name in failed_tars:
            print(f"  ✗ {tar_name}", file=sys.stderr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking

Could be nice to have a list of failed tar files for debugging.


# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()
106 changes: 0 additions & 106 deletions manual-upload/README.md

This file was deleted.

59 changes: 0 additions & 59 deletions manual-upload/Snakefile

This file was deleted.

2 changes: 0 additions & 2 deletions manual-upload/config.yaml

This file was deleted.

Loading