Skip to content

Commit 3515c83

Browse files
committed
simplify and include a parallel block iterator in binsize>1 calcs
1 parent fe8b70b commit 3515c83

File tree

2 files changed

+38
-87
lines changed

2 files changed

+38
-87
lines changed

src/covcalc.rs

Lines changed: 11 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use rust_htslib::bam::{self, Read, IndexedReader, record::Cigar};
2+
use rust_htslib::bam::ext::BamRecordExtensions;
23
use std::collections::HashMap;
34
use tempfile::{Builder, TempPath};
45
use std::io::{BufWriter, Write};
56
use std::cmp::min;
67
use std::fmt;
78
use ndarray::Array1;
9+
use rayon::prelude::*;
10+
use std::collections::HashSet;
811

912
pub fn parse_regions(regions: &Vec<(String, u32, u32)>, bam_ifile: Vec<&str>) -> (Vec<Region>, HashMap<String, u32>) {
1013
// Takes a vector of regions, and a bam reference
@@ -172,41 +175,14 @@ pub fn bam_pileup<'a>(
172175
readlens.push(record.seq_len() as u32);
173176
}
174177
}
175-
176-
let blocks = bam_blocks(record);
177-
// keep a record of bin indices that have been updated already
178-
let mut changedbins: Vec<u32> = Vec::new();
179-
for block in blocks.into_iter() {
180-
// Don't count blocks that exceed the chromosome
181-
if block.0 >= region.2 {
182-
continue;
183-
}
184-
binix = block.0 / binsize;
185-
if !changedbins.contains(&binix) {
186-
counts[binix as usize] += 1.0;
187-
changedbins.push(binix);
188-
}
189-
// Don't count blocks that exceed the chromosome
190-
if block.1 >= region.2 {
191-
continue;
192-
}
193-
// Note that our blocks are strictly less then region ends, hence no problem with bin spewing at end:
194-
// binix = [a,b) where b == region.2 would divide into binix+1 (doesn't exist).
195-
binix = block.1 / binsize;
196-
if !changedbins.contains(&binix) {
197-
counts[binix as usize] += 1.0;
198-
changedbins.push(binix);
199-
}
200-
}
201-
// if changedbins contains non-continuous blocks (perhaps read length spans multiple bins), update the inbetweens
202-
if let (Some(&min), Some(&max)) = (changedbins.iter().min(), changedbins.iter().max()) {
203-
for ix in min..=max {
204-
if !changedbins.contains(&ix) {
205-
counts[ix as usize] += 1.0;
206-
changedbins.push(ix);
207-
}
208-
}
209-
}
178+
let indices: HashSet<usize> = record.aligned_blocks()
179+
.par_bridge()
180+
.filter(|x| (x[1] as u32) < region.2)
181+
.flat_map(|x| x[0] as u32..x[1] as u32)
182+
.map(|x| (x / binsize) as usize)
183+
.collect();
184+
indices.into_iter()
185+
.for_each(|ix| counts[ix] += 1.0);
210186
}
211187
let mut writer = BufWriter::new(&bg);
212188
// There are two scenarios:
@@ -372,32 +348,6 @@ pub fn bam_pileup<'a>(
372348
return (tmpvec, mapped_reads, unmapped_reads, readlens, fraglens);
373349
}
374350

