Skip to content

Commit 6ffa153

Browse files
Support persistent monitor events
Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and of the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is a lot of complexity in reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass the resolutions back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things: on restart, instead of examining the set of HTLCs in monitors, we can simply replay all the pending MonitorEvents. Here we complete the work that was built up in recent prior commits and actually start re-providing monitor events on startup if they went un-acked during runtime. This isn't actually supported in prod yet, so the new code is enabled at random in tests, ensuring that the old paths continue to be exercised as well.
1 parent 27fa5a1 commit 6ffa153

File tree

4 files changed

+84
-13
lines changed

4 files changed

+84
-13
lines changed

lightning/src/chain/channelmonitor.rs

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,6 +1288,11 @@ pub(crate) struct ChannelMonitorImpl<Signer: EcdsaChannelSigner> {
12881288
// block/transaction-connected events and *not* during block/transaction-disconnected events,
12891289
// we further MUST NOT generate events during block/transaction-disconnection.
12901290
pending_monitor_events: Vec<(u64, MonitorEvent)>,
1291+
// `MonitorEvent`s that have been provided to the `ChannelManager` via
1292+
// [`ChannelMonitor::get_and_clear_pending_monitor_events`] and are awaiting
1293+
// [`ChannelMonitor::ack_monitor_event`] for removal. If an event in this queue is not ack'd, it
1294+
// will be re-provided to the `ChannelManager` on startup.
1295+
provided_monitor_events: Vec<(u64, MonitorEvent)>,
12911296
/// When set, monitor events are retained until explicitly acked rather than cleared on read.
12921297
///
12931298
/// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on
@@ -1764,7 +1769,12 @@ pub(crate) fn write_chanmon_internal<Signer: EcdsaChannelSigner, W: Writer>(
17641769
// Only write `persistent_events_enabled` if it's set to true, as it's an even TLV.
17651770
let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(());
17661771
let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() {
1767-
Some(Iterable(channel_monitor.pending_monitor_events.iter()))
1772+
Some(Iterable(
1773+
channel_monitor
1774+
.provided_monitor_events
1775+
.iter()
1776+
.chain(channel_monitor.pending_monitor_events.iter()),
1777+
))
17681778
} else {
17691779
None
17701780
};
@@ -1978,6 +1988,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
19781988

19791989
payment_preimages: new_hash_map(),
19801990
pending_monitor_events: Vec::new(),
1991+
provided_monitor_events: Vec::new(),
19811992
persistent_events_enabled: false,
19821993
next_monitor_event_id: 0,
19831994
pending_events: Vec::new(),
@@ -2203,20 +2214,32 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
22032214

22042215
/// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed.
22052216
/// Generally called by [`chain::Watch::ack_monitor_event`].
2206-
pub fn ack_monitor_event(&self, _event_id: u64) {
2207-
// TODO: once events have ids, remove the corresponding event here
2217+
pub fn ack_monitor_event(&self, event_id: u64) {
2218+
let inner = &mut *self.inner.lock().unwrap();
2219+
inner.ack_monitor_event(event_id);
2220+
}
2221+
2222+
/// Enables persistent monitor events mode. When enabled, monitor events are retained until
2223+
/// explicitly acked rather than cleared on read.
2224+
pub(crate) fn set_persistent_events_enabled(&self, enabled: bool) {
2225+
self.inner.lock().unwrap().persistent_events_enabled = enabled;
22082226
}
22092227

22102228
/// Copies [`MonitorEvent`] state from `other` into `self`.
22112229
/// Used in tests to align transient runtime state before equality comparison after a
22122230
/// serialization round-trip.
22132231
#[cfg(any(test, feature = "_test_utils"))]
22142232
pub fn copy_monitor_event_state(&self, other: &ChannelMonitor<Signer>) {
2215-
let (pending, next_id) = {
2233+
let (provided, pending, next_id) = {
22162234
let other_inner = other.inner.lock().unwrap();
2217-
(other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id)
2235+
(
2236+
other_inner.provided_monitor_events.clone(),
2237+
other_inner.pending_monitor_events.clone(),
2238+
other_inner.next_monitor_event_id,
2239+
)
22182240
};
22192241
let mut self_inner = self.inner.lock().unwrap();
2242+
self_inner.provided_monitor_events = provided;
22202243
self_inner.pending_monitor_events = pending;
22212244
self_inner.next_monitor_event_id = next_id;
22222245
}
@@ -4619,10 +4642,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
46194642
);
46204643
}
46214644

4645+
fn ack_monitor_event(&mut self, event_id: u64) {
4646+
self.provided_monitor_events.retain(|(id, _)| *id != event_id);
4647+
// If this event was generated prior to a restart, it may be in this queue instead
4648+
self.pending_monitor_events.retain(|(id, _)| *id != event_id);
4649+
}
4650+
46224651
fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> {
4623-
let mut ret = Vec::new();
4624-
mem::swap(&mut ret, &mut self.pending_monitor_events);
4625-
ret
4652+
if self.persistent_events_enabled {
4653+
let mut ret = Vec::new();
4654+
mem::swap(&mut ret, &mut self.pending_monitor_events);
4655+
self.provided_monitor_events.extend(ret.iter().cloned());
4656+
ret
4657+
} else {
4658+
let mut ret = Vec::new();
4659+
mem::swap(&mut ret, &mut self.pending_monitor_events);
4660+
ret
4661+
}
46264662
}
46274663

46284664
/// Gets the set of events that are repeated regularly (e.g. those which RBF bump
@@ -5946,8 +5982,8 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
59465982
if inbound_htlc_expiry > max_expiry_height {
59475983
continue;
59485984
}
5949-
let duplicate_event = self.pending_monitor_events.iter().any(
5950-
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
5985+
let duplicate_event = self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter())
5986+
.any(|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
59515987
upd.source == *source
59525988
} else { false });
59535989
if duplicate_event {
@@ -6363,7 +6399,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
63636399
// HTLC resolution backwards to and figure out whether we learned a preimage from it.
63646400
if let Some((source, payment_hash, amount_msat)) = payment_data {
63656401
if accepted_preimage_claim {
6366-
if !self.pending_monitor_events.iter().any(
6402+
if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any(
63676403
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) {
63686404
self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
63696405
txid: tx.compute_txid(),
@@ -6385,7 +6421,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
63856421
}), &mut self.next_monitor_event_id);
63866422
}
63876423
} else if offered_preimage_claim {
6388-
if !self.pending_monitor_events.iter().any(
6424+
if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any(
63896425
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
63906426
upd.source == source
63916427
} else { false }) {
@@ -6974,6 +7010,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
69747010

69757011
payment_preimages,
69767012
pending_monitor_events,
7013+
provided_monitor_events: Vec::new(),
69777014
persistent_events_enabled: persistent_events_enabled.is_some(),
69787015
next_monitor_event_id,
69797016
pending_events,

lightning/src/ln/channelmanager.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3681,6 +3681,8 @@ impl<
36813681
our_network_pubkey, current_timestamp, expanded_inbound_key,
36823682
node_signer.get_receive_auth_key(), secp_ctx.clone(), message_router, logger.clone(),
36833683
);
3684+
#[cfg(test)]
3685+
let override_persistent_monitor_events = config.override_persistent_monitor_events;
36843686

36853687
ChannelManager {
36863688
config: RwLock::new(config),
@@ -3737,7 +3739,27 @@ impl<
37373739

37383740
logger,
37393741

3740-
persistent_monitor_events: false,
3742+
persistent_monitor_events: {
3743+
#[cfg(not(test))]
3744+
{ false }
3745+
#[cfg(test)]
3746+
{
3747+
override_persistent_monitor_events.unwrap_or_else(|| {
3748+
use core::hash::{BuildHasher, Hasher};
3749+
match std::env::var("LDK_TEST_PERSISTENT_MON_EVENTS") {
3750+
Ok(val) => match val.as_str() {
3751+
"1" => true,
3752+
"0" => false,
3753+
_ => panic!("LDK_TEST_PERSISTENT_MON_EVENTS must be 0 or 1, got: {}", val),
3754+
},
3755+
Err(_) => {
3756+
let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish();
3757+
rand_val % 2 == 0
3758+
},
3759+
}
3760+
})
3761+
}
3762+
},
37413763

37423764
#[cfg(feature = "_test_utils")]
37433765
testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()),
@@ -11752,6 +11774,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1175211774
fail_chan!("Already had channel with the new channel_id");
1175311775
},
1175411776
hash_map::Entry::Vacant(e) => {
11777+
monitor.set_persistent_events_enabled(self.persistent_monitor_events);
1175511778
let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor);
1175611779
if let Ok(persist_state) = monitor_res {
1175711780
// There's no problem signing a counterparty's funding transaction if our monitor
@@ -11922,6 +11945,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1192211945
match chan
1192311946
.funding_signed(&msg, best_block, &self.signer_provider, &self.logger)
1192411947
.and_then(|(funded_chan, monitor)| {
11948+
monitor.set_persistent_events_enabled(self.persistent_monitor_events);
1192511949
self.chain_monitor
1192611950
.watch_channel(funded_chan.context.channel_id(), monitor)
1192711951
.map_err(|()| {
@@ -12837,6 +12861,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1283712861

1283812862
if let Some(chan) = chan.as_funded_mut() {
1283912863
if let Some(monitor) = monitor_opt {
12864+
monitor.set_persistent_events_enabled(self.persistent_monitor_events);
1284012865
let monitor_res =
1284112866
self.chain_monitor.watch_channel(monitor.channel_id(), monitor);
1284212867
if let Ok(persist_state) = monitor_res {

lightning/src/ln/monitor_tests.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3592,6 +3592,9 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b
35923592
let mut cfg = test_default_channel_config();
35933593
cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
35943594
cfg.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor;
3595+
// This test specifically tests lost monitor events, which requires the legacy
3596+
// (non-persistent) monitor event behavior.
3597+
cfg.override_persistent_monitor_events = Some(false);
35953598
let cfgs = [Some(cfg.clone()), Some(cfg.clone()), Some(cfg.clone())];
35963599

35973600
let chanmon_cfgs = create_chanmon_cfgs(3);

lightning/src/util/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,6 +1103,10 @@ pub struct UserConfig {
11031103
///
11041104
/// [`ChannelManager::splice_channel`]: crate::ln::channelmanager::ChannelManager::splice_channel
11051105
pub reject_inbound_splices: bool,
1106+
/// If set to `Some`, overrides the random selection of whether to use persistent monitor
1107+
/// events. Only available in tests.
1108+
#[cfg(test)]
1109+
pub override_persistent_monitor_events: Option<bool>,
11061110
}
11071111

11081112
impl Default for UserConfig {
@@ -1119,6 +1123,8 @@ impl Default for UserConfig {
11191123
enable_htlc_hold: false,
11201124
hold_outbound_htlcs_at_next_hop: false,
11211125
reject_inbound_splices: true,
1126+
#[cfg(test)]
1127+
override_persistent_monitor_events: None,
11221128
}
11231129
}
11241130
}

0 commit comments

Comments
 (0)