@@ -1485,10 +1485,13 @@ pub(crate) const CHANNEL_ANNOUNCEMENT_PROPAGATION_DELAY: u32 = 144;
14851485#[derive(Debug)]
14861486struct PendingChannelMonitorUpdate {
14871487 update: ChannelMonitorUpdate,
1488+ /// `MonitorEvent`s that can be ack'd after `update` is durably persisted.
1489+ post_update_ackable_events: Vec<MonitorEventSource>,
14881490}
14891491
14901492impl_writeable_tlv_based!(PendingChannelMonitorUpdate, {
14911493 (0, update, required),
1494+ (1, post_update_ackable_events, optional_vec),
14921495});
14931496
14941497/// A payment channel with a counterparty throughout its life-cycle, encapsulating negotiation and
@@ -7668,9 +7671,10 @@ where
76687671 if !update_blocked {
76697672 debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
76707673 let update = self.build_commitment_no_status_check(logger);
7671- self.context
7672- .blocked_monitor_updates
7673- .push(PendingChannelMonitorUpdate { update });
7674+ self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
7675+ update,
7676+ post_update_ackable_events: Vec::new(),
7677+ });
76747678 }
76757679 }
76767680
@@ -8903,8 +8907,10 @@ where
89038907 logger,
89048908 );
89058909 (
8906- self.push_ret_blockable_mon_update(monitor_update)
8907- .map(|upd| (upd, monitor_events_to_ack)),
8910+ self.push_ret_blockable_mon_update_with_event_sources(
8911+ monitor_update,
8912+ monitor_events_to_ack,
8913+ ),
89088914 htlcs_to_fail,
89098915 )
89108916 } else {
@@ -9246,13 +9252,14 @@ where
92469252 let mut monitor_events_to_ack = Vec::new();
92479253 macro_rules! return_with_htlcs_to_fail {
92489254 ($htlcs_to_fail: expr) => {
9255+ let events_to_ack = core::mem::take(&mut monitor_events_to_ack);
92499256 if !release_monitor {
9250- self.context
9251- .blocked_monitor_updates
9252- .push(PendingChannelMonitorUpdate { update: monitor_update });
9257+ self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
9258+ update: monitor_update,
9259+ post_update_ackable_events: events_to_ack,
9260+ });
92539261 return Ok(($htlcs_to_fail, static_invoices, None));
92549262 } else {
9255- let events_to_ack = core::mem::take(&mut monitor_events_to_ack);
92569263 return Ok((
92579264 $htlcs_to_fail,
92589265 static_invoices,
@@ -11346,12 +11353,16 @@ where
1134611353
1134711354 /// Returns the next blocked monitor update, if one exists, and a bool which indicates a
1134811355 /// further blocked monitor update exists after the next.
11349- pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
11356+ pub fn unblock_next_blocked_monitor_update(
11357+ &mut self,
11358+ ) -> Option<(ChannelMonitorUpdate, Vec<MonitorEventSource>, bool)> {
1135011359 if self.context.blocked_monitor_updates.is_empty() {
1135111360 return None;
1135211361 }
11362+ let pending = self.context.blocked_monitor_updates.remove(0);
1135311363 Some((
11354- self.context.blocked_monitor_updates.remove(0).update,
11364+ pending.update,
11365+ pending.post_update_ackable_events,
1135511366 !self.context.blocked_monitor_updates.is_empty(),
1135611367 ))
1135711368 }
@@ -11361,14 +11372,24 @@ where
1136111372 #[rustfmt::skip]
1136211373 fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
1136311374 -> Option<ChannelMonitorUpdate> {
11375+ self.push_ret_blockable_mon_update_with_event_sources(update, Vec::new())
11376+ .map(|(upd, _)| upd)
11377+ }
11378+
11379+ /// Similar to `push_ret_blockable_mon_update`, but allows including a list of
11380+ /// `MonitorEventSource`s that can be ack'd after the `update` is durably persisted.
11381+ fn push_ret_blockable_mon_update_with_event_sources(
11382+ &mut self, update: ChannelMonitorUpdate, monitor_event_sources: Vec<MonitorEventSource>,
11383+ ) -> Option<(ChannelMonitorUpdate, Vec<MonitorEventSource>)> {
1136411384 let release_monitor = self.context.blocked_monitor_updates.is_empty();
1136511385 if !release_monitor {
1136611386 self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
1136711387 update,
11388+ post_update_ackable_events: monitor_event_sources,
1136811389 });
1136911390 None
1137011391 } else {
11371- Some(update)
11392+ Some(( update, monitor_event_sources) )
1137211393 }
1137311394 }
1137411395
@@ -11377,19 +11398,23 @@ where
1137711398 /// here after logging them.
1137811399 pub fn on_startup_drop_completed_blocked_mon_updates_through<L: Logger>(
1137911400 &mut self, logger: &L, loaded_mon_update_id: u64,
11380- ) {
11381- self.context.blocked_monitor_updates.retain(|update| {
11401+ ) -> Vec<MonitorEventSource> {
11402+ let mut monitor_events_to_ack = Vec::new();
11403+ self.context.blocked_monitor_updates.retain_mut(|update| {
1138211404 if update.update.update_id <= loaded_mon_update_id {
1138311405 log_info!(
1138411406 logger,
1138511407 "Dropping completed ChannelMonitorUpdate id {} due to a stale ChannelManager",
1138611408 update.update.update_id,
1138711409 );
11410+ monitor_events_to_ack
11411+ .extend(core::mem::take(&mut update.post_update_ackable_events));
1138811412 false
1138911413 } else {
1139011414 true
1139111415 }
1139211416 });
11417+ monitor_events_to_ack
1139311418 }
1139411419
1139511420 pub fn blocked_monitor_updates_pending(&self) -> usize {
0 commit comments