Skip to content

Commit cb4a804

Browse files
Add monitor event ids
Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is a lot of complexity in reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplifies things: on restart, instead of examining the set of HTLCs in monitors, we can simply replay all the pending MonitorEvents. To allow the ChannelManager to ack specific monitor events once they are resolved in upcoming commits, here we give each MonitorEvent a corresponding unique id. It's implemented in such a way that we can delete legacy monitor event code in the future when the new persistent monitor events flag is enabled by default.
1 parent fd90e0c commit cb4a804

File tree

7 files changed

+161
-65
lines changed

7 files changed

+161
-65
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
365365

366366
fn release_pending_monitor_events(
367367
&self,
368-
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
368+
) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> {
369369
return self.chain_monitor.release_pending_monitor_events();
370370
}
371371

lightning/src/chain/chainmonitor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1639,7 +1639,7 @@ where
16391639

16401640
fn release_pending_monitor_events(
16411641
&self,
1642-
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1642+
) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> {
16431643
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
16441644
let _ = self.channel_monitor_updated(channel_id, update_id);
16451645
}

lightning/src/chain/channelmonitor.rs

Lines changed: 144 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,13 @@ impl Readable for ChannelMonitorUpdate {
183183
}
184184
}
185185

186-
fn push_monitor_event(pending_monitor_events: &mut Vec<MonitorEvent>, event: MonitorEvent) {
187-
pending_monitor_events.push(event);
186+
fn push_monitor_event(
187+
pending_monitor_events: &mut Vec<(u64, MonitorEvent)>, event: MonitorEvent,
188+
next_monitor_event_id: &mut u64,
189+
) {
190+
let id = *next_monitor_event_id;
191+
*next_monitor_event_id += 1;
192+
pending_monitor_events.push((id, event));
188193
}
189194

190195
/// An event to be processed by the ChannelManager.
@@ -1282,13 +1287,14 @@ pub(crate) struct ChannelMonitorImpl<Signer: EcdsaChannelSigner> {
12821287
// Note that because the `event_lock` in `ChainMonitor` is only taken in
12831288
// block/transaction-connected events and *not* during block/transaction-disconnected events,
12841289
// we further MUST NOT generate events during block/transaction-disconnection.
1285-
pending_monitor_events: Vec<MonitorEvent>,
1290+
pending_monitor_events: Vec<(u64, MonitorEvent)>,
12861291
/// When set, monitor events are retained until explicitly acked rather than cleared on read.
12871292
///
12881293
/// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on
12891294
/// startup, and make the monitor responsible for both off- and on-chain payment resolution. Will
12901295
/// be always set once support for this feature is complete.
12911296
persistent_events_enabled: bool,
1297+
next_monitor_event_id: u64,
12921298

12931299
pub(super) pending_events: Vec<Event>,
12941300
pub(super) is_processing_pending_events: bool,
@@ -1669,32 +1675,38 @@ pub(crate) fn write_chanmon_internal<Signer: EcdsaChannelSigner, W: Writer>(
16691675
writer.write_all(&payment_preimage.0[..])?;
16701676
}
16711677

1672-
writer.write_all(
1673-
&(channel_monitor
1674-
.pending_monitor_events
1675-
.iter()
1676-
.filter(|ev| match ev {
1677-
MonitorEvent::HTLCEvent(_) => true,
1678-
MonitorEvent::HolderForceClosed(_) => true,
1679-
MonitorEvent::HolderForceClosedWithInfo { .. } => true,
1680-
_ => false,
1681-
})
1682-
.count() as u64)
1683-
.to_be_bytes(),
1684-
)?;
1685-
for event in channel_monitor.pending_monitor_events.iter() {
1686-
match event {
1687-
MonitorEvent::HTLCEvent(upd) => {
1688-
0u8.write(writer)?;
1689-
upd.write(writer)?;
1690-
},
1691-
MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?,
1692-
// `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep
1693-
// backwards compatibility, we write a `HolderForceClosed` event along with the
1694-
// `HolderForceClosedWithInfo` event. This is deduplicated in the reader.
1695-
MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?,
1696-
_ => {}, // Covered in the TLV writes below
1678+
if !channel_monitor.persistent_events_enabled {
1679+
writer.write_all(
1680+
&(channel_monitor
1681+
.pending_monitor_events
1682+
.iter()
1683+
.filter(|(_, ev)| match ev {
1684+
MonitorEvent::HTLCEvent(_) => true,
1685+
MonitorEvent::HolderForceClosed(_) => true,
1686+
MonitorEvent::HolderForceClosedWithInfo { .. } => true,
1687+
_ => false,
1688+
})
1689+
.count() as u64)
1690+
.to_be_bytes(),
1691+
)?;
1692+
for (_, event) in channel_monitor.pending_monitor_events.iter() {
1693+
match event {
1694+
MonitorEvent::HTLCEvent(upd) => {
1695+
0u8.write(writer)?;
1696+
upd.write(writer)?;
1697+
},
1698+
MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?,
1699+
// `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep
1700+
// backwards compatibility, we write a `HolderForceClosed` event along with the
1701+
// `HolderForceClosedWithInfo` event. This is deduplicated in the reader.
1702+
MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?,
1703+
_ => {}, // Covered in the TLV writes below
1704+
}
16971705
}
1706+
} else {
1707+
// If `persistent_events_enabled` is set, we'll write the events with their event ids in the
1708+
// TLV section below.
1709+
writer.write_all(&(0u64).to_be_bytes())?;
16981710
}
16991711

17001712
writer.write_all(&(channel_monitor.pending_events.len() as u64).to_be_bytes())?;
@@ -1729,25 +1741,40 @@ pub(crate) fn write_chanmon_internal<Signer: EcdsaChannelSigner, W: Writer>(
17291741

17301742
// If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed`
17311743
// for backwards compatibility.
1732-
let holder_force_closed_compat = channel_monitor.pending_monitor_events.iter().find_map(|ev| {
1733-
if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev {
1734-
Some(MonitorEvent::HolderForceClosed(*outpoint))
1735-
} else {
1736-
None
1737-
}
1738-
});
1739-
let pending_monitor_events_legacy = Iterable(
1740-
channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()),
1741-
);
1744+
let holder_force_closed_compat =
1745+
channel_monitor.pending_monitor_events.iter().find_map(|(_, ev)| {
1746+
if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev {
1747+
Some(MonitorEvent::HolderForceClosed(*outpoint))
1748+
} else {
1749+
None
1750+
}
1751+
});
1752+
let pending_monitor_events_legacy = if !channel_monitor.persistent_events_enabled {
1753+
Some(Iterable(
1754+
channel_monitor
1755+
.pending_monitor_events
1756+
.iter()
1757+
.map(|(_, ev)| ev)
1758+
.chain(holder_force_closed_compat.as_ref()),
1759+
))
1760+
} else {
1761+
None
1762+
};
17421763

17431764
// Only write `persistent_events_enabled` if it's set to true, as it's an even TLV.
17441765
let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(());
1766+
let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() {
1767+
Some(Iterable(channel_monitor.pending_monitor_events.iter()))
1768+
} else {
1769+
None
1770+
};
17451771

17461772
write_tlv_fields!(writer, {
17471773
(1, channel_monitor.funding_spend_confirmed, option),
17481774
(2, persistent_events_enabled, option),
17491775
(3, channel_monitor.htlcs_resolved_on_chain, required_vec),
1750-
(5, pending_monitor_events_legacy, required), // Equivalent to required_vec because Iterable also writes as WithoutLength
1776+
(4, pending_mon_evs_with_ids, option),
1777+
(5, pending_monitor_events_legacy, option), // Equivalent to optional_vec because Iterable also writes as WithoutLength
17511778
(7, channel_monitor.funding_spend_seen, required),
17521779
(9, channel_monitor.counterparty_node_id, required),
17531780
(11, channel_monitor.confirmed_commitment_tx_counterparty_output, option),
@@ -1767,6 +1794,7 @@ pub(crate) fn write_chanmon_internal<Signer: EcdsaChannelSigner, W: Writer>(
17671794
(35, channel_monitor.is_manual_broadcast, required),
17681795
(37, channel_monitor.funding_seen_onchain, required),
17691796
(39, channel_monitor.best_block.previous_blocks, required),
1797+
(41, channel_monitor.next_monitor_event_id, required),
17701798
});
17711799

17721800
Ok(())
@@ -1951,6 +1979,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
19511979
payment_preimages: new_hash_map(),
19521980
pending_monitor_events: Vec::new(),
19531981
persistent_events_enabled: false,
1982+
next_monitor_event_id: 0,
19541983
pending_events: Vec::new(),
19551984
is_processing_pending_events: false,
19561985

@@ -2164,7 +2193,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
21642193

21652194
/// Get the list of HTLCs who's status has been updated on chain. This should be called by
21662195
/// ChannelManager via [`chain::Watch::release_pending_monitor_events`].
2167-
pub fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent> {
2196+
pub fn get_and_clear_pending_monitor_events(&self) -> Vec<(u64, MonitorEvent)> {
21682197
self.inner.lock().unwrap().get_and_clear_pending_monitor_events()
21692198
}
21702199

@@ -2178,6 +2207,20 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
21782207
// TODO: once events have ids, remove the corresponding event here
21792208
}
21802209

2210+
/// Copies [`MonitorEvent`] state from `other` into `self`.
2211+
/// Used in tests to align transient runtime state before equality comparison after a
2212+
/// serialization round-trip.
2213+
#[cfg(any(test, feature = "_test_utils"))]
2214+
pub fn copy_monitor_event_state(&self, other: &ChannelMonitor<Signer>) {
2215+
let (pending, next_id) = {
2216+
let other_inner = other.inner.lock().unwrap();
2217+
(other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id)
2218+
};
2219+
let mut self_inner = self.inner.lock().unwrap();
2220+
self_inner.pending_monitor_events = pending;
2221+
self_inner.next_monitor_event_id = next_id;
2222+
}
2223+
21812224
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
21822225
///
21832226
/// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
@@ -3903,7 +3946,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
39033946
outpoint: funding_outpoint,
39043947
channel_id: self.channel_id,
39053948
};
3906-
push_monitor_event(&mut self.pending_monitor_events, event);
3949+
push_monitor_event(&mut self.pending_monitor_events, event, &mut self.next_monitor_event_id);
39073950
}
39083951

39093952
// Although we aren't signing the transaction directly here, the transaction will be signed
@@ -4494,12 +4537,16 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
44944537
"Failing HTLC from late counterparty commitment update immediately \
44954538
(funding spend already confirmed)"
44964539
);
4497-
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
4498-
payment_hash,
4499-
payment_preimage: None,
4500-
source: source.clone(),
4501-
htlc_value_satoshis,
4502-
}));
4540+
push_monitor_event(
4541+
&mut self.pending_monitor_events,
4542+
MonitorEvent::HTLCEvent(HTLCUpdate {
4543+
payment_hash,
4544+
payment_preimage: None,
4545+
source: source.clone(),
4546+
htlc_value_satoshis,
4547+
}),
4548+
&mut self.next_monitor_event_id,
4549+
);
45034550
self.htlcs_resolved_on_chain.push(IrrevocablyResolvedHTLC {
45044551
commitment_tx_output_idx: None,
45054552
resolving_txid: Some(confirmed_txid),
@@ -4565,10 +4612,14 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
45654612
}
45664613

45674614
fn push_monitor_event(&mut self, event: MonitorEvent) {
4568-
push_monitor_event(&mut self.pending_monitor_events, event);
4615+
push_monitor_event(
4616+
&mut self.pending_monitor_events,
4617+
event,
4618+
&mut self.next_monitor_event_id,
4619+
);
45694620
}
45704621

4571-
fn get_and_clear_pending_monitor_events(&mut self) -> Vec<MonitorEvent> {
4622+
fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> {
45724623
let mut ret = Vec::new();
45734624
mem::swap(&mut ret, &mut self.pending_monitor_events);
45744625
ret
@@ -5899,7 +5950,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
58995950
continue;
59005951
}
59015952
let duplicate_event = self.pending_monitor_events.iter().any(
5902-
|update| if let &MonitorEvent::HTLCEvent(ref upd) = update {
5953+
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
59035954
upd.source == *source
59045955
} else { false });
59055956
if duplicate_event {
@@ -5917,7 +5968,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
59175968
payment_preimage: None,
59185969
payment_hash: htlc.payment_hash,
59195970
htlc_value_satoshis: Some(htlc.amount_msat / 1000),
5920-
}));
5971+
}), &mut self.next_monitor_event_id);
59215972
}
59225973
}
59235974
}
@@ -6316,7 +6367,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
63166367
if let Some((source, payment_hash, amount_msat)) = payment_data {
63176368
if accepted_preimage_claim {
63186369
if !self.pending_monitor_events.iter().any(
6319-
|update| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) {
6370+
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) {
63206371
self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
63216372
txid: tx.compute_txid(),
63226373
height,
@@ -6334,11 +6385,11 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
63346385
payment_preimage: Some(payment_preimage),
63356386
payment_hash,
63366387
htlc_value_satoshis: Some(amount_msat / 1000),
6337-
}));
6388+
}), &mut self.next_monitor_event_id);
63386389
}
63396390
} else if offered_preimage_claim {
63406391
if !self.pending_monitor_events.iter().any(
6341-
|update| if let &MonitorEvent::HTLCEvent(ref upd) = update {
6392+
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
63426393
upd.source == source
63436394
} else { false }) {
63446395
self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
@@ -6358,7 +6409,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
63586409
payment_preimage: Some(payment_preimage),
63596410
payment_hash,
63606411
htlc_value_satoshis: Some(amount_msat / 1000),
6361-
}));
6412+
}), &mut self.next_monitor_event_id);
63626413
}
63636414
} else {
63646415
self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
@@ -6723,10 +6774,13 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
67236774
let mut funding_seen_onchain = RequiredWrapper(None);
67246775
let mut best_block_previous_blocks = None;
67256776
let mut persistent_events_enabled: Option<()> = None;
6777+
let mut next_monitor_event_id: Option<u64> = None;
6778+
let mut pending_mon_evs_with_ids: Option<Vec<ReadableIdMonitorEvent>> = None;
67266779
read_tlv_fields!(reader, {
67276780
(1, funding_spend_confirmed, option),
67286781
(2, persistent_events_enabled, option),
67296782
(3, htlcs_resolved_on_chain, optional_vec),
6783+
(4, pending_mon_evs_with_ids, optional_vec),
67306784
(5, pending_monitor_events_legacy, optional_vec),
67316785
(7, funding_spend_seen, option),
67326786
(9, counterparty_node_id, option),
@@ -6747,6 +6801,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
67476801
(35, is_manual_broadcast, (default_value, false)),
67486802
(37, funding_seen_onchain, (default_value, true)),
67496803
(39, best_block_previous_blocks, option), // Added and always set in 0.3
6804+
(41, next_monitor_event_id, option),
67506805
});
67516806
if let Some(previous_blocks) = best_block_previous_blocks {
67526807
best_block.previous_blocks = previous_blocks;
@@ -6788,6 +6843,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
67886843
}
67896844
}
67906845

