Skip to content

Commit 73e92c3

Browse files
committed
rewrite cmd subfq
1 parent 2e4d99d commit 73e92c3

5 files changed

Lines changed: 100 additions & 133 deletions

File tree

src/cli/stats.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -236,7 +236,6 @@ impl ParallelProcessor for Info {
236236
}
237237

238238
fn on_batch_complete(&mut self) -> Result<(), ProcessError> {
239-
240239
*self.total_num_a.lock() += self.num_a;
241240
*self.total_num_t.lock() += self.num_t;
242241
*self.total_num_g.lock() += self.num_g;

src/cli/subfq.rs

Lines changed: 65 additions & 103 deletions
Original file line number | Diff line number | Diff line change
@@ -5,139 +5,101 @@ use paraseq::{fastq, fastx::Record};
55
use rand::{Rng, prelude::*};
66
use rand_pcg::Pcg64;
77

8-
// reduce much memory but cost more time
9-
fn select_fastq(
8+
pub fn sample_fastq(
109
file: Option<&String>,
1110
n: usize,
1211
seed: u64,
12+
two_pass: bool,
1313
out: Option<&String>,
1414
compression_level: u32,
1515
stdout_type: char,
1616
) -> Result<(), FqkitError> {
17-
let mut rng: rand_pcg::Lcg128Xsl64 = Pcg64::seed_from_u64(seed);
18-
let mut get: Vec<usize> = Vec::with_capacity(n);
19-
2017
if n == 0 {
2118
error!("n must be greater than 0");
2219
std::process::exit(1);
2320
}
21+
info!("subseq number: {}", n);
2422

2523
let mut fq_reader = fastq::Reader::new(file_reader(file)?);
26-
info!("rand seed: {}", seed);
27-
info!("subseq number: {}", n);
28-
info!("reduce much memory but cost more time");
2924
let mut rset = fastq::RecordSet::default();
25+
info!("rand seed: {}", seed);
26+
let mut fq_writer = file_writer(out, compression_level, stdout_type)?;
27+
28+
let mut rng: rand_pcg::Lcg128Xsl64 = Pcg64::seed_from_u64(seed);
3029
let mut order: usize = 0;
31-
while rset.fill(&mut fq_reader)? {
32-
for _ in rset.iter().map_while(Result::ok) {
33-
if order < n {
34-
get.push(order);
35-
} else {
36-
let ret = rng.random_range(0..=order);
37-
if ret < n {
38-
get[ret] = order;
30+
if two_pass {
31+
info!("two pass mode enabled");
32+
let mut get: Vec<usize> = Vec::with_capacity(n);
33+
while rset.fill(&mut fq_reader)? {
34+
for _ in rset.iter().map_while(Result::ok) {
35+
if order < n {
36+
get.push(order);
37+
} else {
38+
let ret = rng.random_range(0..=order);
39+
if ret < n {
40+
get[ret] = order;
41+
}
3942
}
43+
order += 1;
4044
}
41-
order += 1;
4245
}
43-
}
4446

45-
let mut fq_writer = file_writer(out, compression_level, stdout_type)?;
46-
let mut fq_reader2 = fastq::Reader::new(file_reader(file)?);
47-
let mut rset2 = fastq::RecordSet::default();
48-
let mut order2: usize = 0;
49-
50-
while rset2.fill(&mut fq_reader2)? {
51-
for rec in rset2.iter().map_while(Result::ok) {
52-
if get.contains(&order2) {
53-
write_record(&mut fq_writer, rec.id(), rec.seq(), rec.qual())?;
47+
let mut fq_reader2 = fastq::Reader::new(file_reader(file)?);
48+
let mut rset2 = fastq::RecordSet::default();
49+
order = 0;
50+
get.sort_unstable(); // keep the order
51+
let mut idx = 0usize;
52+
info!("all records has been readed into memory, start write to output ...");
53+
while rset2.fill(&mut fq_reader2)? {
54+
for rec in rset2.iter().map_while(Result::ok) {
55+
if idx < get.len() && order == get[idx] {
56+
write_record(&mut fq_writer, rec.id(), rec.seq(), rec.qual())?;
57+
idx += 1;
58+
}
59+
if idx >= get.len() {
60+
break;
61+
}
62+
order += 1;
5463
}
55-
order2 += 1;
5664
}
57-
}
58-
fq_writer.flush()?;
59-
60-
Ok(())
61-
}
62-
63-
// fast mode but cost more memory
64-
fn select_fastq2(
65-
file: Option<&String>,
66-
n: usize,
67-
seed: u64,
68-
out: Option<&String>,
69-
compression_level: u32,
70-
stdout_type: char,
71-
) -> Result<(), FqkitError> {
72-
info!("rand seed: {}", seed);
73-
info!("subseq num: {}", n);
74-
info!("fast mode but cost more memory");
75-
76-
let mut rng = Pcg64::seed_from_u64(seed);
77-
let mut get = Vec::with_capacity(n);
78-
if n == 0 {
79-
error!("n must be greater than 0");
80-
std::process::exit(1);
81-
}
82-
83-
let mut fq_reader = fastq::Reader::new(file_reader(file)?);
84-
let mut rset = fastq::RecordSet::default();
85-
let mut order: usize = 0;
86-
while rset.fill(&mut fq_reader)? {
87-
for rec in rset.iter().map_while(Result::ok) {
88-
if order < n {
89-
let rec_t = vec![
90-
rec.id_str().to_owned(),
91-
rec.seq_str().to_owned(),
92-
rec.qual_str().to_owned(),
93-
];
94-
get.push(rec_t);
95-
} else {
96-
let ret = rng.random_range(0..=order);
97-
if ret < n {
98-
get[ret] = vec![
65+
} else {
66+
let mut get = Vec::with_capacity(n);
67+
while rset.fill(&mut fq_reader)? {
68+
for rec in rset.iter().map_while(Result::ok) {
69+
if order < n {
70+
get.push((
71+
order,
9972
rec.id_str().to_owned(),
10073
rec.seq_str().to_owned(),
10174
rec.qual_str().to_owned(),
102-
];
75+
));
76+
} else {
77+
let ret = rng.random_range(0..=order);
78+
if ret < n {
79+
get[ret] = (
80+
order,
81+
rec.id_str().to_owned(),
82+
rec.seq_str().to_owned(),
83+
rec.qual_str().to_owned(),
84+
);
85+
}
10386
}
87+
order += 1;
10488
}
105-
order += 1;
89+
}
90+
info!("all records has been readed into memory, start write to output ...");
91+
get.sort_unstable_by_key(|x| x.0); // sort by order to keep the raw order
92+
for (_, id, seq, qual) in get {
93+
write_record(
94+
&mut fq_writer,
95+
id.as_bytes(),
96+
seq.as_bytes(),
97+
qual.as_bytes(),
98+
)?;
10699
}
107100
}
108101

109-
let mut fq_writer = file_writer(out, compression_level, stdout_type)?;
110-
for rec in get.iter() {
111-
write_record(
112-
&mut fq_writer,
113-
rec[0].as_bytes(),
114-
rec[1].as_bytes(),
115-
rec[2].as_bytes(),
116-
)?
117-
}
118102
fq_writer.flush()?;
119103

120104
Ok(())
121105
}
122-
123-
pub fn subset_fastq(
124-
rdc: bool,
125-
file: Option<&String>,
126-
n: usize,
127-
seed: u64,
128-
out: Option<&String>,
129-
compression_level: u32,
130-
stdout_type: char,
131-
) -> Result<(), FqkitError> {
132-
if rdc {
133-
if file.is_none() {
134-
error!("opt -r used, fastq data can't from stdin.");
135-
std::process::exit(1);
136-
}
137-
select_fastq(file, n, seed, out, compression_level, stdout_type)?;
138-
} else {
139-
select_fastq2(file, n, seed, out, compression_level, stdout_type)?;
140-
}
141-
142-
Ok(())
143-
}

src/command.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -139,9 +139,9 @@ pub enum Subcli {
139139
/// subseq number
140140
#[arg(short = 'n', long = "num", value_name = "INT")]
141141
num: usize,
142-
/// read files twice to reduce much memory but cost more time
143-
#[arg(short = 'r', long = "rdc", help_heading = Some("FLAGS"))]
144-
rdc: bool,
142+
/// if specified, use two-pass mode to reduce memory usage
143+
#[arg(short = '2', long = "two-pass", help_heading = Some("FLAGS"))]
144+
two_pass: bool,
145145
/// fastq output file name or write to stdout, files ending in .gz/.bz2/.xz will be compressed automatically
146146
#[arg(short = 'o', long = "out", value_name = "FILE")]
147147
out: Option<String>,

src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -62,14 +62,14 @@ fn run_main() -> Result<(), Error> {
6262
input,
6363
seed,
6464
num,
65-
rdc,
65+
two_pass,
6666
out,
6767
} => {
68-
subset_fastq(
69-
rdc,
68+
sample_fastq(
7069
input.as_ref(),
7170
num,
7271
seed,
72+
two_pass,
7373
out.as_ref(),
7474
arg.compression_level,
7575
arg.stdout_type,

src/utils.rs

Lines changed: 29 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -166,14 +166,13 @@ where
166166
2 => 3,
167167
3 => 7,
168168
4 => 11,
169-
_ => 3
169+
_ => 3,
170170
};
171171
Ok(Box::new(BufWriter::with_capacity(
172172
BUFF_SIZE,
173173
zstd::stream::write::Encoder::new(fp, level)?,
174174
)))
175-
}
176-
else {
175+
} else {
177176
Ok(Box::new(BufWriter::with_capacity(BUFF_SIZE, fp)))
178177
}
179178
} else if stdout_format == 'g' {
@@ -196,15 +195,18 @@ where
196195
)))
197196
} else if stdout_format == 'z' {
198197
Ok(Box::new(BufWriter::with_capacity(
199-
BUFF_SIZE,
200-
zstd::stream::write::Encoder::new(io::stdout(), match compression_level {
201-
1 => 1,
202-
2 => 3,
203-
3 => 7,
204-
4 => 11,
205-
_ => 3
206-
})?,
207-
)))
198+
BUFF_SIZE,
199+
zstd::stream::write::Encoder::new(
200+
io::stdout(),
201+
match compression_level {
202+
1 => 1,
203+
2 => 3,
204+
3 => 7,
205+
4 => 11,
206+
_ => 3,
207+
},
208+
)?,
209+
)))
208210
} else if stdout_format == 'u' {
209211
Ok(Box::new(BufWriter::new(io::stdout())))
210212
} else {
@@ -245,18 +247,22 @@ where
245247
BUFF_SIZE,
246248
xz2::write::XzEncoder::new(fp, compression_level),
247249
)))
248-
} else if file_out.as_ref().extension().is_some_and(|ext| ext == "zst") {
250+
} else if file_out
251+
.as_ref()
252+
.extension()
253+
.is_some_and(|ext| ext == "zst")
254+
{
249255
let level = match compression_level {
250-
1 => 1,
251-
2 => 3,
252-
3 => 7,
253-
4 => 11,
254-
_ => 3
255-
};
256-
Ok(Box::new(BufWriter::with_capacity(
257-
BUFF_SIZE,
258-
zstd::stream::write::Encoder::new(fp, level)?,
259-
)))
256+
1 => 1,
257+
2 => 3,
258+
3 => 7,
259+
4 => 11,
260+
_ => 3,
261+
};
262+
Ok(Box::new(BufWriter::with_capacity(
263+
BUFF_SIZE,
264+
zstd::stream::write::Encoder::new(fp, level)?,
265+
)))
260266
} else {
261267
Ok(Box::new(BufWriter::with_capacity(BUFF_SIZE, fp)))
262268
}

0 commit comments

Comments
 (0)