Skip to content

Reload pending outbound payments from ChannelMonitors on startup #1104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,101 @@ impl<Signer: Sign> ChannelMonitor<Signer> {

res
}

/// Gets the set of outbound HTLCs which are pending resolution in this channel.
/// This is used to reconstruct pending outbound payments on restart in the ChannelManager.
pub(crate) fn get_pending_outbound_htlcs(&self) -> HashMap<HTLCSource, HTLCOutputInCommitment> {
let mut res = HashMap::new();
let us = self.inner.lock().unwrap();

macro_rules! walk_htlcs {
($holder_commitment: expr, $htlc_iter: expr) => {
for (htlc, source) in $htlc_iter {
if us.htlcs_resolved_on_chain.iter().any(|v| Some(v.input_idx) == htlc.transaction_output_index) {
// We should assert that funding_spend_confirmed is_some() here, but we
// have some unit tests which violate HTLC transaction CSVs entirely and
// would fail.
// TODO: Once tests all connect transactions at consensus-valid times, we
// should assert here like we do in `get_claimable_balances`.
} else if htlc.offered == $holder_commitment {
// If the payment was outbound, check if there's an HTLCUpdate
// indicating we have spent this HTLC with a timeout, claiming it back
// and awaiting confirmations on it.
let htlc_update_confd = us.onchain_events_awaiting_threshold_conf.iter().any(|event| {
if let OnchainEvent::HTLCUpdate { input_idx: Some(input_idx), .. } = event.event {
// If the HTLC was timed out, we wait for ANTI_REORG_DELAY blocks
// before considering it "no longer pending" - this matches when we
// provide the ChannelManager an HTLC failure event.
Some(input_idx) == htlc.transaction_output_index &&
us.best_block.height() >= event.height + ANTI_REORG_DELAY - 1
} else if let OnchainEvent::HTLCSpendConfirmation { input_idx, .. } = event.event {
// If the HTLC was fulfilled with a preimage, we consider the HTLC
// immediately non-pending, matching when we provide ChannelManager
// the preimage.
Some(input_idx) == htlc.transaction_output_index
} else { false }
});
if !htlc_update_confd {
res.insert(source.clone(), htlc.clone());
}
}
}
}
}

// We're only concerned with the confirmation count of HTLC transactions, and don't
// actually care how many confirmations a commitment transaction may or may not have. Thus,
// we look for either a FundingSpendConfirmation event or a funding_spend_confirmed.
let confirmed_txid = us.funding_spend_confirmed.or_else(|| {
us.onchain_events_awaiting_threshold_conf.iter().find_map(|event| {
if let OnchainEvent::FundingSpendConfirmation { .. } = event.event {
Some(event.txid)
} else { None }
})
});
if let Some(txid) = confirmed_txid {
if Some(txid) == us.current_counterparty_commitment_txid || Some(txid) == us.prev_counterparty_commitment_txid {
walk_htlcs!(false, us.counterparty_claimable_outpoints.get(&txid).unwrap().iter().filter_map(|(a, b)| {
if let &Some(ref source) = b {
Some((a, &**source))
} else { None }
}));
} else if txid == us.current_holder_commitment_tx.txid {
walk_htlcs!(true, us.current_holder_commitment_tx.htlc_outputs.iter().filter_map(|(a, _, c)| {
if let Some(source) = c { Some((a, source)) } else { None }
}));
} else if let Some(prev_commitment) = &us.prev_holder_signed_commitment_tx {
if txid == prev_commitment.txid {
walk_htlcs!(true, prev_commitment.htlc_outputs.iter().filter_map(|(a, _, c)| {
if let Some(source) = c { Some((a, source)) } else { None }
}));
}
}
} else {
// If we have not seen a commitment transaction on-chain (ie the channel is not yet
// closed), just examine the available counterparty commitment transactions. See docs
// on `fail_unbroadcast_htlcs`, below, for justification.
macro_rules! walk_counterparty_commitment {
($txid: expr) => {
if let Some(ref latest_outpoints) = us.counterparty_claimable_outpoints.get($txid) {
for &(ref htlc, ref source_option) in latest_outpoints.iter() {
if let &Some(ref source) = source_option {
res.insert((**source).clone(), htlc.clone());
}
}
}
}
}
if let Some(ref txid) = us.current_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
}
if let Some(ref txid) = us.prev_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
}
}

