Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ requires-python = ">=3.13"
dependencies = [
"click (>=8.3.0,<9.0.0)",
"pysam (>=0.23.3,<0.24.0)",
"pytest (>=8.4.2,<9.0.0)"
"pytest (>=8.4.2,<9.0.0)",
"tqdm (>=4.67.1,<5.0.0)"
]

[tool.poetry]
Expand Down
39 changes: 31 additions & 8 deletions src/harp/harp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from pathlib import Path

import click

from harp.pipeline import pipeline


@click.group()
@click.version_option(prog_name="HARP")
Expand All @@ -10,18 +14,37 @@ def cli():

@cli.command()
@click.option(
    "--bam",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    required=True,
    help="Input BAM file (indexed)",
)
@click.option(
    "--vcf",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    required=True,
    help="Input phased VCF file",
)
@click.option(
    "--out",
    type=click.Path(dir_okay=False, path_type=Path),
    required=True,
    help="Output TSV file",
)
@click.option(
    "--threads",
    type=int,
    default=4,
    show_default=True,
    help="Number of parallel threads to use",
)
def run(bam: Path, vcf: Path, out: Path, threads: int):
    """Compute haplotype-specific allele counts for SNVs.

    Validates that the BAM and VCF paths exist (via click), then delegates
    all work to :func:`harp.pipeline.pipeline`, which fans chunks out over
    ``threads`` worker processes and writes a TSV to ``out``.
    """
    click.echo(
        f"Running HARP on:\n BAM: {bam}\n VCF: {vcf}\n Output: {out}"
    )
    pipeline(bam_path=bam, vcf_path=vcf, out_path=out, threads=threads)
    click.echo("HARP finished successfully.")


if __name__ == "__main__":
Expand Down
19 changes: 19 additions & 0 deletions src/harp/harp_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
Custom exceptions for HARP.
"""


class HarpError(Exception):
    """Base class for all HARP-specific errors; catch this to handle any HARP failure."""


class VCFParsingError(HarpError):
    """Raised when a VCF file cannot be opened or parsed."""


class BAMProcessingError(HarpError):
    """Raised when a BAM file cannot be opened or its reads cannot be processed."""


class InvalidVariantError(HarpError):
    """Raised when variant data is malformed or internally inconsistent."""
67 changes: 67 additions & 0 deletions src/harp/harp_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
I/O functions for reading variants in genomic intervals.
"""

from pathlib import Path
from typing import Iterator

import pysam

from harp.harp_errors import VCFParsingError
from harp.harp_types import Counts, Variant, VariantKey


def load_variants_chunk(
    vcf_path: Path, chrom: str, start: int, end: int
) -> dict[VariantKey, Variant]:
    """
    Load simple bi-allelic SNVs from a VCF within a genomic interval.

    Args:
        vcf_path (Path): Path to the VCF file. Region fetches require a
            bgzipped and indexed VCF (``.tbi``/``.csi``) — pysam's
            ``fetch`` cannot iterate a region of a plain-text VCF.
        chrom (str): Chromosome name.
        start (int): 0-based start position (inclusive).
        end (int): 0-based end position (exclusive).

    Returns:
        dict[VariantKey, Variant]: Variants within the interval, keyed by
        chromosome and 0-based position, each with zeroed counts.

    Raises:
        VCFParsingError: If the VCF cannot be opened, or the region cannot
            be fetched (missing index or unknown contig).
    """
    try:
        vcffile = pysam.VariantFile(str(vcf_path))
    except Exception as e:
        raise VCFParsingError(f"Failed to open VCF: {vcf_path}") from e

    variants: dict[VariantKey, Variant] = {}

    # Context manager guarantees the htslib file handle is released even
    # if iteration raises part-way through the region.
    with vcffile:
        try:
            records = vcffile.fetch(chrom, start, end)
        except ValueError as e:
            # pysam raises ValueError for a missing index or unknown contig;
            # surface it as the package's own error type.
            raise VCFParsingError(
                f"Failed to fetch region {chrom}:{start}-{end} from {vcf_path}"
            ) from e

        for record in records:
            # Only simple bi-allelic SNVs: exactly one single-base ALT and a
            # single-base REF. `alts` is None for records with no ALT allele.
            alts = record.alts
            if alts is None or len(alts) != 1:
                continue
            if len(record.ref) != 1 or len(alts[0]) != 1:
                continue

            # record.pos is 1-based in pysam; HARP keys are 0-based.
            key = VariantKey(chrom=chrom, pos=record.pos - 1)
            variants[key] = Variant(
                key=key,
                ref=record.ref,
                alt=alts[0],
                counts=Counts(),
            )

    return variants


