diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index b4b1860..84d25fe 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -15,49 +15,139 @@ on: pull_request: workflow_dispatch: -permissions: - contents: read jobs: linux: runs-on: ubuntu-latest strategy: matrix: - target: [x86_64] + target: [x86_64, "aarch64", "armv7"] + manylinux: [auto] + # include: + # # musllinux + # - os: ubuntu + # target: x86_64-unknown-linux-musl + # manylinux: musllinux_1_2 + # - os: ubuntu + # target: aarch64-unknown-linux-musl + # manylinux: musllinux_1_2 + # - os: ubuntu + # target: armv7-unknown-linux-musleabihf + # manylinux: musllinux_1_2 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 + with: + submodules: 'recursive' # Fetch Git submodules recursively + + # Initialize and update submodules (optional, if submodules were not initialized during checkout) + - name: Update submodules + run: git submodule update --init --recursive - uses: actions/setup-python@v4 with: - python-version: '3.10' + python-version: '3.12' - name: Build wheels uses: PyO3/maturin-action@v1 with: target: ${{ matrix.target }} - args: --release --out dist --find-interpreter + args: --release --out dist -i 3.8 3.9 3.10 3.11 3.12 pypy3.8 pypy3.9 pypy3.10 sccache: 'true' - manylinux: auto + manylinux: ${{ matrix.manylinux || 'auto' }} + - name: Install and test built x86_64 wheel + if: matrix.target == 'x86_64' + run: | + pip install mappy-rs --no-index --find-links dist --force-reinstall + pip install pytest + pytest -sv + # Test musl wheels for x86 + - name: Install and test built x86_64 musl wheel + if: matrix.target == 'x86_64-unknown-linux-musl' + uses: addnab/docker-run-action@v3 + with: + image: alpine:latest + options: -v ${{ github.workspace }}:/io -w /io + run: | + apk add py3-pip + pip3 install -U pip pytest + pip3 install mappy-rs --no-index --find-links /io/dist/ --force-reinstall + python3 -m pytest -sv - name: Upload wheels uses:
actions/upload-artifact@v3 with: name: wheels path: dist + test-builds-aarch: + name: run test of build on ${{ matrix.target }}-${{ matrix.distro }} + needs: [linux] + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + target: [aarch64, armv7] + distro: ['ubuntu22.04'] + + steps: + - uses: actions/checkout@v4 + + - name: get dist artifacts + uses: actions/download-artifact@v3 + with: + name: wheels + path: dist + + - uses: uraimo/run-on-arch-action@v2.5.1 + name: install & test + with: + arch: ${{ matrix.target }} + distro: ${{ matrix.distro }} + githubToken: ${{ github.token }} + install: | + set -x + if command -v apt-get &> /dev/null; then + echo "installing python & pip with apt-get..." + apt-get update + apt-get install -y --no-install-recommends python3 python3-pip python3-venv + pip install -U pip pytest + else + echo "installing python & pip with apk..." + apk update + apk add py3-pip + pip3 install -U pip pytest + + fi + run: | + python3 -m pip install mappy-rs --no-index --find-links dist --force-reinstall + python3 -m pytest -sv + macos: - runs-on: macos-latest + runs-on: macos-13 strategy: matrix: target: [x86_64, aarch64] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 + with: + submodules: 'recursive' # Fetch Git submodules recursively + + # Initialize and update submodules (optional, if submodules were not initialized during checkout) + - name: Update submodules + run: git submodule update --init --recursive - uses: actions/setup-python@v4 with: - python-version: '3.10' + python-version: '3.11' - name: Build wheels uses: PyO3/maturin-action@v1 with: target: ${{ matrix.target }} - args: --release --out dist --find-interpreter + args: --release --out dist -i 3.8 3.9 3.10 3.11 3.12 pypy3.8 pypy3.9 pypy3.10 sccache: 'true' + - name: Test built wheels + if: matrix.target == 'x86_64' + run: | + pip install mappy-rs --no-index --find-links dist --force-reinstall + pip install pytest + pytest -sv - name: Upload wheels 
uses: actions/upload-artifact@v3 with: @@ -67,7 +157,13 @@ jobs: sdist: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 + with: + submodules: 'recursive' # Fetch Git submodules recursively + + # Initialize and update submodules (optional, if submodules were not initialized during checkout) + - name: Update submodules + run: git submodule update --init --recursive - name: Build sdist uses: PyO3/maturin-action@v1 with: @@ -83,7 +179,7 @@ jobs: name: Release runs-on: ubuntu-latest if: "startsWith(github.ref, 'refs/tags/')" - needs: [linux, macos, sdist] + needs: [linux, test-builds-aarch, macos, sdist] permissions: # Used to upload release artifacts contents: write diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 5c1cc1d..5b91115 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -3,21 +3,28 @@ name: check on: push: - branches: [main] + branches: [main, feature/*] + tags: + - '*' pull_request: workflow_dispatch: schedule: - cron: '0 0 * * SUN' env: - PYTHON_LATEST: "3.11" + PYTHON_LATEST: "3.12" jobs: pre-commit: name: "pre-commit" runs-on: "ubuntu-latest" steps: - - uses: "actions/checkout@v3" + - uses: "actions/checkout@v4" + with: + submodules: 'recursive' # Fetch Git submodules recursively + # Initialize and update submodules (optional, if submodules were not initialized during checkout) + - name: Update submodules + run: git submodule update --init --recursive - uses: "actions/setup-python@v4" with: python-version: ${{env.PYTHON_LATEST}} @@ -33,7 +40,13 @@ jobs: matrix: toolchain: [stable, beta, nightly] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 + with: + submodules: 'recursive' # Fetch Git submodules recursively + + # Initialize and update submodules (optional, if submodules were not initialized during checkout) + - name: Update submodules + run: git submodule update --init --recursive - uses: "dtolnay/rust-toolchain@master" with: toolchain: 
${{ matrix.toolchain }} @@ -46,7 +59,13 @@ jobs: runs-on: ubuntu-latest needs: clippy steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 + with: + submodules: 'recursive' # Fetch Git submodules recursively + + # Initialize and update submodules (optional, if submodules were not initialized during checkout) + - name: Update submodules + run: git submodule update --init --recursive - uses: dtolnay/rust-toolchain@stable - run: cargo test --no-default-features @@ -56,10 +75,16 @@ jobs: needs: clippy strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - - uses: "actions/checkout@v3" + - uses: "actions/checkout@v4" + with: + submodules: 'recursive' # Fetch Git submodules recursively + + # Initialize and update submodules (optional, if submodules were not initialized during checkout) + - name: Update submodules + run: git submodule update --init --recursive - uses: "actions/setup-python@v4" with: python-version: "${{ matrix.python-version }}" @@ -79,10 +104,16 @@ jobs: needs: ["cargo-test", "python-test"] strategy: matrix: - msrv: ["1.63.0", "1.64.0"] + msrv: ["1.76.0", "1.77.0"] name: "ubuntu / ${{ matrix.msrv }}" steps: - - uses: "actions/checkout@v3" + - uses: "actions/checkout@v4" + with: + submodules: 'recursive' # Fetch Git submodules recursively + + # Initialize and update submodules (optional, if submodules were not initialized during checkout) + - name: Update submodules + run: git submodule update --init --recursive - uses: "dtolnay/rust-toolchain@master" with: toolchain: "${{ matrix.msrv }}" diff --git a/.gitignore b/.gitignore index 56a6f41..eb9b3ad 100644 --- a/.gitignore +++ b/.gitignore @@ -271,3 +271,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
#.idea/ + +# pytest Benchmark histogram output +*.svg diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..bc886f0 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "minimap2-rs"] + path = minimap2-rs + url = https://github.com/Adoni5/minimap2-rs.git + branch = main diff --git a/Cargo.toml b/Cargo.toml index d060cd4..79b913b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mappy-rs" -version = "0.0.7" +version = "0.0.8-alpha.1" edition = "2021" authors = ["Rory Munro "] license = "MIT OR Apache-2.0" @@ -21,13 +21,12 @@ name = "mappy_rs" [dependencies] libc = "0.2" pyo3 = { version = "0.19" } -minimap2-sys = { version = "0.1.15+minimap2.2.26", features = ["simde"] } crossbeam = "0.8.2" fnv = "1.0.7" ctrlc = {version = "3.4.0", features = ["termination"] } itertools = "0.10.5" -minimap2 = {version = "0.1.15+minimap2.2.26" } - +minimap2-sys = { path = "./minimap2-rs/minimap2-sys", features = ["simde"] } +minimap2 = { path = "./minimap2-rs", features = ["simde"] } [features] extension-module = ["pyo3/extension-module"] default = ["extension-module"] diff --git a/README.md b/README.md index fd8214c..365aa87 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,11 @@ test_benchmark_multi[1] | 84.8418 (3.16)| 94.0907 (3.13) | 86.7404 (3.09)| # Changelog +## 0.0.8 +- Updated `map_batch` - now has a max attempts parameter, which defaults to 6. If set to `None`, there is no limit to the number of attempts to add a fastq to the mapping batch. + +## 0.0.7 +- Added wheel builds for aarch64 linux and MacOS, and armv7 in the CI/yml. ## 0.0.6 - Lowered backoff time for `map_batch` to 50 milliseconds, with 6 attempts. Each attempt will double the previous back off time. 
diff --git a/help.py b/help.py new file mode 100644 index 0000000..456b70c --- /dev/null +++ b/help.py @@ -0,0 +1,9 @@ +# Test the memory issue, not dropping Aligner on rust +# side after del without manual gc.collect() +from mappy_rs import Aligner +import gc + +al = Aligner("/home/adoni5/Documents/Bioinformatics/refs/hg38_no_alts_22.mmi") +al.enable_threading(8) +del al +gc.collect() diff --git a/minimap2-rs b/minimap2-rs new file mode 160000 index 0000000..5f0a940 --- /dev/null +++ b/minimap2-rs @@ -0,0 +1 @@ +Subproject commit 5f0a940c8ab0819bd055a446a55127b6075c240e diff --git a/pyproject.toml b/pyproject.toml index bce5ba1..7006caa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", ] -version = "0.0.7a" +version = "0.0.8a1" license = {text = "BSD 3-Clause License"} description = "A multithreaded python wrapper for rust bindings of minimap2." authors = [ diff --git a/src/lib.rs b/src/lib.rs index d12c538..63f3fd7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,11 +3,13 @@ //! 
Designed for use with readfish https://github.com/LooseLab/readfish/ #![deny(missing_docs)] #![deny(clippy::missing_docs_in_private_items)] - +#![allow(forgetting_copy_types)] use crossbeam::channel::{bounded, Receiver, RecvError, Sender}; use crossbeam::queue::ArrayQueue; use fnv::FnvHashMap; use itertools::all; +use minimap2_sys::mm_idx_destroy; + use pyo3::exceptions::{ PyKeyError, PyNotImplementedError, PyRuntimeError, PyTypeError, PyValueError, }; @@ -16,7 +18,8 @@ use pyo3::pyclass::IterNextOutput; use pyo3::types::{PyIterator, PyList, PySequence, PyTuple}; use pyo3::FromPyObject; use std::collections::HashMap; -use std::fmt::{Display, Formatter}; +use std::fmt::{Debug, Display, Formatter}; +use std::ptr::NonNull; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{mem, thread}; @@ -287,7 +290,6 @@ impl Mapping { /// Aligner struct, mimicking minimap2's python interface #[pyclass(unsendable)] #[allow(clippy::type_complexity)] - pub struct Aligner { /// Inner minimap2::Aligner pub aligner: minimap2::Aligner, @@ -303,7 +305,22 @@ pub struct Aligner { results_queue: Arc, usize)>>>, } // unsafe impl Send for Aligner {} - +impl Drop for Aligner { + fn drop(&mut self) { + { + let mut stop = self.stop.lock().unwrap(); + *stop = true; + } + std::thread::sleep(Duration::from_millis(10)); + if self.aligner.idx.is_some() { + let c_ptr = self.aligner.idx.as_mut().unwrap().as_mut_ptr(); + unsafe { + mm_idx_destroy(c_ptr); + } + self.aligner.idx = None; + } + } +} #[pymethods] impl Aligner { /// Initialise a new Py Class Aligner @@ -415,12 +432,16 @@ impl Aligner { // Idx index name minimap2_sys::mm_idx_index_name(idx.assume_init()); }; + let idx_raw = unsafe { idx.assume_init() }; + // Safety check before wrapping the raw pointer + let idx_safe = NonNull::new(idx_raw).ok_or("Ahhhh").unwrap(); + let my_safe_pointer = minimap2::MySafePointer { ptr: idx_safe }; let al = Aligner { aligner: minimap2::Aligner { mapopt: mapopts, idxopt: idxopts, threads: n_threads, - 
idx: Some(unsafe { *idx.assume_init() }), + idx: Some(my_safe_pointer), idx_reader: Some(unsafe { *idx_reader }), }, n_threads: 0, @@ -442,12 +463,15 @@ impl Aligner { return Err(PyRuntimeError::new_err("Index hasn't loaded")); } unsafe { - let ns = (self.aligner.idx.unwrap()).n_seq; + let ns = (*self.aligner.idx.as_ref().unwrap().as_ptr()).n_seq; let mut sn = Vec::with_capacity(ns as usize); for i in 0..ns { sn.push( std::ffi::CStr::from_ptr( - (*((self.aligner.idx.unwrap()).seq.offset(i as isize))).name, + (*((*(self.aligner.idx.as_ref().unwrap().as_ptr())) + .seq + .offset(i as isize))) + .name, ) .to_str() .unwrap() @@ -472,7 +496,13 @@ impl Aligner { /// Map a single read, blocking #[pyo3(signature = (seq, seq2=None, cs=false, MD=false), text_signature = "(seq, seq2=None, cs=False, MD=False)")] #[allow(non_snake_case)] - fn map(&self, seq: String, seq2: Option, cs: bool, MD: bool) -> PyResult> { + fn map( + &mut self, + seq: String, + seq2: Option, + cs: bool, + MD: bool, + ) -> PyResult> { // TODO: PyIterProtocol to map single reads and return as a generator if let Some(_seq2) = seq2 { return Err(PyNotImplementedError::new_err( @@ -542,7 +572,7 @@ impl Aligner { self.n_threads = n_threads; let dones = Arc::new(Mutex::new(vec![false; n_threads])); for i in 0..n_threads { - let _aligner = self.aligner.clone(); + let mut _aligner = self.aligner.clone(); let stop = Arc::clone(&self.stop); let wq = Arc::clone(&self.work_queue); let rq = Arc::clone(&self.results_queue); @@ -636,13 +666,19 @@ impl Aligner { } /// Align a sequence Optionally back off if we fail to add the sequence to the queue, in the case that the work queue is full. - #[pyo3(signature = (seqs, back_off=true))] - fn map_batch(&self, seqs: &PyAny, back_off: bool) -> PyResult { + /// This function will block until the sequence has been added to the queue. If max_attempts is None and back_off is true, there is no (effective) limit to the number of attempts. 
+ #[pyo3(signature = (seqs, back_off=true, max_attempts=6))] + fn map_batch( + &self, + seqs: &PyAny, + back_off: bool, + max_attempts: Option, + ) -> PyResult { let mut res = AlignmentBatchResultIter::new(); // Set the number of threads res.set_n_threads(self.n_threads); // do the heavy work - self._map_batch(&mut res, seqs, back_off)?; + self._map_batch(&mut res, seqs, back_off, max_attempts)?; // let return_metadata: (i32, i32, String) = (metadata.read_number, metadata.channel_number, String::from("hdea")); Ok(res) } @@ -655,18 +691,98 @@ impl Aligner { /// Get the k value from the index. #[getter] fn k(&self) -> PyResult { - Ok(self.aligner.idx.unwrap().k) + Ok(unsafe { *self.aligner.idx.as_ref().unwrap().as_ptr() }.k) } /// Get the w value from the index. #[getter] fn w(&self) -> PyResult { - Ok(self.aligner.idx.unwrap().w) + Ok(unsafe { *self.aligner.idx.as_ref().unwrap().as_ptr() }.w) } /// Get the number of sequences present in the index #[getter] fn n_seq(&self) -> PyResult { - Ok(self.aligner.idx.unwrap().n_seq) + Ok(unsafe { *self.aligner.idx.as_ref().unwrap().as_ptr() }.n_seq) + } +} + +/// Pushes an item onto a `crossbeam::queue::ArrayQueue` with optional backoff support. +/// +/// The function tries to push an item `T` onto a given queue. If the push operation fails, +/// it will automatically retry the operation up to `max_attempts` times if `back_off` is `true`. +/// +/// # Parameters +/// * `work_queue`: Reference to the `ArrayQueue` to push the item onto. +/// * `item`: The item to be pushed onto the queue. +/// * `back_off`: A boolean flag that, when `true`, will engage a backoff mechanism on failure. +/// * `id_num`: An identifier number for logging purposes. +/// * `max_attempts`: The maximum number of attempts to push an item onto the queue. Defaults to 6, if None.
+/// +/// # Examples +/// ```rust,ignore +/// use crossbeam::queue::ArrayQueue; +/// use pyo3::prelude::*; +/// +/// fn main() -> PyResult<()> { +/// let queue = ArrayQueue::new(100); +/// let item = "my_item"; +/// let back_off = true; +/// let id_num = 42; +/// +/// push_with_backoff(&queue, item, back_off, id_num, usize::MAX)?; +/// Ok(()) +/// } +/// ``` +/// +/// # Returns +/// * `Ok(())` if the push operation is successful or `Err(PyErr)` otherwise. +/// +/// # Panics +/// This function will not panic. +/// +/// # Errors +/// Returns a `PyErr` under the following conditions: +/// * If the queue is full and `back_off` is `false` +/// * If the queue remains full after `max_attempts` with `back_off` set to `true` +/// +fn push_with_backoff( + work_queue: &crossbeam::queue::ArrayQueue, + item: T, + back_off: bool, + id_num: usize, + max_attempts: Option, +) -> PyResult<()> { + match work_queue.push(item) { + Ok(()) => Ok(()), + Err(e) => { + if back_off { + let mut attempts = 0; + let max_attempts = max_attempts.unwrap_or(usize::MAX); + let sleep_duration = Duration::from_millis(50); + + while attempts < max_attempts { + if work_queue.push(e.clone()).is_ok() { + return Ok(()); + } + + attempts += 1; + thread::sleep(sleep_duration); + // sleep_duration *= 2; + } + + eprintln!("Internal error adding data to work queue, with backoff. {e:#?}, {id_num}, Attempts: {attempts}"); + } else { + eprintln!( + "Internal error adding data to work queue, without backoff. {e:#?} {id_num}" + ); + return Err(PyErr::new::(format!( + "Internal error adding data to work queue, without backoff. {e:#?} {id_num}. Is your fastq batch larger than 50000? 
Perhaps try `map_batch` with back_off=True?", + e = e, + id_num = id_num + ))); + } + Ok(()) + } } } @@ -708,31 +824,52 @@ impl Aligner { return Err("No index"); } if (self.aligner.mapopt.flag & minimap2_sys::MM_F_CIGAR as i64 != 0) - && (self.aligner.idx.unwrap().flag & minimap2_sys::MM_I_NO_SEQ as i32 != 0) + && (unsafe { *self.aligner.idx.as_ref().unwrap().as_ptr() }.flag + & minimap2_sys::MM_I_NO_SEQ as i32 + != 0) { return Err("No sequence in this index"); } let ref_seq_id: i32 = unsafe { - minimap2_sys::mm_idx_name2id( - self.aligner.idx.as_ref().unwrap() as *const minimap2_sys::mm_idx_t, - std::ffi::CString::new(name) - .unwrap() - .as_bytes_with_nul() - .as_ptr() as *const i8, - ) + // conditionally compile using the correct pointer type (u8 or i8) for the platform and architecture + #[cfg(any( + all(target_arch = "aarch64", target_os = "linux"), + all(target_arch = "arm", target_os = "linux") + ))] + { + minimap2_sys::mm_idx_name2id( + self.aligner.idx.as_ref().unwrap().as_ptr(), + std::ffi::CString::new(name) + .unwrap() + .as_bytes_with_nul() + .as_ptr() as *const u8, + ) + } + #[cfg(any( + all(target_arch = "aarch64", target_os = "macos"), + all(target_arch = "x86_64", target_os = "linux"), + all(target_arch = "x86_64", target_os = "macos") + ))] + { + minimap2_sys::mm_idx_name2id( + self.aligner.idx.as_ref().unwrap().as_ptr(), + std::ffi::CString::new(name) + .unwrap() + .as_bytes_with_nul() + .as_ptr() as *const i8, + ) + } }; - if (ref_seq_id < 0) | (ref_seq_id as u32 >= self.aligner.idx.unwrap().n_seq) { + if (ref_seq_id < 0) + | (ref_seq_id as u32 >= unsafe { *self.aligner.idx.as_ref().unwrap().as_ptr() }.n_seq) + { return Err("Could not find reference in index"); } let ref_seq_offset = unsafe { - *(self - .aligner - .idx - .as_ref() - .unwrap() + *(*(self.aligner.idx.as_ref().unwrap().as_ptr())) .seq - .offset(ref_seq_id as isize)) + .offset(ref_seq_id as isize) }; let ref_seq_len = ref_seq_offset.len as i32; if start >= ref_seq_len || start >= 
end { @@ -745,7 +882,7 @@ impl Aligner { let mut seq_buf: Vec = vec![0; seq_len as usize]; let _len = unsafe { minimap2_sys::mm_idx_getseq( - self.aligner.idx.as_ref().unwrap() as *const minimap2_sys::mm_idx_t, + self.aligner.idx.as_ref().unwrap().as_ptr(), ref_seq_id as u32, start as u32, end as u32, @@ -773,6 +910,7 @@ impl Aligner { res: &mut AlignmentBatchResultIter, seqs: &PyAny, back_off: bool, + max_attempts: Option, ) -> PyResult<()> { if self.n_threads == 0_usize { return Err(PyRuntimeError::new_err( @@ -864,42 +1002,17 @@ impl Aligner { )) } }; - match work_queue.push(WorkQueue::Work((id_num, seq))) { - Ok(()) => {} - Err(e) => { - if back_off { - let mut attempts = 0; - let max_attempts = 6; - let mut sleep_duration = Duration::from_millis(50); // Initial sleep duration (in milliseconds) - - while attempts < max_attempts { - if work_queue.push(e.clone()).is_ok() { - break; // Operation succeeded - } - - attempts += 1; - thread::sleep(sleep_duration); - - // Increase the sleep duration exponentially - sleep_duration *= 2; - } - if attempts == 6 { - eprintln!("Internal error adding data to work queue, with backoff. {e:#?}, {id_num}, Attempts: {attempts}"); - } - } else { - eprintln!("Internal error adding data to work queue, without backoff. {e:#?} {id_num}"); - return Err(PyErr::new::(format!( - "Internal error adding data to work queue, without backoff. {e:#?} {id_num}. Is your fastq batch larger than 50000? Perhaps try `map_batch` with back_off=True?", - e = e, - id_num = id_num - ))); - } - } - } + push_with_backoff( + &work_queue, + WorkQueue::Work((id_num, seq)), + back_off, + id_num, + max_attempts, + )?; } // Now we add n_thread dones, one for each thread. 
When the threads see this they know to close as there is no more data for _ in 0..self.n_threads { - work_queue.push(WorkQueue::Done).unwrap(); + push_with_backoff(&work_queue, WorkQueue::Done, back_off, 0, max_attempts)?; } Ok(()) @@ -908,6 +1021,7 @@ impl Aligner { /// Python iterable types that are accepted by the `Aligner.map_batch()` function #[derive(FromPyObject)] +#[allow(dead_code)] enum SupportedTypes<'py> { /// PyList wrapper List(&'py PyList), @@ -1036,6 +1150,32 @@ mod tests { ) } + // #[test] + // #[allow(dropping_references)] + // fn test_drop_and_recreate() { + // let mut al = Aligner::py_new( + // Some(PathBuf::from( + // "/home/adoni5/Documents/Bioinformatics/refs/hg38_no_alts_22.mmi", + // )), + // None, + // None, + // None, + // None, + // None, + // None, + // None, + // None, + // 4_usize, + // None, + // None, + // None, + // None, + // None, + // ) + // .unwrap(); + // std::mem::drop(&mut al); + // } + #[test] fn load_index() { let al = get_test_aligner().unwrap(); @@ -1092,7 +1232,7 @@ mod tests { #[test] fn map_one() { - let al = get_test_aligner().unwrap(); + let mut al = get_test_aligner().unwrap(); let mappings = al.map( String::from("AGAGCAGGTAGGATCGTTGAAAAAAGAGTACTCAGGATTCCATTCAACTTTTACTGATTTGAAGCGTACTGTTTATGGCC\ AAGAATATTTACGTCTTTACAACCAATACGCAAAAAAAGGTTCATTGAGTTTGGTTGTGATTTGATGAAAATTACTGAGA\ diff --git a/tests/benchmark.py b/tests/benchmark.py index 43a8325..7bde6d2 100644 --- a/tests/benchmark.py +++ b/tests/benchmark.py @@ -2,12 +2,14 @@ import mappy as mp import pytest from pathlib import Path +import gc +from itertools import cycle RESOURCES = ( Path(__file__).parent.resolve().parent.resolve() / "resources/benchmarking" ) FASTQ_PATH = RESOURCES / "fastq" -INDEX_PATH = RESOURCES / "index/hg38_simple.mmi" +INDEX_PATH = RESOURCES / "index/hg38_no_alts_22.mmi" _FILE_SUFFIXES = set([".fq", ".fastq", ".fastq.gz", ".fq.qz"]) @@ -22,12 +24,14 @@ def _gen_fastq(path: Path): for f in path.iterdir(): if 
set(f.suffixes).intersection(_FILE_SUFFIXES): yield from mp.fastx_read(str(f)) + else: if set(path.suffixes).intersection(_FILE_SUFFIXES): yield from mp.fastx_read(str(f)) N_READS = sum(1 for _ in _gen_fastq(FASTQ_PATH)) +print(N_READS) @pytest.fixture(scope="module") @@ -37,12 +41,18 @@ def fasta(): @pytest.fixture def mappy_al_rs(): - return mappy_rs.Aligner(str(INDEX_PATH)) + al = mappy_rs.Aligner(str(INDEX_PATH)) + yield al + del al + gc.collect() @pytest.fixture def mappy_al(): - yield mp.Aligner(str(INDEX_PATH)) + al = mp.Aligner(str(INDEX_PATH)) + yield al + del al + gc.collect() @pytest.fixture @@ -71,20 +81,38 @@ def _align(al, seqs): return n -@pytest.mark.parametrize("aligner", ["mappy_al", "mappy_al_rs"], indirect=True) -def test_classic_mappy(benchmark, aligner, fasta): - n = benchmark.pedantic( - _align, args=(aligner, fasta), iterations=1, rounds=1 - ) - assert N_READS == n - print("Finished classic mappy round") - - -@pytest.mark.parametrize("threads", list(range(1, 6))) -def test_benchmark_multi(threads, benchmark, mappy_al_rs, fasta): - mappy_al_rs.enable_threading(threads) +# @pytest.mark.parametrize( +# "aligner", +# ["mappy_al", "mappy_al_rs"], +# indirect=True, +# ) +# def test_classic_mappy(benchmark, aligner, fasta): +# n = benchmark.pedantic( +# _align, args=(aligner, fasta), iterations=1, rounds=1 +# ) +# print(n) +# assert N_READS == n +# print("Finished classic mappy round") + + +@pytest.mark.parametrize( + "aligner, threads", + [ + ("mappy_al", None), + ("mappy_al_rs", None), + ] + + list( + zip(cycle(("mappy_al_rs",)), range(1, 17)) + ), # Add more thread counts as needed + indirect=["aligner"], +) +def test_aligner_with_threads(benchmark, aligner, threads, fasta): + if threads: # Assuming this is the class name for mappy_rs.Aligner + aligner.enable_threading(threads) n = benchmark.pedantic( - align_multi, args=(mappy_al_rs, fasta), iterations=1, rounds=1 + align_multi if threads else _align, + args=(aligner, fasta), + iterations=5, + 
rounds=5, ) assert N_READS == n - print("Finished threaded mappy round") diff --git a/tests/python_test.py b/tests/python_test.py index fc4fcea..68e44cd 100644 --- a/tests/python_test.py +++ b/tests/python_test.py @@ -133,35 +133,36 @@ def test_map_one(al): ) assert len(mappings) == 1 mapping = mappings[0] + print(mapping) assert mapping.target_start == 0 assert mapping.target_end == 400 -def test_map_batch_100000(al, fasta_iter): +def test_map_batch_60000(al, fasta_iter): al.enable_threading(4) - iter_ = repeat(next(fasta_iter), 100000) + iter_ = repeat(next(fasta_iter), 60000) mappings = al.map_batch(iter_, back_off=True) n = 0 - for res in mappings: + for _res in mappings: n += 1 - assert n == 100000 - - -def test_map_batch_100000_no_backoff(al, fasta_iter): - al.enable_threading(4) - iter_ = repeat(next(fasta_iter), 100000) - with pytest.raises(RuntimeError) as excinfo: - mappings = al.map_batch(iter_, back_off=False) - n = 0 - for res in mappings: - n += 1 - assert "Internal error adding data to work queue, without backoff" in str( - excinfo - ) - assert ( - "Is your fastq batch larger than 50000? Perhaps try" - " `map_batch` with back_off=True?" in str(excinfo) - ) + assert n == 60000 + + +# def test_map_batch_100000_no_backoff(al, fasta_iter): +# al.enable_threading(4) +# iter_ = repeat(next(fasta_iter), 100000) +# with pytest.raises(RuntimeError) as excinfo: +# mappings = al.map_batch(iter_, back_off=True) +# n = 0 +# for res in mappings: +# n += 1 +# assert "Internal error adding data to work queue, without backoff" in str( +# excinfo +# ) +# assert ( +# "Is your fastq batch larger than 50000? Perhaps try" +# " `map_batch` with back_off=True?" in str(excinfo) +# ) @pytest.mark.parametrize(