res
}
}

/// Compares a broadcasted commitment transaction's HTLCs with those in the latest state,
Expand Down
108 changes: 88 additions & 20 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,29 @@ pub enum UpdateFulfillCommitFetch {
DuplicateClaim {},
}

/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC
/// state.
pub(super) struct RAAUpdates {
pub commitment_update: Option<msgs::CommitmentUpdate>,
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
pub finalized_claimed_htlcs: Vec<HTLCSource>,
pub monitor_update: ChannelMonitorUpdate,
pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
}

/// The return value of `monitor_updating_restored`
pub(super) struct MonitorRestoreUpdates {
pub raa: Option<msgs::RevokeAndACK>,
pub commitment_update: Option<msgs::CommitmentUpdate>,
pub order: RAACommitmentOrder,
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
pub finalized_claimed_htlcs: Vec<HTLCSource>,
pub funding_broadcastable: Option<Transaction>,
pub funding_locked: Option<msgs::FundingLocked>,
}

/// If the majority of the channels funds are to the fundee and the initiator holds only just
/// enough funds to cover their reserve value, channels are at risk of getting "stuck". Because the
/// initiator controls the feerate, if they then go to increase the channel fee, they may have no
Expand Down Expand Up @@ -406,6 +429,7 @@ pub(super) struct Channel<Signer: Sign> {
monitor_pending_commitment_signed: bool,
monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>,
monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
monitor_pending_finalized_fulfills: Vec<HTLCSource>,

// pending_update_fee is filled when sending and receiving update_fee.
//
Expand Down Expand Up @@ -692,6 +716,7 @@ impl<Signer: Sign> Channel<Signer> {
monitor_pending_commitment_signed: false,
monitor_pending_forwards: Vec::new(),
monitor_pending_failures: Vec::new(),
monitor_pending_finalized_fulfills: Vec::new(),

#[cfg(debug_assertions)]
holder_max_commitment_tx_output: Mutex::new((channel_value_satoshis * 1000 - push_msat, push_msat)),
Expand Down Expand Up @@ -955,6 +980,7 @@ impl<Signer: Sign> Channel<Signer> {
monitor_pending_commitment_signed: false,
monitor_pending_forwards: Vec::new(),
monitor_pending_failures: Vec::new(),
monitor_pending_finalized_fulfills: Vec::new(),

#[cfg(debug_assertions)]
holder_max_commitment_tx_output: Mutex::new((msg.push_msat, msg.funding_satoshis * 1000 - msg.push_msat)),
Expand Down Expand Up @@ -2711,7 +2737,7 @@ impl<Signer: Sign> Channel<Signer> {
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
/// generating an appropriate error *after* the channel state has been updated based on the
/// revoke_and_ack message.
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>), ChannelError>
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<RAAUpdates, ChannelError>
where L::Target: Logger,
{
if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) {
Expand Down Expand Up @@ -2777,6 +2803,7 @@ impl<Signer: Sign> Channel<Signer> {
log_trace!(logger, "Updating HTLCs on receipt of RAA in channel {}...", log_bytes!(self.channel_id()));
let mut to_forward_infos = Vec::new();
let mut revoked_htlcs = Vec::new();
let mut finalized_claimed_htlcs = Vec::new();
let mut update_fail_htlcs = Vec::new();
let mut update_fail_malformed_htlcs = Vec::new();
let mut require_commitment = false;
Expand All @@ -2803,6 +2830,7 @@ impl<Signer: Sign> Channel<Signer> {
if let Some(reason) = fail_reason.clone() { // We really want take() here, but, again, non-mut ref :(
revoked_htlcs.push((htlc.source.clone(), htlc.payment_hash, reason));
} else {
finalized_claimed_htlcs.push(htlc.source.clone());
// They fulfilled, so we sent them money
value_to_self_msat_diff -= htlc.amount_msat as i64;
}
Expand Down Expand Up @@ -2899,8 +2927,14 @@ impl<Signer: Sign> Channel<Signer> {
}
self.monitor_pending_forwards.append(&mut to_forward_infos);
self.monitor_pending_failures.append(&mut revoked_htlcs);
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
return Ok((None, Vec::new(), Vec::new(), monitor_update, Vec::new()))
return Ok(RAAUpdates {
commitment_update: None, finalized_claimed_htlcs: Vec::new(),
accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(),
monitor_update,
holding_cell_failed_htlcs: Vec::new()
});
}

match self.free_holding_cell_htlcs(logger)? {
Expand All @@ -2919,7 +2953,14 @@ impl<Signer: Sign> Channel<Signer> {
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);

Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail))
Ok(RAAUpdates {
commitment_update: Some(commitment_update),
finalized_claimed_htlcs,
accepted_htlcs: to_forward_infos,
failed_htlcs: revoked_htlcs,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question wrt "failed" and "revoked". Just want to make sure I understand if it is context dependent.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same answer, there really isn't.

monitor_update,
holding_cell_failed_htlcs: htlcs_to_fail
})
},
(None, htlcs_to_fail) => {
if require_commitment {
Expand All @@ -2932,17 +2973,27 @@ impl<Signer: Sign> Channel<Signer> {

log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
Ok((Some(msgs::CommitmentUpdate {
update_add_htlcs: Vec::new(),
update_fulfill_htlcs: Vec::new(),
update_fail_htlcs,
update_fail_malformed_htlcs,
update_fee: None,
commitment_signed
}), to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail))
Ok(RAAUpdates {
commitment_update: Some(msgs::CommitmentUpdate {
update_add_htlcs: Vec::new(),
update_fulfill_htlcs: Vec::new(),
update_fail_htlcs,
update_fail_malformed_htlcs,
update_fee: None,
commitment_signed
}),
finalized_claimed_htlcs,
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
})
} else {
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
Ok((None, to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail))
Ok(RAAUpdates {
commitment_update: None,
finalized_claimed_htlcs,
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
})
}
}
}
Expand Down Expand Up @@ -3057,18 +3108,23 @@ impl<Signer: Sign> Channel<Signer> {
/// which failed. The messages which were generated from that call which generated the
/// monitor update failure must *not* have been sent to the remote end, and must instead
/// have been dropped. They will be regenerated when monitor_updating_restored is called.
pub fn monitor_update_failed(&mut self, resend_raa: bool, resend_commitment: bool, mut pending_forwards: Vec<(PendingHTLCInfo, u64)>, mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>) {
pub fn monitor_update_failed(&mut self, resend_raa: bool, resend_commitment: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move these parameters down a line, aligned with the other parameters

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a small function that doesn't seem to look any better to my eye? Feels very "rustfmt wants your code to use as many lines as possible" to me :).

mut pending_forwards: Vec<(PendingHTLCInfo, u64)>,
mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
mut pending_finalized_claimed_htlcs: Vec<HTLCSource>
) {
self.monitor_pending_revoke_and_ack |= resend_raa;
self.monitor_pending_commitment_signed |= resend_commitment;
self.monitor_pending_forwards.append(&mut pending_forwards);
self.monitor_pending_failures.append(&mut pending_fails);
self.monitor_pending_finalized_fulfills.append(&mut pending_finalized_claimed_htlcs);
self.channel_state |= ChannelState::MonitorUpdateFailed as u32;
}

/// Indicates that the latest ChannelMonitor update has been committed by the client
/// successfully and we should restore normal operation. Returns messages which should be sent
/// to the remote side.
pub fn monitor_updating_restored<L: Deref>(&mut self, logger: &L) -> (Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, RAACommitmentOrder, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option<Transaction>, Option<msgs::FundingLocked>) where L::Target: Logger {
pub fn monitor_updating_restored<L: Deref>(&mut self, logger: &L) -> MonitorRestoreUpdates where L::Target: Logger {
assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, ChannelState::MonitorUpdateFailed as u32);
self.channel_state &= !(ChannelState::MonitorUpdateFailed as u32);

Expand All @@ -3091,15 +3147,20 @@ impl<Signer: Sign> Channel<Signer> {
})
} else { None };

let mut forwards = Vec::new();
mem::swap(&mut forwards, &mut self.monitor_pending_forwards);
let mut failures = Vec::new();
mem::swap(&mut failures, &mut self.monitor_pending_failures);
let mut accepted_htlcs = Vec::new();
mem::swap(&mut accepted_htlcs, &mut self.monitor_pending_forwards);
let mut failed_htlcs = Vec::new();
mem::swap(&mut failed_htlcs, &mut self.monitor_pending_failures);
let mut finalized_claimed_htlcs = Vec::new();
mem::swap(&mut finalized_claimed_htlcs, &mut self.monitor_pending_finalized_fulfills);

if self.channel_state & (ChannelState::PeerDisconnected as u32) != 0 {
self.monitor_pending_revoke_and_ack = false;
self.monitor_pending_commitment_signed = false;
return (None, None, RAACommitmentOrder::RevokeAndACKFirst, forwards, failures, funding_broadcastable, funding_locked);
return MonitorRestoreUpdates {
raa: None, commitment_update: None, order: RAACommitmentOrder::RevokeAndACKFirst,
accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, funding_locked
};
}

let raa = if self.monitor_pending_revoke_and_ack {
Expand All @@ -3116,7 +3177,9 @@ impl<Signer: Sign> Channel<Signer> {
log_bytes!(self.channel_id()), if funding_broadcastable.is_some() { "a funding broadcastable, " } else { "" },
if commitment_update.is_some() { "a" } else { "no" }, if raa.is_some() { "an" } else { "no" },
match order { RAACommitmentOrder::CommitmentFirst => "commitment", RAACommitmentOrder::RevokeAndACKFirst => "RAA"});
(raa, commitment_update, order, forwards, failures, funding_broadcastable, funding_locked)
MonitorRestoreUpdates {
raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, funding_locked
}
}

pub fn update_fee<F: Deref>(&mut self, fee_estimator: &F, msg: &msgs::UpdateFee) -> Result<(), ChannelError>
Expand Down Expand Up @@ -5176,6 +5239,7 @@ impl<Signer: Sign> Writeable for Channel<Signer> {
(5, self.config, required),
(7, self.shutdown_scriptpubkey, option),
(9, self.target_closing_feerate_sats_per_kw, option),
(11, self.monitor_pending_finalized_fulfills, vec_type),
});

Ok(())
Expand Down Expand Up @@ -5409,13 +5473,15 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel<Signer>

let mut announcement_sigs = None;
let mut target_closing_feerate_sats_per_kw = None;
let mut monitor_pending_finalized_fulfills = Some(Vec::new());
read_tlv_fields!(reader, {
(0, announcement_sigs, option),
(1, minimum_depth, option),
(3, counterparty_selected_channel_reserve_satoshis, option),
(5, config, option), // Note that if none is provided we will *not* overwrite the existing one.
(7, shutdown_scriptpubkey, option),
(9, target_closing_feerate_sats_per_kw, option),
(11, monitor_pending_finalized_fulfills, vec_type),
});

let mut secp_ctx = Secp256k1::new();
Expand Down Expand Up @@ -5451,6 +5517,7 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel<Signer>
monitor_pending_commitment_signed,
monitor_pending_forwards,
monitor_pending_failures,
monitor_pending_finalized_fulfills: monitor_pending_finalized_fulfills.unwrap(),

pending_update_fee,
holding_cell_update_fee,
Expand Down Expand Up @@ -5700,6 +5767,7 @@ mod tests {
session_priv: SecretKey::from_slice(&hex::decode("0fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap()[..]).unwrap(),
first_hop_htlc_msat: 548,
payment_id: PaymentId([42; 32]),
payment_secret: None,
}
});

Expand Down
Loading