-
Notifications
You must be signed in to change notification settings - Fork 404
Add utils to persist scorer in BackgroundProcessor #1416
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ use lightning::ln::channelmanager::ChannelManager; | |
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; | ||
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; | ||
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; | ||
use lightning::routing::scoring::WriteableScore; | ||
use lightning::util::events::{Event, EventHandler, EventsProvider}; | ||
use lightning::util::logger::Logger; | ||
use lightning::util::persist::Persister; | ||
|
@@ -151,6 +152,7 @@ impl BackgroundProcessor { | |
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph | ||
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable | ||
pub fn start< | ||
'a, | ||
Signer: 'static + Sign, | ||
CA: 'static + Deref + Send + Sync, | ||
CF: 'static + Deref + Send + Sync, | ||
|
@@ -171,9 +173,11 @@ impl BackgroundProcessor { | |
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync, | ||
UMH: 'static + Deref + Send + Sync, | ||
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync, | ||
S: 'static + Deref<Target = SC> + Send + Sync, | ||
SC: WriteableScore<'a>, | ||
>( | ||
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, | ||
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L | ||
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S> | ||
) -> Self | ||
where | ||
CA::Target: 'static + chain::Access, | ||
|
@@ -187,7 +191,7 @@ impl BackgroundProcessor { | |
CMH::Target: 'static + ChannelMessageHandler, | ||
RMH::Target: 'static + RoutingMessageHandler, | ||
UMH::Target: 'static + CustomMessageHandler, | ||
PS::Target: 'static + Persister<Signer, CW, T, K, F, L> | ||
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, | ||
{ | ||
let stop_thread = Arc::new(AtomicBool::new(false)); | ||
let stop_thread_clone = stop_thread.clone(); | ||
|
@@ -274,9 +278,16 @@ impl BackgroundProcessor { | |
if let Err(e) = persister.persist_graph(handler.network_graph()) { | ||
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) | ||
} | ||
last_prune_call = Instant::now(); | ||
have_pruned = true; | ||
} | ||
if let Some(ref scorer) = scorer { | ||
log_trace!(logger, "Persisting scorer"); | ||
if let Err(e) = persister.persist_scorer(&scorer) { | ||
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) | ||
} | ||
} | ||
|
||
last_prune_call = Instant::now(); | ||
have_pruned = true; | ||
} | ||
} | ||
|
||
|
@@ -285,10 +296,16 @@ impl BackgroundProcessor { | |
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update. | ||
persister.persist_manager(&*channel_manager)?; | ||
|
||
// Persist Scorer on exit | ||
if let Some(ref scorer) = scorer { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: lets do this before the network graph. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be worth doing the same in the loop? |
||
persister.persist_scorer(&scorer)?; | ||
} | ||
|
||
// Persist NetworkGraph on exit | ||
if let Some(ref handler) = net_graph_msg_handler { | ||
persister.persist_graph(handler.network_graph())?; | ||
} | ||
|
||
Ok(()) | ||
}); | ||
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } | ||
|
@@ -369,6 +386,7 @@ mod tests { | |
use std::path::PathBuf; | ||
use std::sync::{Arc, Mutex}; | ||
use std::time::Duration; | ||
use lightning::routing::scoring::{FixedPenaltyScorer}; | ||
use super::{BackgroundProcessor, FRESHNESS_TIMER}; | ||
|
||
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER; | ||
|
@@ -395,6 +413,7 @@ mod tests { | |
network_graph: Arc<NetworkGraph>, | ||
logger: Arc<test_utils::TestLogger>, | ||
best_block: BestBlock, | ||
scorer: Arc<Mutex<FixedPenaltyScorer>>, | ||
} | ||
|
||
impl Drop for Node { | ||
|
@@ -410,13 +429,14 @@ mod tests { | |
struct Persister { | ||
graph_error: Option<(std::io::ErrorKind, &'static str)>, | ||
manager_error: Option<(std::io::ErrorKind, &'static str)>, | ||
scorer_error: Option<(std::io::ErrorKind, &'static str)>, | ||
filesystem_persister: FilesystemPersister, | ||
} | ||
|
||
impl Persister { | ||
fn new(data_dir: String) -> Self { | ||
let filesystem_persister = FilesystemPersister::new(data_dir.clone()); | ||
Self { graph_error: None, manager_error: None, filesystem_persister } | ||
Self { graph_error: None, manager_error: None, scorer_error: None, filesystem_persister } | ||
} | ||
|
||
fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { | ||
|
@@ -426,6 +446,10 @@ mod tests { | |
fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { | ||
Self { manager_error: Some((error, message)), ..self } | ||
} | ||
|
||
fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { | ||
Self { scorer_error: Some((error, message)), ..self } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like the scorer_error isn't used - need to update the persist method below. |
||
} | ||
} | ||
|
||
impl KVStorePersister for Persister { | ||
|
@@ -442,6 +466,12 @@ mod tests { | |
} | ||
} | ||
|
||
if key == "scorer" { | ||
if let Some((error, message)) = self.scorer_error { | ||
return Err(std::io::Error::new(error, message)) | ||
} | ||
} | ||
|
||
self.filesystem_persister.persist(key, object) | ||
} | ||
} | ||
|
@@ -473,7 +503,8 @@ mod tests { | |
let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()))); | ||
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; | ||
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{})); | ||
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block }; | ||
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); | ||
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; | ||
nodes.push(node); | ||
} | ||
|
||
|
@@ -571,7 +602,7 @@ mod tests { | |
let data_dir = nodes[0].persister.get_data_dir(); | ||
let persister = Arc::new(Persister::new(data_dir)); | ||
let event_handler = |_: &_| {}; | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); | ||
|
||
macro_rules! check_persisted_data { | ||
($node: expr, $filepath: expr) => { | ||
|
@@ -621,6 +652,10 @@ mod tests { | |
check_persisted_data!(network_graph, filepath.clone()); | ||
} | ||
|
||
// Check scorer is persisted | ||
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string()); | ||
check_persisted_data!(nodes[0].scorer, filepath.clone()); | ||
|
||
assert!(bg_processor.stop().is_ok()); | ||
} | ||
|
||
|
@@ -632,7 +667,7 @@ mod tests { | |
let data_dir = nodes[0].persister.get_data_dir(); | ||
let persister = Arc::new(Persister::new(data_dir)); | ||
let event_handler = |_: &_| {}; | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); | ||
loop { | ||
let log_entries = nodes[0].logger.lines.lock().unwrap(); | ||
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string(); | ||
|
@@ -655,7 +690,7 @@ mod tests { | |
let data_dir = nodes[0].persister.get_data_dir(); | ||
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); | ||
let event_handler = |_: &_| {}; | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); | ||
match bg_processor.join() { | ||
Ok(_) => panic!("Expected error persisting manager"), | ||
Err(e) => { | ||
|
@@ -672,7 +707,7 @@ mod tests { | |
let data_dir = nodes[0].persister.get_data_dir(); | ||
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); | ||
let event_handler = |_: &_| {}; | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); | ||
|
||
match bg_processor.stop() { | ||
Ok(_) => panic!("Expected error persisting network graph"), | ||
|
@@ -683,6 +718,24 @@ mod tests { | |
} | ||
} | ||
|
||
#[test] | ||
fn test_scorer_persist_error() { | ||
// Test that if we encounter an error during scorer persistence, an error gets returned. | ||
let nodes = create_nodes(2, "test_persist_scorer_error".to_string()); | ||
let data_dir = nodes[0].persister.get_data_dir(); | ||
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); | ||
let event_handler = |_: &_| {}; | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); | ||
|
||
match bg_processor.stop() { | ||
Ok(_) => panic!("Expected error persisting scorer"), | ||
Err(e) => { | ||
assert_eq!(e.kind(), std::io::ErrorKind::Other); | ||
assert_eq!(e.get_ref().unwrap().to_string(), "test"); | ||
}, | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_background_event_handling() { | ||
let mut nodes = create_nodes(2, "test_background_event_handling".to_string()); | ||
|
@@ -695,7 +748,7 @@ mod tests { | |
let event_handler = move |event: &Event| { | ||
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); | ||
}; | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); | ||
|
||
// Open a channel and check that the FundingGenerationReady event was handled. | ||
begin_open_channel!(nodes[0], nodes[1], channel_value); | ||
|
@@ -720,7 +773,7 @@ mod tests { | |
let (sender, receiver) = std::sync::mpsc::sync_channel(1); | ||
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap(); | ||
let persister = Arc::new(Persister::new(data_dir)); | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); | ||
|
||
// Force close the channel and check that the SpendableOutputs event was handled. | ||
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap(); | ||
|
@@ -747,11 +800,10 @@ mod tests { | |
// Initiate the background processors to watch each node. | ||
let data_dir = nodes[0].persister.get_data_dir(); | ||
let persister = Arc::new(Persister::new(data_dir)); | ||
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); | ||
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes); | ||
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2))); | ||
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2))); | ||
let event_handler = Arc::clone(&invoice_payer); | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); | ||
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); | ||
assert!(bg_processor.stop().is_ok()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, this logic is busted if we don't have a
net_graph_msg_handler
provided (which, okay, should be rare). Just need to move thelast_prune_call
andhave_pruned
updates out of the above if. While you're at it, can you add a trace-level log before callingpersist_scorer
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this predates this change. @jurvis Could you make a separate commit for this fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, I can do that 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jkczyz done