11use crate :: cli:: Args ;
22use crate :: curl_parser:: { parse_curl_command, parse_curl_file, CurlCommand } ;
33use crate :: http_client:: { ClientState , ConnectionPool } ;
4- use crate :: stats:: { create_shared_stats, RequestResult , SharedStats } ;
4+ use crate :: stats:: { create_shared_stats, RequestResult , SharedStats , Statistics } ;
55use crate :: template:: TemplateEngine ;
66use anyhow:: Result ;
77use rand:: Rng ;
88use std:: sync:: Arc ;
99use std:: time:: { Duration , Instant } ;
10- use tokio:: time:: sleep;
1110
1211pub async fn run_benchmark ( args : Args ) -> Result < ( ) > {
1312 // Parse curl commands if provided
@@ -45,8 +44,8 @@ pub async fn run_benchmark(args: Args) -> Result<()> {
4544 // Create shared statistics
4645 let stats = create_shared_stats ( ) ;
4746
48- // Run the benchmark(使用 block_in_place 因为 run_workers 现在是同步的 )
49- tokio:: task:: block_in_place ( || {
47+ // Run the benchmark(使用 kanal 通道收集统计 )
48+ let final_stats = tokio:: task:: block_in_place ( || {
5049 run_workers (
5150 commands,
5251 args. connections ,
@@ -55,21 +54,13 @@ pub async fn run_benchmark(args: Args) -> Result<()> {
5554 args. rate ,
5655 args. parse_timeout ( ) ?,
5756 & args. load_strategy ,
58- stats. clone ( ) ,
5957 template_engine,
6058 args. http2 ,
6159 )
6260 } ) ?;
6361
64- // Finish statistics collection
65- {
66- let mut stats_lock = stats. lock ( ) . unwrap ( ) ;
67- stats_lock. finish ( ) ;
68- }
69-
7062 // Print results
71- let stats_lock = stats. lock ( ) . unwrap ( ) ;
72- stats_lock. print_summary ( args. latency ) ;
63+ final_stats. print_summary ( args. latency ) ;
7364
7465 Ok ( ( ) )
7566}
@@ -103,10 +94,9 @@ fn run_workers(
10394 rate : u32 ,
10495 timeout : Duration ,
10596 load_strategy : & str ,
106- stats : SharedStats ,
10797 template_engine : Arc < TemplateEngine > ,
10898 enable_http2 : bool ,
109- ) -> Result < ( ) > {
99+ ) -> Result < Statistics > {
110100 let commands = Arc :: new ( commands) ;
111101 let load_strategy = load_strategy. to_string ( ) ;
112102 let end_time = Instant :: now ( ) + duration;
@@ -130,11 +120,14 @@ fn run_workers(
130120 . expect ( "Failed to create connection pool" )
131121 ) ;
132122
123+ // 创建 kanal 通道收集统计数据(关键优化:避免 Mutex)
124+ let ( tx, rx) = kanal:: unbounded ( ) ;
125+
133126 // 使用 LocalSet 架构:每个物理线程独立运行
134127 let handles: Vec < _ > = ( 0 ..actual_threads)
135128 . map ( |_| {
136129 let commands = commands. clone ( ) ;
137- let stats = stats . clone ( ) ;
130+ let tx = tx . clone ( ) ;
138131 let load_strategy = load_strategy. clone ( ) ;
139132 let template_engine = template_engine. clone ( ) ;
140133 let pool = pool. clone ( ) ;
@@ -151,7 +144,7 @@ fn run_workers(
151144 // 在 LocalSet 中创建多个任务(每个线程处理多个连接)
152145 for _ in 0 ..connections_per_thread {
153146 let commands = commands. clone ( ) ;
154- let stats = stats . clone ( ) ;
147+ let tx = tx . clone ( ) ;
155148 let load_strategy = load_strategy. clone ( ) ;
156149 let template_engine = template_engine. clone ( ) ;
157150 let client = pool. get_client ( ) . clone ( ) ;
@@ -174,7 +167,7 @@ fn run_workers(
174167 }
175168 } ;
176169
177- // Apply template processing
170+ // Apply template processing (优化:减少字符串分配)
178171 let url = template_engine. process ( & cmd. url ) ;
179172 let body = cmd. body . as_ref ( ) . map ( |b| template_engine. process ( b) ) ;
180173
@@ -183,7 +176,7 @@ fn run_workers(
183176 let result = client. request ( & mut client_state, & cmd. method , & url, & cmd. headers , body. as_deref ( ) ) . await ;
184177 let duration = start. elapsed ( ) ;
185178
186- // Record result
179+ // Record result(通过 kanal 通道发送,无锁)
187180 let request_result = RequestResult {
188181 duration,
189182 status_code : result. as_ref ( ) . ok ( ) . and_then ( |r| Some ( r. 0 ) ) ,
@@ -192,7 +185,7 @@ fn run_workers(
192185 endpoint : if commands. len ( ) > 1 { Some ( cmd. url . clone ( ) ) } else { None } ,
193186 } ;
194187
195- stats . lock ( ) . unwrap ( ) . record ( request_result) ;
188+ let _ = tx . send ( request_result) ;
196189 request_count += 1 ;
197190
198191 // Rate limiting
@@ -210,12 +203,28 @@ fn run_workers(
210203 } )
211204 . collect ( ) ;
212205
213- // 等待所有线程完成
206+ // 关闭发送端
207+ drop ( tx) ;
208+
209+ // 在后台线程收集统计数据
210+ let collector_handle = std:: thread:: spawn ( move || {
211+ let mut stats = Statistics :: new ( ) ;
212+ while let Ok ( result) = rx. recv ( ) {
213+ stats. record ( result) ;
214+ }
215+ stats. finish ( ) ;
216+ stats
217+ } ) ;
218+
219+ // 等待所有工作线程完成
214220 for handle in handles {
215221 let _ = handle. join ( ) ;
216222 }
217223
218- Ok ( ( ) )
224+ // 等待统计收集完成
225+ let final_stats = collector_handle. join ( ) . unwrap ( ) ;
226+
227+ Ok ( final_stats)
219228}
220229
221230// make_request 函数已被移除,现在直接使用 HttpClient::request 方法
0 commit comments