diff --git a/poetry.lock b/poetry.lock index 880a2e8..d532afd 100644 --- a/poetry.lock +++ b/poetry.lock @@ -307,7 +307,29 @@ pygments = ">=2.7.2" [package.extras] dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "requests", "setuptools", "xmlschema"] +[[package]] +name = "tqdm" +version = "4.67.1" +description = "Fast, Extensible Progress Meter" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "tqdm-4.67.1-py3-none-any.whl", hash = "sha256:26445eca388f82e72884e0d580d5464cd801a3ea01e63e5601bdff9ba6a48de2"}, + {file = "tqdm-4.67.1.tar.gz", hash = "sha256:f8aef9c52c08c13a65f30ea34f4e5aac3fd1a34959879d7e59e63027286627f2"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[package.extras] +dev = ["nbval", "pytest (>=6)", "pytest-asyncio (>=0.24)", "pytest-cov", "pytest-timeout"] +discord = ["requests"] +notebook = ["ipywidgets (>=6)"] +slack = ["slack-sdk"] +telegram = ["requests"] + [metadata] lock-version = "2.1" python-versions = ">=3.13" -content-hash = "2c29ea0786a7dd48ee7d70cbb762805916e7a2d10755fc2141dfaa14e965d12b" +content-hash = "8548043b901cdcf4003399b40fcc2ab9f211f2db0d5cf9b3ffb9fc93b2a4a29d" diff --git a/pyproject.toml b/pyproject.toml index 1e8798d..876c9ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,8 @@ requires-python = ">=3.13" dependencies = [ "click (>=8.3.0,<9.0.0)", "pysam (>=0.23.3,<0.24.0)", - "pytest (>=8.4.2,<9.0.0)" + "pytest (>=8.4.2,<9.0.0)", + "tqdm (>=4.67.1,<5.0.0)" ] [tool.poetry] diff --git a/src/harp/harp.py b/src/harp/harp.py index cda3aed..c8d6263 100644 --- a/src/harp/harp.py +++ b/src/harp/harp.py @@ -1,5 +1,9 @@ +from pathlib import Path + import click +from harp.pipeline import pipeline + @click.group() @click.version_option(prog_name="HARP") @@ -10,18 +14,37 @@ def cli(): @cli.command() @click.option( - "--bam", type=click.Path(exists=True), help="Input BAM file (indexed)" + "--bam", + 
type=click.Path(exists=True, dir_okay=False, path_type=Path), + required=True, + help="Input BAM file (indexed)", +) +@click.option( + "--vcf", + type=click.Path(exists=True, dir_okay=False, path_type=Path), + required=True, + help="Input phased VCF file", +) +@click.option( + "--out", + type=click.Path(dir_okay=False, path_type=Path), + required=True, + help="Output TSV file", ) @click.option( - "--vcf", type=click.Path(exists=True), help="Input phased VCF file" + "--threads", + type=int, + default=4, + show_default=True, + help="Number of parallel threads to use", ) -@click.option("--out", type=click.Path(), help="Output TSV file") -def run(bam, vcf, out): +def run(bam: Path, vcf: Path, out: Path, threads: int): """Compute haplotype-specific allele counts for SNVs.""" - click.echo(f"BAM: {bam}") - click.echo(f"VCF: {vcf}") - click.echo(f"Output TSV: {out}") - click.echo("HARP CLI skeleton ready for future implementation.") + click.echo( + f"Running HARP on:\n BAM: {bam}\n VCF: {vcf}\n Output: {out}" + ) + pipeline(bam_path=bam, vcf_path=vcf, out_path=out, threads=threads) + click.echo("HARP finished successfully.") if __name__ == "__main__": diff --git a/src/harp/harp_errors.py b/src/harp/harp_errors.py new file mode 100644 index 0000000..a932688 --- /dev/null +++ b/src/harp/harp_errors.py @@ -0,0 +1,19 @@ +""" +Custom exceptions for HARP. +""" + + +class HarpError(Exception): + """Base class for all HARP-specific errors.""" + + +class VCFParsingError(HarpError): + """Raised when a VCF file cannot be opened or parsed.""" + + +class BAMProcessingError(HarpError): + """Raised when a BAM file cannot be processed.""" + + +class InvalidVariantError(HarpError): + """Raised when variant data is malformed or inconsistent.""" diff --git a/src/harp/harp_io.py b/src/harp/harp_io.py new file mode 100644 index 0000000..7d9414f --- /dev/null +++ b/src/harp/harp_io.py @@ -0,0 +1,67 @@ +""" +I/O functions for reading variants in genomic intervals. 
+""" + +from pathlib import Path +from typing import Iterator + +import pysam + +from harp.harp_errors import VCFParsingError +from harp.harp_types import Counts, Variant, VariantKey + + +def load_variants_chunk( + vcf_path: Path, chrom: str, start: int, end: int +) -> dict[VariantKey, Variant]: + """ + Load variants from a VCF only within a specified genomic interval. + + Args: + vcf_path (Path): Path to VCF file (gzipped/indexed or plain). + chrom (str): Chromosome name. + start (int): 0-based start position (inclusive). + end (int): 0-based end position (exclusive). + + Returns: + dict[VariantKey, Variant]: Dictionary of variants within the interval. + """ + try: + vcffile = pysam.VariantFile(str(vcf_path)) + except Exception as e: + raise VCFParsingError(f"Failed to open VCF: {vcf_path}") from e + + variants: dict[VariantKey, Variant] = {} + + for record in vcffile.fetch(chrom, start, end): + # Only simple bi-allelic SNVs + if ( + len(record.alts) != 1 + or len(record.ref) != 1 + or len(record.alts[0]) != 1 + ): + continue + + key = VariantKey(chrom=chrom, pos=record.pos - 1) + variants[key] = Variant( + key=key, + ref=record.ref, + alt=record.alts[0], + counts=Counts(), + ) + + return variants + + +def open_bam_iterator( + bam_path: Path, chrom: str, start: int, end: int +) -> Iterator[pysam.AlignedSegment]: + """ + Open a BAM file, check it exists, and return an + iterator over reads for the given region. + """ + if not bam_path.exists(): + raise FileNotFoundError(f"BAM file not found: {bam_path}") + + bamfile = pysam.AlignmentFile(str(bam_path), "rb") + return bamfile.fetch(chrom, start, end) diff --git a/src/harp/harp_types.py b/src/harp/harp_types.py new file mode 100644 index 0000000..aa5b7af --- /dev/null +++ b/src/harp/harp_types.py @@ -0,0 +1,85 @@ +""" +Custom data types used throughout HARP. 
+""" + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class VariantKey: + """ + Represents a genomic variant key uniquely identified by chromosome + and position. + + Attributes: + chrom (str): Chromosome name. + pos (int): 0-based genomic position. + """ + + chrom: str + pos: int + + +@dataclass +class Counts: + """ + Stores allele counts for haplotypes 1 and 2. + + Attributes: + h1_ref (int): Number of reads supporting REF allele on haplotype 1. + h1_alt (int): Number of reads supporting ALT allele on haplotype 1. + h2_ref (int): Number of reads supporting REF allele on haplotype 2. + h2_alt (int): Number of reads supporting ALT allele on haplotype 2. + """ + + h1_ref: int = 0 + h1_alt: int = 0 + h2_ref: int = 0 + h2_alt: int = 0 + + def __iadd__(self, other: "Counts") -> "Counts": + """In-place addition of counts from another `Counts` object.""" + self.h1_ref += other.h1_ref + self.h1_alt += other.h1_alt + self.h2_ref += other.h2_ref + self.h2_alt += other.h2_alt + return self + + +@dataclass +class Variant: + """ + Represents a single SNV with reference/alternate alleles and counts. + + Attributes: + key (VariantKey): Genomic location of the variant. + ref (str): Reference allele. + alt (str): Alternate allele. + counts (Counts): Read support counts for each haplotype/allele. + """ + + key: VariantKey + ref: str + alt: str + counts: Counts + + def __iadd__(self, other: "Variant") -> "Variant": + """ + In-place addition of counts from another Variant. + + Raises: + ValueError: if ref/alt alleles or key do not match. 
+ """ + if self.key != other.key: + raise ValueError( + f"Cannot merge variants with different keys: " + f"{self.key} != {other.key}" + ) + if self.ref != other.ref or self.alt != other.alt: + raise ValueError( + f"Cannot merge variants with different alleles: " + f"{self.ref}/{self.alt} != {other.ref}/{other.alt}" + ) + + self.counts += other.counts + return self diff --git a/src/harp/pipeline.py b/src/harp/pipeline.py new file mode 100644 index 0000000..ce4e932 --- /dev/null +++ b/src/harp/pipeline.py @@ -0,0 +1,49 @@ +from concurrent.futures import ProcessPoolExecutor, as_completed +from pathlib import Path + +import pysam +from tqdm import tqdm + +from harp.harp_types import Variant, VariantKey +from harp.process import merge_results, process_chunk +from harp.writer import write_results + +CHUNK_SIZE = 200_000 # Increase to reduce number of futures + + +def pipeline( + bam_path: Path, vcf_path: Path, out_path: Path, threads: int +) -> None: + """Run HARP pipeline with multiprocessing and efficient merging.""" + + global_variants: dict[VariantKey, Variant] = {} + + # Collect chromosome lengths + with pysam.AlignmentFile(str(bam_path), "rb") as bamfile: + chrom_lengths = { + chrom: bamfile.get_reference_length(chrom) + for chrom in bamfile.references + } + + # Submit jobs + futures = [] + with ProcessPoolExecutor(max_workers=threads) as executor: + for chrom, length in chrom_lengths.items(): + for start in range(0, length, CHUNK_SIZE): + end = min(start + CHUNK_SIZE, length) + futures.append( + executor.submit( + process_chunk, bam_path, vcf_path, chrom, start, end + ) + ) + + # Merge as they complete + for fut in tqdm( + as_completed(futures), total=len(futures), desc="Processing chunks" + ): + chunk_result = fut.result() + if chunk_result: + merge_results(global_variants, chunk_result) + + # Write final results + write_results(out_path, global_variants) diff --git a/src/harp/process.py b/src/harp/process.py new file mode 100644 index 0000000..4f7c8b6 --- 
/dev/null +++ b/src/harp/process.py @@ -0,0 +1,85 @@ +from pathlib import Path + +from harp.harp_io import load_variants_chunk, open_bam_iterator +from harp.harp_types import Variant, VariantKey + + +def process_chunk( + bam_path: Path, + vcf_path: Path, + chrom: str, + start: int, + end: int, +) -> dict[VariantKey, Variant]: + """ + Process a genomic interval from a BAM and VCF file, + counting allele support per haplotype. + + Args: + bam_path (Path): Path to the haplotagged BAM file. + vcf_path (Path): Path to the VCF file containing variants. + chrom (str): Chromosome name. + start (int): Start position of the interval (0-based, inclusive). + end (int): End position of the interval (0-based, exclusive). + + Returns: + dict[VariantKey, Variant]: Dictionary mapping VariantKey to + Variant with updated counts. + """ + variants = load_variants_chunk(vcf_path, chrom, start, end) + if not variants: + return {} + + for read in open_bam_iterator(bam_path, chrom, start, end): + if read.is_secondary or read.is_supplementary: + continue + if not read.has_tag("HP"): + continue + + hap = read.get_tag("HP") + if hap not in (1, 2): + continue + + for read_idx, ref_idx in read.get_aligned_pairs(matches_only=True): + if ref_idx is None or VariantKey(chrom, ref_idx) not in variants: + continue + + key = VariantKey(chrom, ref_idx) + var: Variant = variants[key] + + base = read.query_sequence[read_idx] + if base == var.ref: + if hap == 1: + var.counts.h1_ref += 1 + else: + var.counts.h2_ref += 1 + elif base == var.alt: + if hap == 1: + var.counts.h1_alt += 1 + else: + var.counts.h2_alt += 1 + + return variants + + +def merge_results( + global_variants: dict[VariantKey, Variant], + chunk_result: dict[VariantKey, Variant], +): + """ + Merge counts from a single chunk into the global variants dictionary. + + Args: + global_variants (dict[VariantKey, Variant]): Dictionary storing + cumulative Variant counts. 
+ chunk_result (dict[VariantKey, Variant]): Dictionary with Variant + counts from one chunk. + + Modifies: + global_variants in-place by summing counts with chunk_result. + """ + for key, var in chunk_result.items(): + if key not in global_variants: + global_variants[key] = var + else: + global_variants[key] += var diff --git a/src/harp/writer.py b/src/harp/writer.py new file mode 100644 index 0000000..7e973e5 --- /dev/null +++ b/src/harp/writer.py @@ -0,0 +1,27 @@ +from pathlib import Path + +from harp.harp_types import Variant, VariantKey + + +def write_results( + out_path: Path, global_variants: dict[VariantKey, Variant] +) -> None: + """ + Write the global variant counts to a TSV file. + + Args: + out_path (Path): Path to the output TSV file. + global_variants (dict[VariantKey, Variant]): Merged variant counts. + """ + with open(out_path, "w") as outfile: + outfile.write("chrom\tpos\th1_REF\th1_ALT\th2_REF\th2_ALT\n") + for key in sorted( + global_variants.keys(), key=lambda k: (k.chrom, k.pos) + ): + var = global_variants[key] + c = var.counts + outfile.write( + f"{key.chrom}\t{key.pos}\t" + f"{c.h1_ref}\t{c.h1_alt}\t" + f"{c.h2_ref}\t{c.h2_alt}\n" + ) diff --git a/tests/harp/test_harp.py b/tests/harp/test_harp.py index cbff7ca..d2d272b 100644 --- a/tests/harp/test_harp.py +++ b/tests/harp/test_harp.py @@ -4,7 +4,7 @@ def test_run_command_with_invalid_paths(): - """CLI should exit with code 2 if BAM/VCF paths do not exist.""" + """CLI should exit with code != 0 if BAM/VCF paths do not exist.""" runner = CliRunner() result = runner.invoke( cli, @@ -18,32 +18,10 @@ def test_run_command_with_invalid_paths(): "out.tsv", ], ) - assert result.exit_code == 2 + assert result.exit_code != 0 assert ( - "Error" in result.output - or "No such file or directory" in result.output - ) - - -def test_run_command_with_valid_paths(tmp_path): - """CLI should run successfully when BAM/VCF exist (even as empty files).""" - bam = tmp_path / "dummy.bam" - bam.touch() - vcf = tmp_path 
/ "dummy.vcf" - vcf.touch() - out = tmp_path / "out.tsv" - - runner = CliRunner() - result = runner.invoke( - cli, - ["run", "--bam", str(bam), "--vcf", str(vcf), "--out", str(out)], - ) - assert result.exit_code == 0 - assert f"BAM: {bam}" in result.output - assert f"VCF: {vcf}" in result.output - assert f"Output TSV: {out}" in result.output - assert ( - "HARP CLI skeleton ready for future implementation." in result.output + "No such file or directory" in result.output + or "Error" in result.output ) diff --git a/tests/harp/test_harp_io.py b/tests/harp/test_harp_io.py new file mode 100644 index 0000000..fb03add --- /dev/null +++ b/tests/harp/test_harp_io.py @@ -0,0 +1,69 @@ +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from harp.harp_errors import VCFParsingError +from harp.harp_io import load_variants_chunk +from harp.harp_types import Counts, Variant, VariantKey + + +def test_load_variants_chunk_invalid_file(tmp_path: Path): + """Test that loading an invalid file raises VCFParsingError.""" + bad_file = tmp_path / "bad.vcf" + bad_file.write_text("not a VCF") + with pytest.raises(VCFParsingError): + load_variants_chunk(bad_file, "chr1", 0, 10) + + +@patch("harp.harp_io.pysam.VariantFile") +def test_load_variants_chunk_basic(mock_variantfile, tmp_path: Path): + """Test that load_variants_chunk creates Variant objects + from mocked VCF records.""" + # Mock one record + mock_record = MagicMock() + mock_record.chrom = "chr1" + mock_record.pos = 5 + mock_record.ref = "A" + mock_record.alts = ["T"] + + # Configure fetch to return the mocked record + mock_vcf = MagicMock() + mock_vcf.fetch.return_value = [mock_record] + mock_variantfile.return_value = mock_vcf + + vcf_file = tmp_path / "dummy.vcf" + vcf_file.write_text("") # content not used due to mocking + + variants = load_variants_chunk(vcf_file, "chr1", 0, 10) + key = VariantKey("chr1", 4) # 0-based + + assert key in variants + var = variants[key] + assert isinstance(var, 
Variant) + assert var.ref == "A" + assert var.alt == "T" + assert isinstance(var.counts, Counts) + + # Ensure fetch was called with the correct parameters + mock_vcf.fetch.assert_called_once_with("chr1", 0, 10) + + +@patch("harp.harp_io.pysam.VariantFile") +def test_load_variants_chunk_filters(mock_variantfile, tmp_path: Path): + """Test that non-SNVs are skipped.""" + mock_record = MagicMock() + mock_record.chrom = "chr1" + mock_record.pos = 5 + mock_record.ref = "AT" # not a SNV + mock_record.alts = ["T"] + + mock_vcf = MagicMock() + mock_vcf.fetch.return_value = [mock_record] + mock_variantfile.return_value = mock_vcf + + vcf_file = tmp_path / "dummy.vcf" + vcf_file.write_text("") + + variants = load_variants_chunk(vcf_file, "chr1", 0, 10) + assert variants == {} # non-SNV skipped diff --git a/tests/harp/test_harp_types.py b/tests/harp/test_harp_types.py new file mode 100644 index 0000000..6feb262 --- /dev/null +++ b/tests/harp/test_harp_types.py @@ -0,0 +1,37 @@ +import pytest + +from harp.harp_types import Counts, Variant, VariantKey + + +def test_counts_iadd(): + c1 = Counts(h1_ref=1, h1_alt=2, h2_ref=3, h2_alt=4) + c2 = Counts(h1_ref=10, h1_alt=20, h2_ref=30, h2_alt=40) + c1 += c2 + assert c1.h1_ref == 11 + assert c1.h1_alt == 22 + assert c1.h2_ref == 33 + assert c1.h2_alt == 44 + + +def test_variant_iadd_success(): + key = VariantKey("chr1", 100) + v1 = Variant(key, "A", "T", Counts(1, 1, 1, 1)) + v2 = Variant(key, "A", "T", Counts(2, 2, 2, 2)) + v1 += v2 + assert v1.counts.h1_ref == 3 + assert v1.counts.h2_alt == 3 + + +def test_variant_iadd_key_mismatch(): + v1 = Variant(VariantKey("chr1", 100), "A", "T", Counts()) + v2 = Variant(VariantKey("chr1", 101), "A", "T", Counts()) + with pytest.raises(ValueError): + v1 += v2 + + +def test_variant_iadd_allele_mismatch(): + key = VariantKey("chr1", 100) + v1 = Variant(key, "A", "T", Counts()) + v2 = Variant(key, "G", "T", Counts()) + with pytest.raises(ValueError): + v1 += v2 diff --git 
a/tests/harp/test_process.py b/tests/harp/test_process.py
new file mode 100644
index 0000000..9912d77
--- /dev/null
+++ b/tests/harp/test_process.py
@@ -0,0 +1,76 @@
+from pathlib import Path
+
+from harp.harp_types import Counts, Variant, VariantKey
+from harp.process import merge_results, process_chunk
+
+
+def test_process_chunk_minimal(tmp_path: Path):
+    # Use dummy paths; we just want to make sure the function runs
+    bam_path = tmp_path / "dummy.bam"
+    vcf_path = tmp_path / "dummy.vcf"
+
+    # Create empty files so function doesn't crash opening them
+    bam_path.write_bytes(b"")
+    vcf_path.write_text("")
+
+    result = process_chunk(bam_path, vcf_path, "chr1", 0, 10)
+    # Should return a dict (empty in this minimal case)
+    assert isinstance(result, dict)
+
+
+def test_process_chunk_return_type(tmp_path: Path):
+    # Dummy files again
+    bam_path = tmp_path / "dummy.bam"
+    vcf_path = tmp_path / "dummy.vcf"
+
+    bam_path.write_bytes(b"")
+    vcf_path.write_text("")
+
+    result = process_chunk(bam_path, vcf_path, "chr1", 0, 10)
+    # Keys (if any) should be VariantKey
+    for key, var in result.items():
+        assert isinstance(key, VariantKey)
+        assert isinstance(var, Variant)
+        assert isinstance(var.counts, Counts)
+
+
+def make_variant(
+    chrom: str, pos: int, h1_ref=0, h1_alt=0, h2_ref=0, h2_alt=0
+) -> Variant:
+    return Variant(
+        key=VariantKey(chrom, pos),
+        ref="A",
+        alt="T",
+        counts=Counts(h1_ref, h1_alt, h2_ref, h2_alt),
+    )
+
+
+def test_merge_results_add_new():
+    global_variants = {}
+    chunk_result = {VariantKey("chr1", 10): make_variant("chr1", 10, h1_ref=1)}
+
+    merge_results(global_variants, chunk_result)
+
+    assert VariantKey("chr1", 10) in global_variants
+    var = global_variants[VariantKey("chr1", 10)]
+    assert var.counts.h1_ref == 1
+    assert var.counts.h1_alt == 0
+    assert var.counts.h2_ref == 0
+    assert var.counts.h2_alt == 0
+
+
+def test_merge_results_merge_existing():
+    v1 = make_variant("chr1", 10, h1_ref=1, h2_alt=2)
+    v2 = make_variant("chr1", 10, h1_ref=3, h2_alt=1)
+
+    global_variants = {v1.key: v1}
+    chunk_result = {v2.key: v2}
+
+    merge_results(global_variants, chunk_result)
+    var = global_variants[v1.key]
+
+    # Counts should be summed
+    assert var.counts.h1_ref == 4
+    assert var.counts.h1_alt == 0
+    assert var.counts.h2_ref == 0
+    assert var.counts.h2_alt == 3