1+ use crate :: utils:: * ;
2+ use bio:: io:: fastq;
3+ use anyhow:: { Result , Ok } ;
4+ use crossbeam:: channel:: unbounded;
5+ use log:: * ;
6+ use std:: time:: Instant ;
7+
8+
9+ pub fn filter_fastq (
10+ read1 : & str ,
11+ read2 : & str ,
12+ nbase : usize ,
13+ length : usize ,
14+ complexity : u32 ,
15+ average_qual : u8 ,
16+ phred : u8 ,
17+ chunks : usize ,
18+ ncpu : usize ,
19+ failed : & str ,
20+ out1 : & str ,
21+ out2 : & str ,
22+ compression_level : u32 ,
23+ quiet : bool ,
24+ ) -> Result < ( ) > {
25+ let start = Instant :: now ( ) ;
26+ if !quiet {
27+ info ! ( "read forward reads from file: {}" , read1) ;
28+ info ! ( "read reverse reads from file: {}" , read2) ;
29+ if ![ 33u8 , 64u8 ] . contains ( & phred) {
30+ error ! ( "invalid phred value" ) ;
31+ std:: process:: exit ( 1 ) ;
32+ }
33+ if ncpu <= 1 {
34+ info ! ( "thread num is: {}" , ncpu) ;
35+ } else {
36+ info ! ( "additional thread num is: {}" , ncpu) ;
37+ }
38+ info ! ( "output clean read1 file: {}" , out1) ;
39+ info ! ( "output clean read2 file: {}" , out2) ;
40+ }
41+
42+ let fq_reader1 = file_reader ( & Some ( read1) ) . map ( fastq:: Reader :: new) ?;
43+ let fq_reader2 = file_reader ( & Some ( read2) ) . map ( fastq:: Reader :: new) ?;
44+ let mut out_writer1 = file_writer ( & Some ( out1) , compression_level) . map ( fastq:: Writer :: new) ?;
45+ let mut out_writer2 = file_writer ( & Some ( out2) , compression_level) . map ( fastq:: Writer :: new) ?;
46+ let mut failed_writer = file_writer ( & Some ( failed) , compression_level) . map ( fastq:: Writer :: new) ?;
47+ let complex = complexity as usize ;
48+ let ( mut pe_ok, mut pe_fail) = ( 0usize , 0usize ) ;
49+ if ncpu <= 1 {
50+ for ( rec1, rec2) in fq_reader1. records ( ) . flatten ( ) . zip ( fq_reader2. records ( ) . flatten ( ) ) {
51+ if rec1. seq ( ) . iter ( ) . filter ( |v| v == & & b'N' ) . count ( ) > nbase || rec2. seq ( ) . iter ( ) . filter ( |v| v == & & b'N' ) . count ( ) > nbase {
52+ failed_writer. write_record ( & rec1) ?;
53+ failed_writer. write_record ( & rec2) ?;
54+ pe_fail += 1 ;
55+ continue ;
56+ }
57+ if rec1. seq ( ) . len ( ) < length || rec2. seq ( ) . len ( ) < length {
58+ failed_writer. write_record ( & rec1) ?;
59+ failed_writer. write_record ( & rec2) ?;
60+ pe_fail += 1 ;
61+ continue ;
62+ }
63+ let complx1 = ( rec1. seq ( ) . iter ( ) . skip ( 1 )
64+ . zip ( rec1. seq ( ) . iter ( ) )
65+ . filter ( |( q1, q2) | { q1 != q2} )
66+ . count ( ) as f64 / rec1. seq ( ) . len ( ) as f64 * 100.0 ) as usize ;
67+ let complx2 = ( rec2. seq ( ) . iter ( ) . skip ( 1 )
68+ . zip ( rec1. seq ( ) . iter ( ) )
69+ . filter ( |( q1, q2) | { q1 != q2} )
70+ . count ( ) as f64 / rec2. seq ( ) . len ( ) as f64 * 100.0 ) as usize ;
71+ if complx1 < complex || complx2 < complex {
72+ failed_writer. write_record ( & rec1) ?;
73+ failed_writer. write_record ( & rec2) ?;
74+ pe_fail += 1 ;
75+ continue ;
76+ }
77+ if phred_mean ( rec1. qual ( ) , phred) < average_qual || phred_mean ( rec2. qual ( ) , phred) < average_qual {
78+ failed_writer. write_record ( & rec1) ?;
79+ failed_writer. write_record ( & rec2) ?;
80+ pe_fail += 1 ;
81+ continue ;
82+ }
83+ pe_ok += 1 ;
84+ out_writer1. write_record ( & rec1) ?;
85+ out_writer2. write_record ( & rec2) ?;
86+ }
87+ } else {
88+ let mut chunk = chunks;
89+ if chunk == 0 {
90+ warn ! ( "pe read conut in chunk can't be: {}, changed to default value." , chunk) ;
91+ chunk = 5000 ;
92+ }
93+
94+ let ( tx, rx) = unbounded ( ) ;
95+ let mut fq_iter1 = fq_reader1. records ( ) ;
96+ let mut fq_iter2 = fq_reader2. records ( ) ;
97+ loop {
98+ let pe_vec: Vec < _ > = fq_iter1. by_ref ( ) . take ( chunk) . flatten ( ) . zip ( fq_iter2. by_ref ( ) . take ( chunk) . flatten ( ) ) . collect ( ) ;
99+ if pe_vec. is_empty ( ) {
100+ break ;
101+ }
102+ tx. send ( pe_vec) . unwrap ( ) ;
103+ }
104+ drop ( tx) ;
105+
106+ crossbeam:: scope ( |s| {
107+ let ( tx2, rx2) = unbounded ( ) ;
108+ let _handles: Vec < _ > = ( 0 ..ncpu) . map ( |_| {
109+ let tx_tmp = tx2. clone ( ) ;
110+ let rx_tmp = rx. clone ( ) ;
111+ s. spawn ( move |_| {
112+ for vec_pe in rx_tmp. iter ( ) {
113+ let mut passed = vec ! [ ] ;
114+ let mut failed = vec ! [ ] ;
115+ for ( rec1, rec2) in vec_pe {
116+ if rec1. seq ( ) . iter ( ) . filter ( |v| v == & & b'N' ) . count ( ) > nbase || rec2. seq ( ) . iter ( ) . filter ( |v| v == & & b'N' ) . count ( ) > nbase {
117+ failed. push ( ( rec1, rec2) ) ;
118+ continue ;
119+ }
120+ if rec1. seq ( ) . len ( ) < length || rec2. seq ( ) . len ( ) < length {
121+ failed. push ( ( rec1, rec2) ) ;
122+ continue ;
123+ }
124+ let complx1 = ( rec1. seq ( ) . iter ( ) . skip ( 1 )
125+ . zip ( rec1. seq ( ) . iter ( ) )
126+ . filter ( |( q1, q2) | { q1 != q2} )
127+ . count ( ) as f64 / rec1. seq ( ) . len ( ) as f64 * 100.0 ) as usize ;
128+ let complx2 = ( rec2. seq ( ) . iter ( ) . skip ( 1 )
129+ . zip ( rec1. seq ( ) . iter ( ) )
130+ . filter ( |( q1, q2) | { q1 != q2} )
131+ . count ( ) as f64 / rec2. seq ( ) . len ( ) as f64 * 100.0 ) as usize ;
132+ if complx1 < complex || complx2 < complex {
133+ failed. push ( ( rec1, rec2) ) ;
134+ continue ;
135+ }
136+ if phred_mean ( rec1. qual ( ) , phred) < average_qual || phred_mean ( rec2. qual ( ) , phred) < average_qual {
137+ failed. push ( ( rec1, rec2) ) ;
138+ continue ;
139+ }
140+ passed. push ( ( rec1, rec2) ) ;
141+ }
142+ tx_tmp. send ( ( passed, failed) ) . unwrap ( ) ;
143+ }
144+ } ) ;
145+ } ) . collect ( ) ;
146+ drop ( tx2) ;
147+
148+ for ( vec_pass, vec_failed) in rx2. iter ( ) {
149+ for ( rec1, rec2) in vec_pass {
150+ pe_ok += 1 ;
151+ out_writer1. write_record ( & rec1) . unwrap ( ) ;
152+ out_writer2. write_record ( & rec2) . unwrap ( ) ;
153+ }
154+ for ( rec1, rec2) in vec_failed. iter ( ) {
155+ pe_fail += 1 ;
156+ failed_writer. write_record ( & rec1) . unwrap ( ) ;
157+ failed_writer. write_record ( & rec2) . unwrap ( ) ;
158+ }
159+ }
160+ } ) . unwrap ( ) ;
161+ }
162+ out_writer1. flush ( ) ?;
163+ out_writer2. flush ( ) ?;
164+ failed_writer. flush ( ) ?;
165+
166+ if !quiet {
167+ info ! ( "total clean pe reads number: {}" , pe_ok) ;
168+ info ! ( "total failed pe reads number: {}" , pe_fail) ;
169+ info ! ( "time elapsed is: {:?}" , start. elapsed( ) ) ;
170+ }
171+ Ok ( ( ) )
172+ }
173+
174+
175+ fn phred_mean (
176+ qual : & [ u8 ] ,
177+ phred : u8 ,
178+ ) -> u8 {
179+ let ave_error = qual
180+ . iter ( )
181+ . map ( |x| { 10.0f64 . powf ( ( x -phred) as f64 / 10.0 ) } )
182+ . sum :: < f64 > ( ) / qual. len ( ) as f64 ;
183+
184+ ( ave_error. log10 ( ) * -10.0f64 ) as u8 - phred
185+ }
0 commit comments