6846+
// If persistent events are enabled, use the events with their persisted IDs from TLV 4.
6847+
// Otherwise, use the legacy events from TLV 5 and assign sequential IDs.
6848+
let (next_monitor_event_id, pending_monitor_events): (u64, Vec<(u64, MonitorEvent)>) =
6849+
if persistent_events_enabled.is_some() {
6850+
let evs = pending_mon_evs_with_ids.unwrap_or_default()
6851+
.into_iter().map(|ev| (ev.0, ev.1)).collect();
6852+
(next_monitor_event_id.unwrap_or(0), evs)
6853+
} else if let Some(events) = pending_monitor_events_legacy {
6854+
let next_id = next_monitor_event_id.unwrap_or(events.len() as u64);
6855+
let evs = events.into_iter().enumerate()
6856+
.map(|(i, ev)| (i as u64, ev)).collect();
6857+
(next_id, evs)
6858+
} else {
6859+
(next_monitor_event_id.unwrap_or(0), Vec::new())
6860+
};
6861+
67916862
let channel_parameters = channel_parameters.unwrap_or_else(|| {
67926863
onchain_tx_handler.channel_parameters().clone()
67936864
});
@@ -6905,8 +6976,9 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
69056976
current_holder_commitment_number,
69066977

69076978
payment_preimages,
6908-
pending_monitor_events: pending_monitor_events_legacy.unwrap(),
6979+
pending_monitor_events,
69096980
persistent_events_enabled: persistent_events_enabled.is_some(),
6981+
next_monitor_event_id,
69106982
pending_events,
69116983
is_processing_pending_events: false,
69126984

@@ -6955,6 +7027,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
69557027
}
69567028
}
69577029

7030+
/// Deserialization wrapper for reading a `(u64, MonitorEvent)`.
7031+
/// Necessary because we can't deserialize a (Readable, MaybeReadable) tuple due to trait
7032+
/// conflicts.
7033+
struct ReadableIdMonitorEvent(u64, MonitorEvent);
7034+
7035+
impl MaybeReadable for ReadableIdMonitorEvent {
7036+
fn read<R: io::Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
7037+
let id: u64 = Readable::read(reader)?;
7038+
let event_opt: Option<MonitorEvent> = MaybeReadable::read(reader)?;
7039+
match event_opt {
7040+
Some(ev) => Ok(Some(ReadableIdMonitorEvent(id, ev))),
7041+
None => Ok(None),
7042+
}
7043+
}
7044+
}
7045+
69587046
#[cfg(test)]
69597047
pub(super) fn dummy_monitor<S: EcdsaChannelSigner + 'static>(
69607048
channel_id: ChannelId, wrap_signer: impl FnOnce(crate::sign::InMemorySigner) -> S,

0 commit comments

Comments
 (0)