@@ -18,6 +18,7 @@ use lightning::ln::channelmanager::ChannelManager;
18
18
use lightning:: ln:: msgs:: { ChannelMessageHandler , RoutingMessageHandler } ;
19
19
use lightning:: ln:: peer_handler:: { CustomMessageHandler , PeerManager , SocketDescriptor } ;
20
20
use lightning:: routing:: network_graph:: { NetworkGraph , NetGraphMsgHandler } ;
21
+ use lightning:: routing:: scoring:: WriteableScore ;
21
22
use lightning:: util:: events:: { Event , EventHandler , EventsProvider } ;
22
23
use lightning:: util:: logger:: Logger ;
23
24
use lightning:: util:: persist:: Persister ;
@@ -151,6 +152,7 @@ impl BackgroundProcessor {
151
152
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
152
153
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
153
154
pub fn start <
155
+ ' a ,
154
156
Signer : ' static + Sign ,
155
157
CA : ' static + Deref + Send + Sync ,
156
158
CF : ' static + Deref + Send + Sync ,
@@ -171,9 +173,11 @@ impl BackgroundProcessor {
171
173
NG : ' static + Deref < Target = NetGraphMsgHandler < G , CA , L > > + Send + Sync ,
172
174
UMH : ' static + Deref + Send + Sync ,
173
175
PM : ' static + Deref < Target = PeerManager < Descriptor , CMH , RMH , L , UMH > > + Send + Sync ,
176
+ S : ' static + Deref < Target = SC > + Send + Sync ,
177
+ SC : WriteableScore < ' a > ,
174
178
> (
175
179
persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
176
- net_graph_msg_handler : Option < NG > , peer_manager : PM , logger : L
180
+ net_graph_msg_handler : Option < NG > , peer_manager : PM , logger : L , scorer : Option < S >
177
181
) -> Self
178
182
where
179
183
CA :: Target : ' static + chain:: Access ,
@@ -187,7 +191,7 @@ impl BackgroundProcessor {
187
191
CMH :: Target : ' static + ChannelMessageHandler ,
188
192
RMH :: Target : ' static + RoutingMessageHandler ,
189
193
UMH :: Target : ' static + CustomMessageHandler ,
190
- PS :: Target : ' static + Persister < Signer , CW , T , K , F , L >
194
+ PS :: Target : ' static + Persister < ' a , Signer , CW , T , K , F , L , SC > ,
191
195
{
192
196
let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
193
197
let stop_thread_clone = stop_thread. clone ( ) ;
@@ -274,9 +278,16 @@ impl BackgroundProcessor {
274
278
if let Err ( e) = persister. persist_graph ( handler. network_graph ( ) ) {
275
279
log_error ! ( logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e)
276
280
}
277
- last_prune_call = Instant :: now ( ) ;
278
- have_pruned = true ;
279
281
}
282
+ if let Some ( ref scorer) = scorer {
283
+ log_trace ! ( logger, "Persisting scorer" ) ;
284
+ if let Err ( e) = persister. persist_scorer ( & scorer) {
285
+ log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
286
+ }
287
+ }
288
+
289
+ last_prune_call = Instant :: now ( ) ;
290
+ have_pruned = true ;
280
291
}
281
292
}
282
293
@@ -285,10 +296,16 @@ impl BackgroundProcessor {
285
296
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
286
297
persister. persist_manager ( & * channel_manager) ?;
287
298
299
+ // Persist Scorer on exit
300
+ if let Some ( ref scorer) = scorer {
301
+ persister. persist_scorer ( & scorer) ?;
302
+ }
303
+
288
304
// Persist NetworkGraph on exit
289
305
if let Some ( ref handler) = net_graph_msg_handler {
290
306
persister. persist_graph ( handler. network_graph ( ) ) ?;
291
307
}
308
+
292
309
Ok ( ( ) )
293
310
} ) ;
294
311
Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
@@ -369,6 +386,7 @@ mod tests {
369
386
use std:: path:: PathBuf ;
370
387
use std:: sync:: { Arc , Mutex } ;
371
388
use std:: time:: Duration ;
389
+ use lightning:: routing:: scoring:: { FixedPenaltyScorer } ;
372
390
use super :: { BackgroundProcessor , FRESHNESS_TIMER } ;
373
391
374
392
const EVENT_DEADLINE : u64 = 5 * FRESHNESS_TIMER ;
@@ -395,6 +413,7 @@ mod tests {
395
413
network_graph : Arc < NetworkGraph > ,
396
414
logger : Arc < test_utils:: TestLogger > ,
397
415
best_block : BestBlock ,
416
+ scorer : Arc < Mutex < FixedPenaltyScorer > > ,
398
417
}
399
418
400
419
impl Drop for Node {
@@ -410,13 +429,14 @@ mod tests {
410
429
struct Persister {
411
430
graph_error : Option < ( std:: io:: ErrorKind , & ' static str ) > ,
412
431
manager_error : Option < ( std:: io:: ErrorKind , & ' static str ) > ,
432
+ scorer_error : Option < ( std:: io:: ErrorKind , & ' static str ) > ,
413
433
filesystem_persister : FilesystemPersister ,
414
434
}
415
435
416
436
impl Persister {
417
437
fn new ( data_dir : String ) -> Self {
418
438
let filesystem_persister = FilesystemPersister :: new ( data_dir. clone ( ) ) ;
419
- Self { graph_error : None , manager_error : None , filesystem_persister }
439
+ Self { graph_error : None , manager_error : None , scorer_error : None , filesystem_persister }
420
440
}
421
441
422
442
fn with_graph_error ( self , error : std:: io:: ErrorKind , message : & ' static str ) -> Self {
@@ -426,6 +446,10 @@ mod tests {
426
446
fn with_manager_error ( self , error : std:: io:: ErrorKind , message : & ' static str ) -> Self {
427
447
Self { manager_error : Some ( ( error, message) ) , ..self }
428
448
}
449
+
450
+ fn with_scorer_error ( self , error : std:: io:: ErrorKind , message : & ' static str ) -> Self {
451
+ Self { scorer_error : Some ( ( error, message) ) , ..self }
452
+ }
429
453
}
430
454
431
455
impl KVStorePersister for Persister {
@@ -442,6 +466,12 @@ mod tests {
442
466
}
443
467
}
444
468
469
+ if key == "scorer" {
470
+ if let Some ( ( error, message) ) = self . scorer_error {
471
+ return Err ( std:: io:: Error :: new ( error, message) )
472
+ }
473
+ }
474
+
445
475
self . filesystem_persister . persist ( key, object)
446
476
}
447
477
}
@@ -473,7 +503,8 @@ mod tests {
473
503
let net_graph_msg_handler = Some ( Arc :: new ( NetGraphMsgHandler :: new ( network_graph. clone ( ) , Some ( chain_source. clone ( ) ) , logger. clone ( ) ) ) ) ;
474
504
let msg_handler = MessageHandler { chan_handler : Arc :: new ( test_utils:: TestChannelMessageHandler :: new ( ) ) , route_handler : Arc :: new ( test_utils:: TestRoutingMessageHandler :: new ( ) ) } ;
475
505
let peer_manager = Arc :: new ( PeerManager :: new ( msg_handler, keys_manager. get_node_secret ( Recipient :: Node ) . unwrap ( ) , & seed, logger. clone ( ) , IgnoringMessageHandler { } ) ) ;
476
- let node = Node { node : manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block } ;
506
+ let scorer = Arc :: new ( Mutex :: new ( test_utils:: TestScorer :: with_penalty ( 0 ) ) ) ;
507
+ let node = Node { node : manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer } ;
477
508
nodes. push ( node) ;
478
509
}
479
510
@@ -571,7 +602,7 @@ mod tests {
571
602
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
572
603
let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
573
604
let event_handler = |_: & _ | { } ;
574
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
605
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes [ 0 ] . scorer . clone ( ) ) ) ;
575
606
576
607
macro_rules! check_persisted_data {
577
608
( $node: expr, $filepath: expr) => {
@@ -621,6 +652,10 @@ mod tests {
621
652
check_persisted_data ! ( network_graph, filepath. clone( ) ) ;
622
653
}
623
654
655
+ // Check scorer is persisted
656
+ let filepath = get_full_filepath ( "test_background_processor_persister_0" . to_string ( ) , "scorer" . to_string ( ) ) ;
657
+ check_persisted_data ! ( nodes[ 0 ] . scorer, filepath. clone( ) ) ;
658
+
624
659
assert ! ( bg_processor. stop( ) . is_ok( ) ) ;
625
660
}
626
661
@@ -632,7 +667,7 @@ mod tests {
632
667
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
633
668
let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
634
669
let event_handler = |_: & _ | { } ;
635
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
670
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes [ 0 ] . scorer . clone ( ) ) ) ;
636
671
loop {
637
672
let log_entries = nodes[ 0 ] . logger . lines . lock ( ) . unwrap ( ) ;
638
673
let desired_log = "Calling ChannelManager's timer_tick_occurred" . to_string ( ) ;
@@ -655,7 +690,7 @@ mod tests {
655
690
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
656
691
let persister = Arc :: new ( Persister :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ) ;
657
692
let event_handler = |_: & _ | { } ;
658
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
693
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes [ 0 ] . scorer . clone ( ) ) ) ;
659
694
match bg_processor. join ( ) {
660
695
Ok ( _) => panic ! ( "Expected error persisting manager" ) ,
661
696
Err ( e) => {
@@ -672,7 +707,7 @@ mod tests {
672
707
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
673
708
let persister = Arc :: new ( Persister :: new ( data_dir) . with_graph_error ( std:: io:: ErrorKind :: Other , "test" ) ) ;
674
709
let event_handler = |_: & _ | { } ;
675
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
710
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes [ 0 ] . scorer . clone ( ) ) ) ;
676
711
677
712
match bg_processor. stop ( ) {
678
713
Ok ( _) => panic ! ( "Expected error persisting network graph" ) ,
@@ -683,6 +718,24 @@ mod tests {
683
718
}
684
719
}
685
720
721
+ #[ test]
722
+ fn test_scorer_persist_error ( ) {
723
+ // Test that if we encounter an error during scorer persistence, an error gets returned.
724
+ let nodes = create_nodes ( 2 , "test_persist_scorer_error" . to_string ( ) ) ;
725
+ let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
726
+ let persister = Arc :: new ( Persister :: new ( data_dir) . with_scorer_error ( std:: io:: ErrorKind :: Other , "test" ) ) ;
727
+ let event_handler = |_: & _ | { } ;
728
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
729
+
730
+ match bg_processor. stop ( ) {
731
+ Ok ( _) => panic ! ( "Expected error persisting scorer" ) ,
732
+ Err ( e) => {
733
+ assert_eq ! ( e. kind( ) , std:: io:: ErrorKind :: Other ) ;
734
+ assert_eq ! ( e. get_ref( ) . unwrap( ) . to_string( ) , "test" ) ;
735
+ } ,
736
+ }
737
+ }
738
+
686
739
#[ test]
687
740
fn test_background_event_handling ( ) {
688
741
let mut nodes = create_nodes ( 2 , "test_background_event_handling" . to_string ( ) ) ;
@@ -695,7 +748,7 @@ mod tests {
695
748
let event_handler = move |event : & Event | {
696
749
sender. send ( handle_funding_generation_ready ! ( event, channel_value) ) . unwrap ( ) ;
697
750
} ;
698
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
751
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes [ 0 ] . scorer . clone ( ) ) ) ;
699
752
700
753
// Open a channel and check that the FundingGenerationReady event was handled.
701
754
begin_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , channel_value) ;
@@ -720,7 +773,7 @@ mod tests {
720
773
let ( sender, receiver) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
721
774
let event_handler = move |event : & Event | sender. send ( event. clone ( ) ) . unwrap ( ) ;
722
775
let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
723
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
776
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes [ 0 ] . scorer . clone ( ) ) ) ;
724
777
725
778
// Force close the channel and check that the SpendableOutputs event was handled.
726
779
nodes[ 0 ] . node . force_close_channel ( & nodes[ 0 ] . node . list_channels ( ) [ 0 ] . channel_id ) . unwrap ( ) ;
@@ -747,11 +800,10 @@ mod tests {
747
800
// Initiate the background processors to watch each node.
748
801
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
749
802
let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
750
- let scorer = Arc :: new ( Mutex :: new ( test_utils:: TestScorer :: with_penalty ( 0 ) ) ) ;
751
803
let router = DefaultRouter :: new ( Arc :: clone ( & nodes[ 0 ] . network_graph ) , Arc :: clone ( & nodes[ 0 ] . logger ) , random_seed_bytes) ;
752
- let invoice_payer = Arc :: new ( InvoicePayer :: new ( Arc :: clone ( & nodes[ 0 ] . node ) , router, scorer, Arc :: clone ( & nodes[ 0 ] . logger ) , |_: & _ | { } , RetryAttempts ( 2 ) ) ) ;
804
+ let invoice_payer = Arc :: new ( InvoicePayer :: new ( Arc :: clone ( & nodes[ 0 ] . node ) , router, Arc :: clone ( & nodes [ 0 ] . scorer ) , Arc :: clone ( & nodes[ 0 ] . logger ) , |_: & _ | { } , RetryAttempts ( 2 ) ) ) ;
753
805
let event_handler = Arc :: clone ( & invoice_payer) ;
754
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
806
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . net_graph_msg_handler . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes [ 0 ] . scorer . clone ( ) ) ) ;
755
807
assert ! ( bg_processor. stop( ) . is_ok( ) ) ;
756
808
}
757
809
}
0 commit comments