def open_bam_iterator(
    bam_path: Path, chrom: str, start: int, end: int
) -> Iterator[pysam.AlignedSegment]:
    """
    Open a BAM file and return an iterator over reads in the given region.

    The existence check and file open happen eagerly (errors surface at call
    time, not first iteration); the returned generator closes the underlying
    ``AlignmentFile`` once iteration finishes or is abandoned, so the handle
    no longer leaks.

    Args:
        bam_path (Path): Path to an indexed BAM file.
        chrom (str): Chromosome name.
        start (int): 0-based start position (inclusive).
        end (int): 0-based end position (exclusive).

    Returns:
        Iterator[pysam.AlignedSegment]: Reads overlapping the region.

    Raises:
        FileNotFoundError: If ``bam_path`` does not exist.
    """
    if not bam_path.exists():
        raise FileNotFoundError(f"BAM file not found: {bam_path}")

    bamfile = pysam.AlignmentFile(str(bam_path), "rb")

    def _reads() -> Iterator[pysam.AlignedSegment]:
        # Close the handle whether iteration completes, raises, or the
        # generator is garbage-collected early.
        try:
            yield from bamfile.fetch(chrom, start, end)
        finally:
            bamfile.close()

    return _reads()
85 changes: 85 additions & 0 deletions src/harp/harp_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""
Custom data types used throughout HARP.
"""

from dataclasses import dataclass


@dataclass(frozen=True)
class VariantKey:
"""
Represents a genomic variant key uniquely identified by chromosome
and position.

Attributes:
chrom (str): Chromosome name.
pos (int): 0-based genomic position.
"""

chrom: str
pos: int


@dataclass
class Counts:
"""
Stores allele counts for haplotypes 1 and 2.

Attributes:
h1_ref (int): Number of reads supporting REF allele on haplotype 1.
h1_alt (int): Number of reads supporting ALT allele on haplotype 1.
h2_ref (int): Number of reads supporting REF allele on haplotype 2.
h2_alt (int): Number of reads supporting ALT allele on haplotype 2.
"""

h1_ref: int = 0
h1_alt: int = 0
h2_ref: int = 0
h2_alt: int = 0

def __iadd__(self, other: "Counts") -> "Counts":
"""In-place addition of counts from another `Counts` object."""
self.h1_ref += other.h1_ref
self.h1_alt += other.h1_alt
self.h2_ref += other.h2_ref
self.h2_alt += other.h2_alt
return self


@dataclass
class Variant:
"""
Represents a single SNV with reference/alternate alleles and counts.

Attributes:
key (VariantKey): Genomic location of the variant.
ref (str): Reference allele.
alt (str): Alternate allele.
counts (Counts): Read support counts for each haplotype/allele.
"""

key: VariantKey
ref: str
alt: str
counts: Counts

def __iadd__(self, other: "Variant") -> "Variant":
"""
In-place addition of counts from another Variant.

Raises:
ValueError: if ref/alt alleles or key do not match.
"""
if self.key != other.key:
raise ValueError(
f"Cannot merge variants with different keys: "
f"{self.key} != {other.key}"
)
if self.ref != other.ref or self.alt != other.alt:
raise ValueError(
f"Cannot merge variants with different alleles: "
f"{self.ref}/{self.alt} != {other.ref}/{other.alt}"
)

self.counts += other.counts
return self
49 changes: 49 additions & 0 deletions src/harp/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

import pysam
from tqdm import tqdm

from harp.harp_types import Variant, VariantKey
from harp.process import merge_results, process_chunk
from harp.writer import write_results

# Genomic interval size per worker task; larger values mean fewer futures.
CHUNK_SIZE = 200_000


def pipeline(
    bam_path: Path, vcf_path: Path, out_path: Path, threads: int
) -> None:
    """Run the HARP pipeline: fan genome chunks out over worker processes,
    merge per-chunk variant counts as results arrive, and write a TSV.

    Args:
        bam_path: Indexed BAM file with aligned reads.
        vcf_path: Phased VCF file of candidate SNVs.
        out_path: Destination TSV file.
        threads: Number of worker processes.
    """
    merged: dict[VariantKey, Variant] = {}

    # Chromosome names and lengths come from the BAM header.
    with pysam.AlignmentFile(str(bam_path), "rb") as bam:
        chrom_sizes = {
            name: bam.get_reference_length(name) for name in bam.references
        }

    with ProcessPoolExecutor(max_workers=threads) as pool:
        # One task per CHUNK_SIZE window of every chromosome.
        pending = [
            pool.submit(
                process_chunk,
                bam_path,
                vcf_path,
                name,
                lo,
                min(lo + CHUNK_SIZE, size),
            )
            for name, size in chrom_sizes.items()
            for lo in range(0, size, CHUNK_SIZE)
        ]

        # Fold each chunk's counts into the global map as soon as it is done.
        for done in tqdm(
            as_completed(pending), total=len(pending), desc="Processing chunks"
        ):
            partial = done.result()
            if partial:
                merge_results(merged, partial)

    write_results(out_path, merged)
Loading