@@ -5,6 +5,8 @@ use alloy::{
55 providers:: { Provider , ProviderBuilder , WalletProvider } ,
66 rpc:: types:: Transaction ,
77} ;
8+ use std:: sync:: Arc ;
9+
810use anyhow:: Context ;
911use omni_types:: ChainKind ;
1012use tokio:: time:: { Duration , Sleep } ;
@@ -16,7 +18,6 @@ use crate::{
1618 self ,
1719 pending_transactions:: { self , PendingTransaction } ,
1820 } ,
19- workers:: RetryableEvent ,
2021} ;
2122
2223enum ShouldBump {
@@ -35,7 +36,12 @@ pub async fn start_evm_fee_bumping(
3536 config : & config:: Config ,
3637 chain_kind : ChainKind ,
3738 redis_connection_manager : & mut redis:: aio:: ConnectionManager ,
39+ nats_client : Arc < utils:: nats:: NatsClient > ,
3840) -> anyhow:: Result < ( ) > {
41+ let nats_config = config
42+ . nats
43+ . as_ref ( )
44+ . context ( "NATS config is required for fee bumping" ) ?;
3945 let evm_config = match chain_kind {
4046 ChainKind :: Eth => & config. eth ,
4147 ChainKind :: Bnb => & config. bnb ,
@@ -118,14 +124,15 @@ pub async fn start_evm_fee_bumping(
118124 pending_tx. tx_hash, pending_tx. nonce
119125 ) ;
120126
121- utils:: redis:: add_event (
122- config,
123- redis_connection_manager,
124- utils:: redis:: EVENTS ,
125- format ! ( "replay:{}" , pending_tx. tx_hash) ,
126- RetryableEvent :: new ( & pending_tx. source_event ) ,
127- )
128- . await ;
127+ let chain = chain_kind. as_ref ( ) . to_ascii_lowercase ( ) ;
128+ let subject = format ! ( "{}.{chain}" , nats_config. relayer_subject) ;
129+ let key = format ! ( "replay:{}" , pending_tx. tx_hash) ;
130+ let payload = serde_json:: to_vec ( & pending_tx. source_event )
131+ . context ( "Failed to serialize source event" ) ?;
132+
133+ if let Err ( err) = nats_client. publish ( subject, & key, payload) . await {
134+ warn ! ( "Failed to publish replay event to NATS: {err:?}" ) ;
135+ }
129136
130137 utils:: redis:: zrem ( config, redis_connection_manager, & redis_key, pending_tx) . await ;
131138 }
0 commit comments