11use {
22 crate :: transaction_notifier_interface:: TransactionNotifierArc ,
3- crossbeam_channel:: { Receiver , RecvTimeoutError } ,
3+ crossbeam_channel:: { Receiver , TryRecvError } ,
44 itertools:: izip,
55 solana_ledger:: {
66 blockstore:: { Blockstore , BlockstoreError } ,
@@ -12,10 +12,10 @@ use {
1212 } ,
1313 std:: {
1414 sync:: {
15- atomic:: { AtomicBool , AtomicU64 , Ordering } ,
15+ atomic:: { AtomicBool , AtomicU64 , AtomicUsize , Ordering } ,
1616 Arc ,
1717 } ,
18- thread:: { self , Builder , JoinHandle } ,
18+ thread:: { self , sleep , Builder , JoinHandle } ,
1919 time:: Duration ,
2020 } ,
2121} ;
@@ -49,38 +49,60 @@ impl TransactionStatusService {
4949 . name ( "solTxStatusWrtr" . to_string ( ) )
5050 . spawn ( move || {
5151 info ! ( "TransactionStatusService has started" ) ;
52+
53+ let outstanding_thread_count = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
5254 loop {
5355 if exit. load ( Ordering :: Relaxed ) {
56+ // Wait for the outstanding worker threads to complete before
57+ // joining the main thread and shutting down the service.
58+ while outstanding_thread_count. load ( Ordering :: SeqCst ) > 0 {
59+ sleep ( Duration :: from_millis ( 1 ) ) ;
60+ }
5461 break ;
5562 }
5663
57- let message = match transaction_status_receiver_handle
58- . recv_timeout ( Duration :: from_secs ( 1 ) )
59- {
64+ let message = match transaction_status_receiver_handle. try_recv ( ) {
6065 Ok ( message) => message,
61- Err ( RecvTimeoutError :: Disconnected ) => {
66+ Err ( TryRecvError :: Disconnected ) => {
6267 break ;
6368 }
64- Err ( RecvTimeoutError :: Timeout ) => {
69+ Err ( TryRecvError :: Empty ) => {
70+ // TSS is bandwidth sensitive at high TPS, but not necessarily
71+ // latency sensitive. We use a global thread pool to handle
72+ // bursts of work below. This sleep is intended to balance that
73+ // out so other users of the pool can make progress while TSS
74+ // builds up a backlog for the next burst.
75+ sleep ( Duration :: from_millis ( 50 ) ) ;
6576 continue ;
6677 }
6778 } ;
6879
69- match Self :: write_transaction_status_batch (
70- message,
71- & max_complete_transaction_status_slot,
72- enable_rpc_transaction_history,
73- transaction_notifier. clone ( ) ,
74- & blockstore,
75- enable_extended_tx_metadata_storage,
76- ) {
77- Ok ( _) => { }
78- Err ( err) => {
79- error ! ( "TransactionStatusService stopping due to error: {err}" ) ;
80- exit. store ( true , Ordering :: Relaxed ) ;
81- break ;
80+ let max_complete_transaction_status_slot =
81+ Arc :: clone ( & max_complete_transaction_status_slot) ;
82+ let blockstore = Arc :: clone ( & blockstore) ;
83+ let transaction_notifier = transaction_notifier. clone ( ) ;
84+ let exit_clone = Arc :: clone ( & exit) ;
85+ let outstanding_thread_count_handle = Arc :: clone ( & outstanding_thread_count) ;
86+
87+ outstanding_thread_count. fetch_add ( 1 , Ordering :: Relaxed ) ;
88+
89+ rayon:: spawn ( move || {
90+ match Self :: write_transaction_status_batch (
91+ message,
92+ & max_complete_transaction_status_slot,
93+ enable_rpc_transaction_history,
94+ transaction_notifier,
95+ & blockstore,
96+ enable_extended_tx_metadata_storage,
97+ ) {
98+ Ok ( _) => { }
99+ Err ( err) => {
100+ error ! ( "TransactionStatusService stopping due to error: {err}" ) ;
101+ exit_clone. store ( true , Ordering :: Relaxed ) ;
102+ }
82103 }
83- }
104+ outstanding_thread_count_handle. fetch_sub ( 1 , Ordering :: Relaxed ) ;
105+ } ) ;
84106 }
85107 info ! ( "TransactionStatusService has stopped" ) ;
86108 } )
0 commit comments