|
41 | 41 | import io |
42 | 42 | from pathlib import Path |
43 | 43 | import re |
44 | | -import tempfile |
45 | 44 | from typing import Any |
46 | 45 | from urllib.parse import urlparse, parse_qs |
47 | 46 |
|
| 47 | +from pysam.libcalignedsegment import AlignedSegment |
| 48 | + |
| 49 | +import htslurp |
48 | 50 | from pydantic import HttpUrl, validate_call |
49 | 51 | from pydantic.dataclasses import dataclass |
50 | 52 | import pysam |
51 | 53 | import requests |
52 | 54 |
|
53 | 55 | from modos.remote import get_session |
54 | 56 | from modos.genomics.region import Region |
55 | | -from modos.genomics.formats import GenomicFileSuffix, read_pysam |
| 57 | +from modos.genomics.formats import GenomicFileSuffix |
56 | 58 |
|
57 | 59 |
|
58 | 60 | @validate_call |
@@ -255,39 +257,57 @@ def to_file(self, path: Path): |
255 | 257 | for block in source: |
256 | 258 | sink.write(block) |
257 | 259 |
|
| 260 | + @property |
| 261 | + def format(self) -> str: |
| 262 | + return GenomicFileSuffix.from_path(self.path).name |
| 263 | + |
258 | 264 | @classmethod |
259 | 265 | def from_url(cls, url: str): |
260 | 266 | """Open connection directly from an htsget URL.""" |
261 | 267 | host, path, region = parse_htsget_url(url) |
262 | 268 | return cls(host, path, region=region) |
263 | 269 |
|
| 270 | + def records(self, reference: Path | None = None) -> htslurp.RecordIter: |
| 271 | + records = htslurp.stream_records( |
| 272 | + base_url=self.url, |
| 273 | + id=str(self.path), |
| 274 | + format=self.format, |
| 275 | + region=self.region, |
| 276 | + reference=reference, |
| 277 | + ) |
| 278 | + return records |
| 279 | + |
264 | 280 | def to_pysam( |
265 | | - self, reference_filename: str | None = None |
| 281 | + self, reference_filename: Path | None = None |
266 | 282 | ) -> Iterator[pysam.AlignedSegment | pysam.VariantRecord]: |
267 | 283 | """Convert the stream to a pysam object.""" |
268 | 284 |
|
269 | | - # NOTE: pysam needs a path or file descriptor, |
270 | | - # we have to stream from drive until this is addressed: |
| 285 | + # NOTE: we use a dedicated client because pysam does not support bytestreams |
271 | 286 | # ref: https://github.com/pysam-developers/pysam/blob/0787ca9da997b5911c00fd12584dad9741c82fb4/pysam/libcalignmentfile.pyx#L855 |
272 | | - # TODO: when above addressed, replace temporary file with |
| 287 | + # TODO: if above addressed, replace temporary file with |
273 | 288 | # self.open() to stream directly from in-memory buffer. |
274 | | - buffer = tempfile.NamedTemporaryFile( |
275 | | - "w+b", delete=False, suffix="".join(self.path.suffixes) |
276 | | - ).name |
277 | 289 |
|
278 | | - self.to_file(Path(buffer)) |
279 | | - buffer = read_pysam( |
280 | | - Path(buffer), reference_filename=reference_filename |
281 | | - ) |
| 290 | + stream = self.records(reference_filename) |
| 291 | + |
| 292 | + for record in stream: |
| 293 | + match self.format: |
| 294 | + case "CRAM" | "BAM" | "SAM": |
| 295 | + parsed = AlignedSegment.fromstring( |
| 296 | + record.decode(), stream.header |
| 297 | + ) |
| 298 | + case _: |
| 299 | + # NOTE: pysam does not support instantiating VariantRecord on the fly. |
| 300 | + raise ValueError( |
| 301 | + f"Cannot convert {self.format} records to pysam." |
| 302 | + ) |
282 | 303 |
|
283 | | - for record in buffer: |
284 | 304 | if self.region is None: |
285 | | - yield record |
| 305 | + yield parsed |
286 | 306 | continue |
287 | 307 |
|
288 | 308 | # htsget includes all returns in the bgzf block |
289 | 309 | # we filter out records outside requested region |
290 | 310 | record_region = Region.from_pysam(record) |
291 | 311 | if not record_region.overlaps(self.region): |
292 | 312 | continue |
293 | | - yield record |
| 313 | + yield parsed |
0 commit comments