@@ -1019,10 +1019,13 @@ mod tests {
1019
1019
use bitcoin:: secp256k1:: { PublicKey , Secp256k1 , SecretKey } ;
1020
1020
use bitcoin:: transaction:: Version ;
1021
1021
use bitcoin:: { Amount , ScriptBuf , Txid } ;
1022
+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
1022
1023
use lightning:: chain:: channelmonitor:: ANTI_REORG_DELAY ;
1023
1024
use lightning:: chain:: transaction:: OutPoint ;
1024
1025
use lightning:: chain:: { chainmonitor, BestBlock , Confirm , Filter } ;
1025
- use lightning:: events:: { Event , MessageSendEvent , MessageSendEventsProvider , PathFailure } ;
1026
+ use lightning:: events:: {
1027
+ Event , MessageSendEvent , MessageSendEventsProvider , PathFailure , ReplayEvent ,
1028
+ } ;
1026
1029
use lightning:: ln:: channelmanager;
1027
1030
use lightning:: ln:: channelmanager:: {
1028
1031
ChainParameters , PaymentId , BREAKDOWN_TIMEOUT , MIN_CLTV_EXPIRY_DELTA ,
@@ -2228,6 +2231,54 @@ mod tests {
2228
2231
}
2229
2232
}
2230
2233
2234
+ #[ test]
2235
+ fn test_event_handling_failures_are_replayed ( ) {
2236
+ let ( _, nodes) = create_nodes ( 2 , "test_event_handling_failures_are_replayed" ) ;
2237
+ let channel_value = 100000 ;
2238
+ let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2239
+ let persister = Arc :: new ( Persister :: new ( data_dir. clone ( ) ) ) ;
2240
+
2241
+ let ( first_event_send, first_event_recv) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
2242
+ let ( second_event_send, second_event_recv) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
2243
+ let should_fail_event_handling = Arc :: new ( AtomicBool :: new ( true ) ) ;
2244
+ let event_handler = move |event : Event | {
2245
+ if let Ok ( true ) = should_fail_event_handling. compare_exchange (
2246
+ true ,
2247
+ false ,
2248
+ Ordering :: Acquire ,
2249
+ Ordering :: Relaxed ,
2250
+ ) {
2251
+ first_event_send. send ( event) . unwrap ( ) ;
2252
+ return Err ( ReplayEvent ( ) ) ;
2253
+ }
2254
+
2255
+ second_event_send. send ( event) . unwrap ( ) ;
2256
+ Ok ( ( ) )
2257
+ } ;
2258
+
2259
+ let bg_processor = BackgroundProcessor :: start (
2260
+ persister,
2261
+ event_handler,
2262
+ nodes[ 0 ] . chain_monitor . clone ( ) ,
2263
+ nodes[ 0 ] . node . clone ( ) ,
2264
+ Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
2265
+ nodes[ 0 ] . no_gossip_sync ( ) ,
2266
+ nodes[ 0 ] . peer_manager . clone ( ) ,
2267
+ nodes[ 0 ] . logger . clone ( ) ,
2268
+ Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
2269
+ ) ;
2270
+
2271
+ begin_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , channel_value) ;
2272
+ assert_eq ! (
2273
+ first_event_recv. recv_timeout( Duration :: from_secs( EVENT_DEADLINE ) ) ,
2274
+ second_event_recv. recv_timeout( Duration :: from_secs( EVENT_DEADLINE ) )
2275
+ ) ;
2276
+
2277
+ if !std:: thread:: panicking ( ) {
2278
+ bg_processor. stop ( ) . unwrap ( ) ;
2279
+ }
2280
+ }
2281
+
2231
2282
#[ test]
2232
2283
fn test_scorer_persistence ( ) {
2233
2284
let ( _, nodes) = create_nodes ( 2 , "test_scorer_persistence" ) ;
0 commit comments