375-
/// Extract contigious blocks from a bam record
376-
/// Blocks are split per insertion, padding, deletion and ref skip
377-
fn bam_blocks(rec: bam::Record) -> Vec<(u32, u32)> {
378-
let mut blocks: Vec<(u32, u32)> = Vec::new();
379-
let mut pos = rec.pos();
380-
381-
for c in rec.cigar().iter() {
382-
match c {
383-
Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => {
384-
let start = pos;
385-
let end = pos + *len as i64 -1;
386-
blocks.push((start as u32, end as u32));
387-
pos = end;
388-
}
389-
Cigar::Ins(len) | Cigar::Pad(len) => {
390-
pos += *len as i64;
391-
}
392-
Cigar::Del(len) | Cigar::RefSkip(len) => {
393-
pos += *len as i64;
394-
}
395-
_ => (),
396-
}
397-
}
398-
return blocks;
399-
}
400-
401351
pub struct Alignmentfilters {
402352
pub minmappingquality: u8,
403353
pub samflaginclude: u16,

src/multibamsummary.rs

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use rayon::prelude::*;
44
use rayon::ThreadPoolBuilder;
55
use itertools::{multizip, multiunzip, izip};
66
use std::io::prelude::*;
7-
use std::io::BufReader;
7+
use std::io::{BufReader, BufWriter};
88
use std::fs::File;
99
use ndarray::{Array1, Array2, stack, Axis};
1010
use std::io;
@@ -167,6 +167,18 @@ pub fn r_mbams(
167167
println!("Define output file");
168168
}
169169

170+
let mut cfile: Option<BufWriter<File>> = None;
171+
if out_raw_counts != "None" {
172+
cfile = Some(BufWriter::new(File::create(out_raw_counts).unwrap()));
173+
// Write the header to the file.
174+
let mut headstr = String::new();
175+
headstr.push_str("#\'chr\'\t\'start\'\t\'end\'");
176+
for label in bamlabels.iter() {
177+
headstr.push_str(&format!("\t\'{}\'", label));
178+
}
179+
writeln!(cfile.as_mut().unwrap(), "{}", headstr).unwrap();
180+
}
181+
170182
// Collate the coverage files into a matrix.
171183
let its: Vec<_> = covcalcs.iter().map(|x| x.into_iter()).collect();
172184
let zips = TempZip { iterators: its };
@@ -176,10 +188,11 @@ pub fn r_mbams(
176188
let zips_vec: Vec<_> = zips.collect();
177189
let matvec: Array2<f32> = pool.install(|| {
178190
let matvecs: Vec<Array1<f32>> = zips_vec
179-
.par_iter()
191+
.iter()
180192
.flat_map(|c| {
181-
let readers: Vec<_> = c.iter().map(|x| BufReader::new(File::open(x).unwrap()).lines()).collect();
193+
let readers: Vec<_> = c.par_iter().map(|x| BufReader::new(File::open(x).unwrap()).lines()).collect();
182194
let mut _matvec: Vec<Array1<f32>> = Vec::new();
195+
let mut _regions: Vec<(String, u32, u32)> = Vec::new();
183196
for mut _l in (TempZip { iterators: readers }) {
184197
// unwrap all lines in _l
185198
let lines: Vec<_> = _l
@@ -189,6 +202,16 @@ pub fn r_mbams(
189202
.map(|x: Vec<&str> | ( x[0].to_string(), x[1].parse::<u32>().unwrap(), x[2].parse::<u32>().unwrap(), x[3].parse::<f32>().unwrap() ) )
190203
.collect();
191204
let arrline = Array1::from(lines.iter().map(|x| x.3).collect::<Vec<_>>());
205+
// If cfile is define, write the raw counts to file.
206+
if let Some(ref mut file) = cfile {
207+
let mut regstr = String::new();
208+
regstr.push_str(&format!("{}\t{}\t{}",lines[0].0, lines[0].1, lines[0].2));
209+
// push values from array
210+
for val in arrline.iter() {
211+
regstr.push_str(&format!("\t{}", val));
212+
}
213+
writeln!(file, "{}", regstr).unwrap();
214+
}
192215
_matvec.push(arrline);
193216
}
194217
_matvec
@@ -207,36 +230,14 @@ pub fn r_mbams(
207230
writeln!(sf_file, "{}\t{}", label, sf).unwrap();
208231
}
209232
}
210-
let mut npz = NpzWriter::new(File::create(ofile).unwrap());
233+
let mut npz = NpzWriter::new_compressed(File::create(ofile).unwrap());
211234
npz.add_array("matrix", &matvec).unwrap();
212235
npz.add_array("labels", &bamlabels_arr).unwrap();
213236
npz.finish().unwrap();
214237
if verbose {
215238
println!("Matrix written.");
216239
}
217240

218-
// If raw counts are required, write them to file.
219-
if out_raw_counts != "None" {
220-
let mut cfile = File::create(out_raw_counts).unwrap();
221-
// Create the header
222-
let mut headstr = String::new();
223-
headstr.push_str("#\'chr\'\t\'start\'\t\'end\'");
224-
for label in bamlabels.iter() {
225-
headstr.push_str(&format!("\t\'{}\'", label));
226-
}
227-
writeln!(cfile, "{}", headstr).unwrap();
228-
for (i, region) in regions.iter().enumerate() {
229-
let mut line = String::new();
230-
line.push_str(&format!("{}\t{}\t{}", region.chrom, region.get_startu(), region.get_endu()));
231-
for j in 0..bamlabels.len() {
232-
line.push_str(&format!("\t{}", matvec[[i, j]]));
233-
}
234-
writeln!(cfile, "{}", line).unwrap();
235-
}
236-
}
237-
if verbose {
238-
println!("Counts written.");
239-
}
240241
Ok(())
241242
}
242243

0 commit comments

Comments
 (0)