Skip to content

Commit 5f7b5bf

Browse files
Support persistent monitor events
Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we complete work that was built on recent prior commits and actually start re-providing monitor events on startup if they went un-acked during runtime. This isn't actually supported in prod yet, so this new code will run randomly in tests, to ensure we still support the old paths.
1 parent 491f582 commit 5f7b5bf

File tree

4 files changed

+85
-13
lines changed

4 files changed

+85
-13
lines changed

lightning/src/chain/channelmonitor.rs

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,6 +1288,11 @@ pub(crate) struct ChannelMonitorImpl<Signer: EcdsaChannelSigner> {
12881288
// block/transaction-connected events and *not* during block/transaction-disconnected events,
12891289
// we further MUST NOT generate events during block/transaction-disconnection.
12901290
pending_monitor_events: Vec<(u64, MonitorEvent)>,
1291+
// `MonitorEvent`s that have been provided to the `ChannelManager` via
1292+
// [`ChannelMonitor::get_and_clear_pending_monitor_events`] and are awaiting
1293+
// [`ChannelMonitor::ack_monitor_event`] for removal. If an event in this queue is not ack'd, it
1294+
// will be re-provided to the `ChannelManager` on startup.
1295+
provided_monitor_events: Vec<(u64, MonitorEvent)>,
12911296
/// When set, monitor events are retained until explicitly acked rather than cleared on read.
12921297
///
12931298
/// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on
@@ -1764,7 +1769,12 @@ pub(crate) fn write_chanmon_internal<Signer: EcdsaChannelSigner, W: Writer>(
17641769
// Only write `persistent_events_enabled` if it's set to true, as it's an even TLV.
17651770
let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(());
17661771
let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() {
1767-
Some(Iterable(channel_monitor.pending_monitor_events.iter()))
1772+
Some(Iterable(
1773+
channel_monitor
1774+
.provided_monitor_events
1775+
.iter()
1776+
.chain(channel_monitor.pending_monitor_events.iter()),
1777+
))
17681778
} else {
17691779
None
17701780
};
@@ -1978,6 +1988,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
19781988

19791989
payment_preimages: new_hash_map(),
19801990
pending_monitor_events: Vec::new(),
1991+
provided_monitor_events: Vec::new(),
19811992
persistent_events_enabled: false,
19821993
next_monitor_event_id: 0,
19831994
pending_events: Vec::new(),
@@ -2203,20 +2214,32 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
22032214

22042215
/// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed.
22052216
/// Generally called by [`chain::Watch::ack_monitor_event`].
2206-
pub fn ack_monitor_event(&self, _event_id: u64) {
2207-
// TODO: once events have ids, remove the corresponding event here
2217+
pub fn ack_monitor_event(&self, event_id: u64) {
2218+
let inner = &mut *self.inner.lock().unwrap();
2219+
inner.ack_monitor_event(event_id);
2220+
}
2221+
2222+
/// Enables persistent monitor events mode. When enabled, monitor events are retained until
2223+
/// explicitly acked rather than cleared on read.
2224+
pub(crate) fn set_persistent_events_enabled(&self, enabled: bool) {
2225+
self.inner.lock().unwrap().persistent_events_enabled = enabled;
22082226
}
22092227

22102228
/// Copies [`MonitorEvent`] state from `other` into `self`.
22112229
/// Used in tests to align transient runtime state before equality comparison after a
22122230
/// serialization round-trip.
22132231
#[cfg(any(test, feature = "_test_utils"))]
22142232
pub fn copy_monitor_event_state(&self, other: &ChannelMonitor<Signer>) {
2215-
let (pending, next_id) = {
2233+
let (provided, pending, next_id) = {
22162234
let other_inner = other.inner.lock().unwrap();
2217-
(other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id)
2235+
(
2236+
other_inner.provided_monitor_events.clone(),
2237+
other_inner.pending_monitor_events.clone(),
2238+
other_inner.next_monitor_event_id,
2239+
)
22182240
};
22192241
let mut self_inner = self.inner.lock().unwrap();
2242+
self_inner.provided_monitor_events = provided;
22202243
self_inner.pending_monitor_events = pending;
22212244
self_inner.next_monitor_event_id = next_id;
22222245
}
@@ -4619,10 +4642,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
46194642
);
46204643
}
46214644

4645+
fn ack_monitor_event(&mut self, event_id: u64) {
4646+
self.provided_monitor_events.retain(|(id, _)| *id != event_id);
4647+
// If this event was generated prior to a restart, it may be in this queue instead
4648+
self.pending_monitor_events.retain(|(id, _)| *id != event_id);
4649+
}
4650+
46224651
fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> {
4623-
let mut ret = Vec::new();
4624-
mem::swap(&mut ret, &mut self.pending_monitor_events);
4625-
ret
4652+
if self.persistent_events_enabled {
4653+
let mut ret = Vec::new();
4654+
mem::swap(&mut ret, &mut self.pending_monitor_events);
4655+
self.provided_monitor_events.extend(ret.iter().cloned());
4656+
ret
4657+
} else {
4658+
let mut ret = Vec::new();
4659+
mem::swap(&mut ret, &mut self.pending_monitor_events);
4660+
ret
4661+
}
46264662
}
46274663

46284664
/// Gets the set of events that are repeated regularly (e.g. those which RBF bump
@@ -5949,8 +5985,8 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
59495985
if inbound_htlc_expiry > max_expiry_height {
59505986
continue;
59515987
}
5952-
let duplicate_event = self.pending_monitor_events.iter().any(
5953-
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
5988+
let duplicate_event = self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter())
5989+
.any(|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
59545990
upd.source == *source
59555991
} else { false });
59565992
if duplicate_event {
@@ -6366,7 +6402,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
63666402
// HTLC resolution backwards to and figure out whether we learned a preimage from it.
63676403
if let Some((source, payment_hash, amount_msat)) = payment_data {
63686404
if accepted_preimage_claim {
6369-
if !self.pending_monitor_events.iter().any(
6405+
if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any(
63706406
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) {
63716407
self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
63726408
txid: tx.compute_txid(),
@@ -6388,7 +6424,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
63886424
}), &mut self.next_monitor_event_id);
63896425
}
63906426
} else if offered_preimage_claim {
6391-
if !self.pending_monitor_events.iter().any(
6427+
if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any(
63926428
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
63936429
upd.source == source
63946430
} else { false }) {
@@ -6977,6 +7013,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
69777013

69787014
payment_preimages,
69797015
pending_monitor_events,
7016+
provided_monitor_events: Vec::new(),
69807017
persistent_events_enabled: persistent_events_enabled.is_some(),
69817018
next_monitor_event_id,
69827019
pending_events,

lightning/src/ln/channelmanager.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3613,6 +3613,8 @@ impl<
36133613
our_network_pubkey, current_timestamp, expanded_inbound_key,
36143614
node_signer.get_receive_auth_key(), secp_ctx.clone(), message_router, logger.clone(),
36153615
);
3616+
#[cfg(any(test, feature = "_test_utils"))]
3617+
let override_persistent_monitor_events = config.override_persistent_monitor_events;
36163618

36173619
ChannelManager {
36183620
config: RwLock::new(config),
@@ -3669,7 +3671,27 @@ impl<
36693671

36703672
logger,
36713673

3672-
persistent_monitor_events: false,
3674+
persistent_monitor_events: {
3675+
#[cfg(not(any(test, feature = "_test_utils")))]
3676+
{ false }
3677+
#[cfg(any(test, feature = "_test_utils"))]
3678+
{
3679+
override_persistent_monitor_events.unwrap_or_else(|| {
3680+
use core::hash::{BuildHasher, Hasher};
3681+
match std::env::var("LDK_TEST_PERSISTENT_MON_EVENTS") {
3682+
Ok(val) => match val.as_str() {
3683+
"1" => true,
3684+
"0" => false,
3685+
_ => panic!("LDK_TEST_PERSISTENT_MON_EVENTS must be 0 or 1, got: {}", val),
3686+
},
3687+
Err(_) => {
3688+
let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish();
3689+
rand_val % 2 == 0
3690+
},
3691+
}
3692+
})
3693+
}
3694+
},
36733695
}
36743696
}
36753697

@@ -11614,6 +11636,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1161411636
fail_chan!("Already had channel with the new channel_id");
1161511637
},
1161611638
hash_map::Entry::Vacant(e) => {
11639+
monitor.set_persistent_events_enabled(self.persistent_monitor_events);
1161711640
let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor);
1161811641
if let Ok(persist_state) = monitor_res {
1161911642
// There's no problem signing a counterparty's funding transaction if our monitor
@@ -11784,6 +11807,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1178411807
match chan
1178511808
.funding_signed(&msg, best_block, &self.signer_provider, &self.logger)
1178611809
.and_then(|(funded_chan, monitor)| {
11810+
monitor.set_persistent_events_enabled(self.persistent_monitor_events);
1178711811
self.chain_monitor
1178811812
.watch_channel(funded_chan.context.channel_id(), monitor)
1178911813
.map_err(|()| {
@@ -12698,6 +12722,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1269812722

1269912723
if let Some(chan) = chan.as_funded_mut() {
1270012724
if let Some(monitor) = monitor_opt {
12725+
monitor.set_persistent_events_enabled(self.persistent_monitor_events);
1270112726
let monitor_res =
1270212727
self.chain_monitor.watch_channel(monitor.channel_id(), monitor);
1270312728
if let Ok(persist_state) = monitor_res {

lightning/src/ln/monitor_tests.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3594,6 +3594,9 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b
35943594
let mut cfg = test_default_channel_config();
35953595
cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
35963596
cfg.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor;
3597+
// This test specifically tests lost monitor events, which requires the legacy
3598+
// (non-persistent) monitor event behavior.
3599+
cfg.override_persistent_monitor_events = Some(false);
35973600
let cfgs = [Some(cfg.clone()), Some(cfg.clone()), Some(cfg.clone())];
35983601

35993602
let chanmon_cfgs = create_chanmon_cfgs(3);

lightning/src/util/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,6 +1132,10 @@ pub struct UserConfig {
11321132
///
11331133
/// [`ChannelManager::splice_channel`]: crate::ln::channelmanager::ChannelManager::splice_channel
11341134
pub reject_inbound_splices: bool,
1135+
/// If set to `Some`, overrides the random selection of whether to use persistent monitor
1136+
/// events. Only available in tests.
1137+
#[cfg(any(test, feature = "_test_utils"))]
1138+
pub override_persistent_monitor_events: Option<bool>,
11351139
}
11361140

11371141
impl Default for UserConfig {
@@ -1148,6 +1152,8 @@ impl Default for UserConfig {
11481152
enable_htlc_hold: false,
11491153
hold_outbound_htlcs_at_next_hop: false,
11501154
reject_inbound_splices: true,
1155+
#[cfg(any(test, feature = "_test_utils"))]
1156+
override_persistent_monitor_events: None,
11511157
}
11521158
}
11531159
}
@@ -1170,6 +1176,7 @@ impl Readable for UserConfig {
11701176
hold_outbound_htlcs_at_next_hop: Readable::read(reader)?,
11711177
enable_htlc_hold: Readable::read(reader)?,
11721178
reject_inbound_splices: Readable::read(reader)?,
1179+
override_persistent_monitor_events: Readable::read(reader)?,
11731180
})
11741181
}
11751182
}

0 commit comments

Comments
 (0)