//! This module provides the functionality to scrape and gather all the results from the upstream
//! search engines and then remove duplicate results.

- use super::user_agent::random_user_agent;
- use crate::handler::{file_path, FileType};
+ use crate::handler::{FileType, file_path};
use crate::models::{
    aggregation::{EngineErrorInfo, SearchResult, SearchResults},
    engine::{EngineError, EngineHandler},
};
use crate::parser::Config;

use error_stack::Report;
- use futures::stream::FuturesUnordered;
+ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
+ use rayon::slice::ParallelSliceMut;
use regex::Regex;
use reqwest::{Client, ClientBuilder};
use std::sync::Arc;
+ use tokio::task::JoinSet;
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
-     task::JoinHandle,
    time::Duration,
};

/// A constant for holding the prebuilt Client globally in the app.
static CLIENT: std::sync::OnceLock<Client> = std::sync::OnceLock::new();

/// Aliases for long type annotations
- type FutureVec =
-     FuturesUnordered<JoinHandle<Result<Vec<(String, SearchResult)>, Report<EngineError>>>>;
+ type FutureVec = JoinSet<Result<Vec<(String, SearchResult)>, Report<EngineError>>>;

/// The function aggregates the scraped results from the user-selected upstream search engines.
/// These engines can be chosen either from the user interface (UI) or from the configuration file.
@@ -72,6 +71,7 @@ pub async fn aggregate(
    config: &Config,
    upstream_search_engines: &[EngineHandler],
    safe_search: u8,
+     user_agent: &'static str,
) -> Result<SearchResults, Box<dyn std::error::Error>> {
    let client = CLIENT.get_or_init(|| {
        let mut cb = ClientBuilder::new()
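For context, the `get_or_init` call above builds the shared `reqwest` `Client` exactly once and hands out a `'static` reference on every later call to `aggregate`. A minimal sketch of that pattern follows; the `shared_client` helper and the fixed 5-second timeout are illustrative assumptions, not the project's actual settings, which come from the parsed `Config`:

```rust
use std::sync::OnceLock;
use std::time::Duration;

use reqwest::{Client, ClientBuilder};

// Mirrors the module's `CLIENT` static: initialized lazily, reused forever.
static CLIENT: OnceLock<Client> = OnceLock::new();

/// Hypothetical helper: returns the shared HTTP client, building it on first use.
fn shared_client() -> &'static Client {
    CLIENT.get_or_init(|| {
        ClientBuilder::new()
            .timeout(Duration::from_secs(5)) // illustrative value only
            .build()
            .expect("failed to build the HTTP client")
    })
}
```

Building the client once keeps its connection pool and TLS state alive across requests instead of recreating them for every search.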
@@ -96,19 +96,17 @@ pub async fn aggregate(
        cb.build().unwrap()
    });

-     let user_agent: &str = random_user_agent();
-
    let mut names: Vec<&str> = Vec::with_capacity(0);

    // create tasks for upstream result fetching
-     let tasks: FutureVec = FutureVec::new();
+     let mut tasks: FutureVec = JoinSet::new();

    let query: Arc<String> = Arc::new(query.to_string());
    for engine_handler in upstream_search_engines {
        let (name, search_engine) = engine_handler.clone().into_name_engine();
        names.push(name);
        let query_partially_cloned = query.clone();
-         tasks.push(tokio::spawn(async move {
+         tasks.spawn(async move {
            search_engine
                .results(
                    &query_partially_cloned,
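This chunk drops the per-request `random_user_agent()` call (the user agent is now passed in as a parameter) and replaces the `FuturesUnordered<JoinHandle<...>>` collection with a `tokio::task::JoinSet`, which owns the spawned engine tasks and is drained with `join_next` in a later hunk. A self-contained sketch of that spawn-and-drain pattern, with a dummy task body standing in for the real engine calls:

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut tasks: JoinSet<Result<String, String>> = JoinSet::new();

    for engine in ["engine-a", "engine-b", "engine-c"] {
        // Each task runs concurrently on the tokio runtime, like the
        // per-engine `tasks.spawn(async move { ... })` calls in the diff.
        tasks.spawn(async move { Ok(format!("results from {engine}")) });
    }

    // join_next yields tasks in completion order; the outer Result is the JoinError check.
    while let Some(joined) = tasks.join_next().await {
        match joined {
            Ok(Ok(results)) => println!("{results}"),
            Ok(Err(engine_error)) => eprintln!("engine failed: {engine_error}"),
            Err(join_error) => eprintln!("task panicked or was cancelled: {join_error}"),
        }
    }
}
```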
@@ -118,16 +116,7 @@ pub async fn aggregate(
                    safe_search,
                )
                .await
-         }));
-     }
-
-     // get upstream responses
-     let mut responses = Vec::with_capacity(tasks.len());
-
-     for task in tasks {
-         if let Ok(result) = task.await {
-             responses.push(result)
-         }
+         });
    }

    // aggregate search results, removing duplicates and handling errors the upstream engines returned
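The comment above introduces the deduplication pass shown in the next hunk: each engine's results are merged into one map keyed by URL, and when a URL is already present only the additional engine name is recorded against the existing entry. A toy version of that merge logic, where `Hit` and `merge` are made-up stand-ins for `SearchResult` and the inline loop:

```rust
#[derive(Debug)]
struct Hit {
    url: String,
    engines: Vec<&'static str>,
}

/// Merge one engine's results into the shared map, deduplicating by key.
fn merge(results: &mut Vec<(String, Hit)>, engine: &'static str, incoming: Vec<(String, Hit)>) {
    for (key, value) in incoming {
        if let Some((_, existing)) = results.iter_mut().find(|(key_s, _)| key_s == &key) {
            // Duplicate URL: only record that another engine returned it.
            existing.engines.push(engine);
        } else {
            results.push((key, value));
        }
    }
}

fn main() {
    let mut results: Vec<(String, Hit)> = Vec::new();
    let url = "https://www.rust-lang.org/".to_string();

    let hit = |engine| Hit { url: url.clone(), engines: vec![engine] };
    merge(&mut results, "searx", vec![(url.clone(), hit("searx"))]);
    merge(&mut results, "duckduckgo", vec![(url.clone(), hit("duckduckgo"))]);

    assert_eq!(results.len(), 1);
    assert_eq!(results[0].1.engines, vec!["searx", "duckduckgo"]);
    println!("{results:?}");
}
```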
@@ -142,60 +131,58 @@ pub async fn aggregate(
        ));
    };

-     for _ in 0..responses.len() {
-         let response = responses.pop().unwrap();
+     while let Some(Ok(response)) = tasks.join_next().await {
        let engine = names.pop().unwrap();

-         if result_map.is_empty() {
-             match response {
-                 Ok(results) => result_map = results,
-                 Err(error) => handle_error(&error, engine),
-             };
-             continue;
-         }
-
-         match response {
-             Ok(result) => {
-                 result.into_iter().for_each(|(key, value)| {
-                     match result_map.iter().find(|(key_s, _)| key_s == &key) {
-                         Some(value) => value.1.to_owned().add_engines(engine),
-                         None => result_map.push((key, value)),
-                     };
-                 });
+         if let Ok(result) = response {
+             for (key, value) in result {
+                 if let Some(value) = result_map.iter().find(|(key_s, _)| key_s == &key) {
+                     value.1.to_owned().add_engines(engine)
+                 } else {
+                     result_map.push((key, value))
+                 }
            }
-             Err(error) => handle_error(&error, engine),
-         };
+         } else if let Err(error) = response {
+             handle_error(&error, engine)
+         }
    }

    if safe_search >= 3 {
        let mut blacklist_map: Vec<(String, SearchResult)> = Vec::new();
        filter_with_lists(
            &mut result_map,
            &mut blacklist_map,
-             file_path(FileType::BlockList)?,
+             &file_path(FileType::BlockList).await?,
        )
        .await?;

        filter_with_lists(
            &mut blacklist_map,
            &mut result_map,
-             file_path(FileType::AllowList)?,
+             &file_path(FileType::AllowList).await?,
        )
        .await?;

        drop(blacklist_map);
    }

-     let mut results: Box<[SearchResult]> = result_map
-         .into_iter()
-         .map(|(_, mut value)| {
-             if !value.url.contains("temu.com") {
-                 value.calculate_relevance(query.as_str())
-             }
-             value
-         })
-         .collect();
-     sort_search_results(&mut results);
+     let results: Box<[SearchResult]> = tokio::task::spawn_blocking(move || {
+         let mut unsorted_results: Box<[SearchResult]> = result_map
+             .par_iter()
+             .cloned()
+             .map(|(_, mut value)| {
+                 if !value.url.contains("temu.com") {
+                     value.calculate_relevance(query.as_str())
+                 }
+                 value
+             })
+             .collect();
+
+         sort_search_results(&mut unsorted_results);
+
+         unsorted_results
+     })
+     .await?;

    Ok(SearchResults::new(
        results,
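The relevance scoring and sorting above are now wrapped in `tokio::task::spawn_blocking`, so the CPU-bound rayon work runs on the blocking thread pool instead of stalling the async runtime; awaiting the returned handle hands the sorted results back to the async side. A rough, self-contained sketch of that hand-off, in which the title-length score is only a stand-in for `calculate_relevance`:

```rust
use rayon::prelude::*;

#[derive(Clone, Debug)]
struct Hit {
    title: String,
    relevance_score: f32,
}

#[tokio::main]
async fn main() -> Result<(), tokio::task::JoinError> {
    let raw: Vec<Hit> = vec![
        Hit { title: "rust book".into(), relevance_score: 0.0 },
        Hit { title: "tokio docs".into(), relevance_score: 0.0 },
    ];

    // Move the CPU-bound scoring and sorting onto the blocking thread pool.
    let ranked = tokio::task::spawn_blocking(move || {
        let mut scored: Vec<Hit> = raw
            .par_iter()
            .cloned()
            .map(|mut hit| {
                // Stand-in for `calculate_relevance`.
                hit.relevance_score = hit.title.len() as f32;
                hit
            })
            .collect();

        // Descending by score, mirroring sort_search_results.
        scored.par_sort_unstable_by(|a, b| {
            b.relevance_score
                .partial_cmp(&a.relevance_score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        scored
    })
    .await?;

    println!("{ranked:?}");
    Ok(())
}
```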
@@ -231,17 +218,16 @@ pub async fn filter_with_lists(
    while idx < length {
        let ele = &map_to_be_filtered[idx];
        let ele_inner = &ele.1;
-         match re.is_match(&ele.0.to_lowercase())
+         // If the search result matches the regex pattern, move it from the original map to the resultant map
+         if re.is_match(&ele.0.to_lowercase())
            || re.is_match(&ele_inner.title.to_lowercase())
            || re.is_match(&ele_inner.description.to_lowercase())
        {
-             true => {
-                 // If the search result matches the regex pattern, move it from the original map to the resultant map
-                 resultant_map.push(map_to_be_filtered.swap_remove(idx));
-                 length -= 1;
-             }
-             false => idx += 1,
-         };
+             resultant_map.push(map_to_be_filtered.swap_remove(idx));
+             length -= 1;
+         } else {
+             idx += 1;
+         }
    }
}

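The rewritten loop keeps the original `swap_remove` strategy: a matching entry is moved into the other vector and the index is not advanced, because `swap_remove` pulls the last element into the vacated slot and that element still needs to be checked; only a non-match advances the index. A tiny standalone illustration of the pattern:

```rust
fn main() {
    let mut kept = vec!["alpha", "block-me", "beta", "block-too"];
    let mut removed: Vec<&str> = Vec::new();

    let mut idx = 0;
    let mut length = kept.len();
    while idx < length {
        if kept[idx].starts_with("block") {
            // swap_remove is O(1): the last element fills the hole,
            // so idx must be re-checked instead of incremented.
            removed.push(kept.swap_remove(idx));
            length -= 1;
        } else {
            idx += 1;
        }
    }

    assert_eq!(removed.len(), 2);
    assert_eq!(kept, vec!["alpha", "beta"]);
}
```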
@@ -251,11 +237,12 @@ pub async fn filter_with_lists(
/// Sorts SearchResults by relevance score.
/// <br> sort_unstable is used as it's faster; stability is not an issue on our side.
/// For reasons why, check out [`this`](https://rust-lang.github.io/rfcs/1884-unstable-sort.html)
+ ///
/// # Arguments
+ ///
/// * `results` - A mutable slice or Vec of SearchResults
- ///
fn sort_search_results(results: &mut [SearchResult]) {
-     results.sort_unstable_by(|a, b| {
+     results.par_sort_unstable_by(|a, b| {
        use std::cmp::Ordering;

        b.relevance_score
@@ -311,12 +298,16 @@ mod tests {
        .await?;

        assert_eq!(resultant_map.len(), 2);
-         assert!(resultant_map
-             .iter()
-             .any(|(key, _)| key == "https://www.example.com"));
-         assert!(resultant_map
-             .iter()
-             .any(|(key, _)| key == "https://www.rust-lang.org/"));
+         assert!(
+             resultant_map
+                 .iter()
+                 .any(|(key, _)| key == "https://www.example.com")
+         );
+         assert!(
+             resultant_map
+                 .iter()
+                 .any(|(key, _)| key == "https://www.rust-lang.org/")
+         );
        assert_eq!(map_to_be_filtered.len(), 0);

        Ok(())
@@ -362,13 +353,17 @@ mod tests {
        .await?;

        assert_eq!(resultant_map.len(), 1);
-         assert!(resultant_map
-             .iter()
-             .any(|(key, _)| key == "https://www.example.com"));
+         assert!(
+             resultant_map
+                 .iter()
+                 .any(|(key, _)| key == "https://www.example.com")
+         );
        assert_eq!(map_to_be_filtered.len(), 1);
-         assert!(map_to_be_filtered
-             .iter()
-             .any(|(key, _)| key == "https://www.rust-lang.org/"));
+         assert!(
+             map_to_be_filtered
+                 .iter()
+                 .any(|(key, _)| key == "https://www.rust-lang.org/")
+         );

        Ok(())
    }