Skip to content

Commit 2bfddea

Browse files
authored
Merge pull request #2995 from tnull/2024-04-fallible-event-handler
Make event handling fallible
2 parents 0cfe55c + e617a39 commit 2bfddea

File tree

9 files changed

+594
-258
lines changed

9 files changed

+594
-258
lines changed

lightning-background-processor/src/lib.rs

+101-33
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
2626
use lightning::events::EventHandler;
2727
#[cfg(feature = "std")]
2828
use lightning::events::EventsProvider;
29+
#[cfg(feature = "futures")]
30+
use lightning::events::ReplayEvent;
2931
use lightning::events::{Event, PathFailure};
3032

3133
use lightning::ln::channelmanager::AChannelManager;
@@ -583,6 +585,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
583585
/// could set up `process_events_async` like this:
584586
/// ```
585587
/// # use lightning::io;
588+
/// # use lightning::events::ReplayEvent;
586589
/// # use std::sync::{Arc, RwLock};
587590
/// # use std::sync::atomic::{AtomicBool, Ordering};
588591
/// # use std::time::SystemTime;
@@ -600,7 +603,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
600603
/// # }
601604
/// # struct EventHandler {}
602605
/// # impl EventHandler {
603-
/// # async fn handle_event(&self, _: lightning::events::Event) {}
606+
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
604607
/// # }
605608
/// # #[derive(Eq, PartialEq, Clone, Hash)]
606609
/// # struct SocketDescriptor {}
@@ -698,7 +701,7 @@ pub async fn process_events_async<
698701
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
699702
L: 'static + Deref + Send + Sync,
700703
P: 'static + Deref + Send + Sync,
701-
EventHandlerFuture: core::future::Future<Output = ()>,
704+
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
702705
EventHandler: Fn(Event) -> EventHandlerFuture,
703706
PS: 'static + Deref + Send,
704707
M: 'static
@@ -751,12 +754,16 @@ where
751754
if update_scorer(scorer, &event, duration_since_epoch) {
752755
log_trace!(logger, "Persisting scorer after update");
753756
if let Err(e) = persister.persist_scorer(&scorer) {
754-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
757+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
758+
// We opt not to abort early on persistence failure here, as persisting
759+
// the scorer is non-critical and we still hope that the failure will
760+
// have resolved itself by the time persistence becomes potentially
761+
// critical in the event handling below.
755762
}
756763
}
757764
}
758765
}
759-
event_handler(event).await;
766+
event_handler(event).await
760767
})
761768
};
762769
define_run_body!(
@@ -913,7 +920,7 @@ impl BackgroundProcessor {
913920
}
914921
}
915922
}
916-
event_handler.handle_event(event);
923+
event_handler.handle_event(event)
917924
};
918925
define_run_body!(
919926
persister,
@@ -1012,10 +1019,13 @@ mod tests {
10121019
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
10131020
use bitcoin::transaction::Version;
10141021
use bitcoin::{Amount, ScriptBuf, Txid};
1022+
use core::sync::atomic::{AtomicBool, Ordering};
10151023
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
10161024
use lightning::chain::transaction::OutPoint;
10171025
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1018-
use lightning::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure};
1026+
use lightning::events::{
1027+
Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, ReplayEvent,
1028+
};
10191029
use lightning::ln::channelmanager;
10201030
use lightning::ln::channelmanager::{
10211031
ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
@@ -1757,7 +1767,7 @@ mod tests {
17571767
// Initiate the background processors to watch each node.
17581768
let data_dir = nodes[0].kv_store.get_data_dir();
17591769
let persister = Arc::new(Persister::new(data_dir));
1760-
let event_handler = |_: _| {};
1770+
let event_handler = |_: _| Ok(());
17611771
let bg_processor = BackgroundProcessor::start(
17621772
persister,
17631773
event_handler,
@@ -1847,7 +1857,7 @@ mod tests {
18471857
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
18481858
let data_dir = nodes[0].kv_store.get_data_dir();
18491859
let persister = Arc::new(Persister::new(data_dir));
1850-
let event_handler = |_: _| {};
1860+
let event_handler = |_: _| Ok(());
18511861
let bg_processor = BackgroundProcessor::start(
18521862
persister,
18531863
event_handler,
@@ -1889,7 +1899,7 @@ mod tests {
18891899
let persister = Arc::new(
18901900
Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
18911901
);
1892-
let event_handler = |_: _| {};
1902+
let event_handler = |_: _| Ok(());
18931903
let bg_processor = BackgroundProcessor::start(
18941904
persister,
18951905
event_handler,
@@ -1924,7 +1934,7 @@ mod tests {
19241934

19251935
let bp_future = super::process_events_async(
19261936
persister,
1927-
|_: _| async {},
1937+
|_: _| async { Ok(()) },
19281938
nodes[0].chain_monitor.clone(),
19291939
nodes[0].node.clone(),
19301940
Some(nodes[0].messenger.clone()),
@@ -1957,7 +1967,7 @@ mod tests {
19571967
let data_dir = nodes[0].kv_store.get_data_dir();
19581968
let persister =
19591969
Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1960-
let event_handler = |_: _| {};
1970+
let event_handler = |_: _| Ok(());
19611971
let bg_processor = BackgroundProcessor::start(
19621972
persister,
19631973
event_handler,
@@ -1986,7 +1996,7 @@ mod tests {
19861996
let data_dir = nodes[0].kv_store.get_data_dir();
19871997
let persister =
19881998
Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1989-
let event_handler = |_: _| {};
1999+
let event_handler = |_: _| Ok(());
19902000
let bg_processor = BackgroundProcessor::start(
19912001
persister,
19922002
event_handler,
@@ -2021,13 +2031,16 @@ mod tests {
20212031
// Set up a background event handler for FundingGenerationReady events.
20222032
let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
20232033
let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
2024-
let event_handler = move |event: Event| match event {
2025-
Event::FundingGenerationReady { .. } => funding_generation_send
2026-
.send(handle_funding_generation_ready!(event, channel_value))
2027-
.unwrap(),
2028-
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
2029-
Event::ChannelReady { .. } => {},
2030-
_ => panic!("Unexpected event: {:?}", event),
2034+
let event_handler = move |event: Event| {
2035+
match event {
2036+
Event::FundingGenerationReady { .. } => funding_generation_send
2037+
.send(handle_funding_generation_ready!(event, channel_value))
2038+
.unwrap(),
2039+
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
2040+
Event::ChannelReady { .. } => {},
2041+
_ => panic!("Unexpected event: {:?}", event),
2042+
}
2043+
Ok(())
20312044
};
20322045

20332046
let bg_processor = BackgroundProcessor::start(
@@ -2082,11 +2095,14 @@ mod tests {
20822095

20832096
// Set up a background event handler for SpendableOutputs events.
20842097
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2085-
let event_handler = move |event: Event| match event {
2086-
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
2087-
Event::ChannelReady { .. } => {},
2088-
Event::ChannelClosed { .. } => {},
2089-
_ => panic!("Unexpected event: {:?}", event),
2098+
let event_handler = move |event: Event| {
2099+
match event {
2100+
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
2101+
Event::ChannelReady { .. } => {},
2102+
Event::ChannelClosed { .. } => {},
2103+
_ => panic!("Unexpected event: {:?}", event),
2104+
}
2105+
Ok(())
20902106
};
20912107
let persister = Arc::new(Persister::new(data_dir));
20922108
let bg_processor = BackgroundProcessor::start(
@@ -2215,12 +2231,60 @@ mod tests {
22152231
}
22162232
}
22172233

2234+
#[test]
2235+
fn test_event_handling_failures_are_replayed() {
2236+
let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
2237+
let channel_value = 100000;
2238+
let data_dir = nodes[0].kv_store.get_data_dir();
2239+
let persister = Arc::new(Persister::new(data_dir.clone()));
2240+
2241+
let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
2242+
let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
2243+
let should_fail_event_handling = Arc::new(AtomicBool::new(true));
2244+
let event_handler = move |event: Event| {
2245+
if let Ok(true) = should_fail_event_handling.compare_exchange(
2246+
true,
2247+
false,
2248+
Ordering::Acquire,
2249+
Ordering::Relaxed,
2250+
) {
2251+
first_event_send.send(event).unwrap();
2252+
return Err(ReplayEvent());
2253+
}
2254+
2255+
second_event_send.send(event).unwrap();
2256+
Ok(())
2257+
};
2258+
2259+
let bg_processor = BackgroundProcessor::start(
2260+
persister,
2261+
event_handler,
2262+
nodes[0].chain_monitor.clone(),
2263+
nodes[0].node.clone(),
2264+
Some(nodes[0].messenger.clone()),
2265+
nodes[0].no_gossip_sync(),
2266+
nodes[0].peer_manager.clone(),
2267+
nodes[0].logger.clone(),
2268+
Some(nodes[0].scorer.clone()),
2269+
);
2270+
2271+
begin_open_channel!(nodes[0], nodes[1], channel_value);
2272+
assert_eq!(
2273+
first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)),
2274+
second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
2275+
);
2276+
2277+
if !std::thread::panicking() {
2278+
bg_processor.stop().unwrap();
2279+
}
2280+
}
2281+
22182282
#[test]
22192283
fn test_scorer_persistence() {
22202284
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
22212285
let data_dir = nodes[0].kv_store.get_data_dir();
22222286
let persister = Arc::new(Persister::new(data_dir));
2223-
let event_handler = |_: _| {};
2287+
let event_handler = |_: _| Ok(());
22242288
let bg_processor = BackgroundProcessor::start(
22252289
persister,
22262290
event_handler,
@@ -2315,7 +2379,7 @@ mod tests {
23152379
let data_dir = nodes[0].kv_store.get_data_dir();
23162380
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
23172381

2318-
let event_handler = |_: _| {};
2382+
let event_handler = |_: _| Ok(());
23192383
let background_processor = BackgroundProcessor::start(
23202384
persister,
23212385
event_handler,
@@ -2350,7 +2414,7 @@ mod tests {
23502414
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
23512415
let bp_future = super::process_events_async(
23522416
persister,
2353-
|_: _| async {},
2417+
|_: _| async { Ok(()) },
23542418
nodes[0].chain_monitor.clone(),
23552419
nodes[0].node.clone(),
23562420
Some(nodes[0].messenger.clone()),
@@ -2492,12 +2556,15 @@ mod tests {
24922556
#[test]
24932557
fn test_payment_path_scoring() {
24942558
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2495-
let event_handler = move |event: Event| match event {
2496-
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
2497-
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
2498-
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
2499-
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
2500-
_ => panic!("Unexpected event: {:?}", event),
2559+
let event_handler = move |event: Event| {
2560+
match event {
2561+
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
2562+
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
2563+
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
2564+
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
2565+
_ => panic!("Unexpected event: {:?}", event),
2566+
}
2567+
Ok(())
25012568
};
25022569

25032570
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -2543,6 +2610,7 @@ mod tests {
25432610
Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
25442611
_ => panic!("Unexpected event: {:?}", event),
25452612
}
2613+
Ok(())
25462614
}
25472615
};
25482616

lightning-invoice/src/utils.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,7 @@ mod test {
13911391
} else {
13921392
other_events.borrow_mut().push(event);
13931393
}
1394+
Ok(())
13941395
};
13951396
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
13961397
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);

lightning/src/chain/chainmonitor.rs

+16-7
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance
3333
use crate::chain::transaction::{OutPoint, TransactionData};
3434
use crate::ln::types::ChannelId;
3535
use crate::sign::ecdsa::EcdsaChannelSigner;
36-
use crate::events;
37-
use crate::events::{Event, EventHandler};
36+
use crate::events::{self, Event, EventHandler, ReplayEvent};
3837
use crate::util::logger::{Logger, WithContext};
3938
use crate::util::errors::APIError;
4039
use crate::util::wakers::{Future, Notifier};
@@ -533,7 +532,7 @@ where C::Target: chain::Filter,
533532
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
534533
use crate::events::EventsProvider;
535534
let events = core::cell::RefCell::new(Vec::new());
536-
let event_handler = |event: events::Event| events.borrow_mut().push(event);
535+
let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
537536
self.process_pending_events(&event_handler);
538537
events.into_inner()
539538
}
@@ -544,16 +543,21 @@ where C::Target: chain::Filter,
544543
/// See the trait-level documentation of [`EventsProvider`] for requirements.
545544
///
546545
/// [`EventsProvider`]: crate::events::EventsProvider
547-
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
546+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
548547
&self, handler: H
549548
) {
550549
// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
551550
// crazy dance to process a monitor's events then only remove them once we've done so.
552551
let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
553552
for funding_txo in mons_to_process {
554553
let mut ev;
555-
super::channelmonitor::process_events_body!(
556-
self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
554+
match super::channelmonitor::process_events_body!(
555+
self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await) {
556+
Ok(()) => {},
557+
Err(ReplayEvent ()) => {
558+
self.event_notifier.notify();
559+
}
560+
}
557561
}
558562
}
559563

@@ -880,7 +884,12 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
880884
/// [`BumpTransaction`]: events::Event::BumpTransaction
881885
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
882886
for monitor_state in self.monitors.read().unwrap().values() {
883-
monitor_state.monitor.process_pending_events(&handler);
887+
match monitor_state.monitor.process_pending_events(&handler) {
888+
Ok(()) => {},
889+
Err(ReplayEvent ()) => {
890+
self.event_notifier.notify();
891+
}
892+
}
884893
}
885894
}
886895
}

0 commit comments

Comments
 (0)