@@ -23,7 +23,11 @@ use std::collections::HashMap;
2323use std:: hash:: BuildHasherDefault ;
2424use parking_lot:: Mutex ;
2525use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
26- use std:: { str:: FromStr , sync:: Arc , time:: Instant } ;
26+ use std:: {
27+ str:: FromStr ,
28+ sync:: Arc ,
29+ time:: { Duration , Instant } ,
30+ } ;
2731use tokio:: sync:: Notify ;
2832
2933use fnv:: FnvHasher ;
@@ -402,14 +406,31 @@ impl ResultCollector {
402406 timeout_secs : u64 ,
403407 ) -> Option < ( bool , Vec < Signature > , Option < anyhow:: Error > , Vec < ( SwqosType , i64 ) > ) > {
404408 let start = Instant :: now ( ) ;
405- let timeout = std :: time :: Duration :: from_secs ( timeout_secs) ;
406- let poll_interval = std :: time :: Duration :: from_millis ( 2 ) ;
409+ let primary = Duration :: from_secs ( timeout_secs) ;
410+ let poll_interval = Duration :: from_millis ( 2 ) ;
407411 while self . completed_count . load ( Ordering :: Acquire ) < self . total_tasks {
408- if start. elapsed ( ) > timeout {
412+ if start. elapsed ( ) > primary {
409413 break ;
410414 }
411415 tokio:: time:: sleep ( poll_interval) . await ;
412416 }
417+ // 「不等待链上确认」仍会等各 SWQOS 的 HTTP 回包;主循环在收齐或触达 `timeout_secs` 后结束。
418+ // 若主窗口到时仍有未回包通道,晚到的 TaskResult 若立刻 drain 会丢签名——仅在该路径上拉长 grace。
419+ let all_submitted =
420+ self . completed_count . load ( Ordering :: Acquire ) >= self . total_tasks ;
421+ if all_submitted {
422+ // 全数已登记:仅留极短 settle,避免极端情况下最后一笔与计数可见性竞态。
423+ tokio:: time:: sleep ( Duration :: from_millis ( 35 ) ) . await ;
424+ } else {
425+ tokio:: time:: sleep ( Duration :: from_millis ( 600 ) ) . await ;
426+ while self . completed_count . load ( Ordering :: Acquire ) < self . total_tasks {
427+ if start. elapsed ( ) > primary + Duration :: from_secs ( 6 ) {
428+ break ;
429+ }
430+ tokio:: time:: sleep ( Duration :: from_millis ( 20 ) ) . await ;
431+ }
432+ tokio:: time:: sleep ( Duration :: from_millis ( 120 ) ) . await ;
433+ }
413434 self . get_first ( )
414435 }
415436}
@@ -492,7 +513,11 @@ pub async fn execute_parallel(
492513 }
493514
494515 // Task preparation completed: one shared context (clone once per batch), then minimal per-task data.
495- let collector = Arc :: new ( ResultCollector :: new ( task_configs. len ( ) ) ) ;
516+ let channel_count = task_configs. len ( ) . max ( 1 ) ;
517+ let collector = Arc :: new ( ResultCollector :: new ( channel_count) ) ;
518+ // 上限最多 5s:单路卡死时才会等满;若所有通道在窗口内回完,主循环会提前结束(不睡满秒数)。
519+ let submit_timeout_secs: u64 =
520+ ( 3u64 + ( channel_count. min ( 16 ) as u64 ) . div_ceil ( 2 ) . min ( 6 ) ) . clamp ( 3 , 5 ) ;
496521 let shared = Arc :: new ( SwqosSharedContext {
497522 payer,
498523 instructions,
@@ -562,13 +587,20 @@ pub async fn execute_parallel(
562587 // All jobs enqueued (no spawn on hot path)
563588
564589 if !wait_transaction_confirmed {
565- const SUBMIT_TIMEOUT_SECS : u64 = 2 ; //无需确认的交易,一般2秒合适了 一般2秒内发送全都返回 没返回的也不等了,没返回的就是太慢的swqos
566- let ret = collector. wait_for_all_submitted ( SUBMIT_TIMEOUT_SECS ) . await . unwrap_or ( (
567- false ,
568- vec ! [ ] ,
569- Some ( anyhow ! ( "No SWQOS result within {}s" , SUBMIT_TIMEOUT_SECS ) ) ,
570- vec ! [ ] ,
571- ) ) ;
590+ // submit_timeout_secs 为「等齐各 SWQOS HTTP 应答」的上限;收齐后会立刻进入短 settle,不会睡满整段秒数。
591+ // `wait_for_all_submitted` 仅在未收齐时追加长 grace,避免过早 drain 丢晚到签名。
592+ let ret = collector
593+ . wait_for_all_submitted ( submit_timeout_secs)
594+ . await
595+ . unwrap_or ( (
596+ false ,
597+ vec ! [ ] ,
598+ Some ( anyhow ! (
599+ "No SWQOS result within grace window (primary {}s)" ,
600+ submit_timeout_secs
601+ ) ) ,
602+ vec ! [ ] ,
603+ ) ) ;
572604 let ( success, signatures, last_error, submit_timings) = ret;
573605 return Ok ( ( success, signatures, last_error, submit_timings) ) ;
574606 }
0 commit comments