Batch on-chain claims more aggressively per channel #3340
Changes from all commits
fd8b84e
24b8390
69e1c70
bbf1d93
0fe90c6
@@ -31,7 +31,7 @@ use crate::types::payment::PaymentPreimage;
 use crate::ln::chan_utils::{self, ChannelTransactionParameters, HTLCOutputInCommitment, HolderCommitmentTransaction};
 use crate::chain::ClaimId;
 use crate::chain::chaininterface::{FeeEstimator, BroadcasterInterface, LowerBoundedFeeEstimator};
-use crate::chain::channelmonitor::{ANTI_REORG_DELAY, CLTV_SHARED_CLAIM_BUFFER};
+use crate::chain::channelmonitor::ANTI_REORG_DELAY;
 use crate::chain::package::{PackageSolvingData, PackageTemplate};
 use crate::chain::transaction::MaybeSignedTransaction;
 use crate::util::logger::Logger;
@@ -726,7 +726,7 @@ impl<ChannelSigner: EcdsaChannelSigner> OnchainTxHandler<ChannelSigner> {
 	/// does not need to equal the current blockchain tip height, which should be provided via
 	/// `cur_height`, however it must never be higher than `cur_height`.
 	pub(super) fn update_claims_view_from_requests<B: Deref, F: Deref, L: Logger>(
-		&mut self, requests: Vec<PackageTemplate>, conf_height: u32, cur_height: u32,
+		&mut self, mut requests: Vec<PackageTemplate>, conf_height: u32, cur_height: u32,
 		broadcaster: &B, conf_target: ConfirmationTarget,
 		fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L
 	) where
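The only functional change in this hunk is the parameter binding becoming `mut requests`, which lets the function filter and merge the vector in place further down. As a minimal illustration (my own sketch with plain integers standing in for claim requests, not code from this PR), a by-value `Vec` parameter can be rebound `mut` without changing anything for callers:

```rust
// `mut` on a by-value parameter is purely local to the callee; callers are unaffected.
// `already_claimed` plays the role of the duplicate-claim filter in the real code.
fn drop_duplicate_claims(mut requests: Vec<u32>, already_claimed: &[u32]) -> Vec<u32> {
    requests.retain(|req| !already_claimed.contains(req));
    requests
}

fn main() {
    let filtered = drop_duplicate_claims(vec![1, 2, 3, 4], &[2, 4]);
    assert_eq!(filtered, vec![1, 3]);
}
```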
@@ -737,49 +737,67 @@ impl<ChannelSigner: EcdsaChannelSigner> OnchainTxHandler<ChannelSigner> {
 			log_debug!(logger, "Updating claims view at height {} with {} claim requests", cur_height, requests.len());
 		}
 
-		let mut preprocessed_requests = Vec::with_capacity(requests.len());
-		let mut aggregated_request = None;
-
-		// Try to aggregate outputs if their timelock expiration isn't imminent (package timelock
-		// <= CLTV_SHARED_CLAIM_BUFFER) and they don't require an immediate nLockTime (aggregable).
-		for req in requests {
-			// Don't claim a outpoint twice that would be bad for privacy and may uselessly lock a CPFP input for a while
-			if let Some(_) = self.claimable_outpoints.get(req.outpoints()[0]) {
-				log_info!(logger, "Ignoring second claim for outpoint {}:{}, already registered its claiming request", req.outpoints()[0].txid, req.outpoints()[0].vout);
+		// First drop any duplicate claims.
+		requests.retain(|req| {
+			debug_assert_eq!(
+				req.outpoints().len(),
+				1,
+				"Claims passed to `update_claims_view_from_requests` should not be aggregated"
+			);
+			let mut all_outpoints_claiming = true;
+			for outpoint in req.outpoints() {
+				if self.claimable_outpoints.get(outpoint).is_none() {
+					all_outpoints_claiming = false;
+				}
+			}
+			if all_outpoints_claiming {
+				log_info!(logger, "Ignoring second claim for outpoint {}:{}, already registered its claiming request",
+					req.outpoints()[0].txid, req.outpoints()[0].vout);
+				false
 			} else {
 				let timelocked_equivalent_package = self.locktimed_packages.iter().map(|v| v.1.iter()).flatten()
 					.find(|locked_package| locked_package.outpoints() == req.outpoints());
 				if let Some(package) = timelocked_equivalent_package {
 					log_info!(logger, "Ignoring second claim for outpoint {}:{}, we already have one which we're waiting on a timelock at {} for.",
 						req.outpoints()[0].txid, req.outpoints()[0].vout, package.package_locktime(cur_height));
-					continue;
+					false
+				} else {
+					true
 				}
+			}
+		});
 
-				let package_locktime = req.package_locktime(cur_height);
-				if package_locktime > cur_height + 1 {
-					log_info!(logger, "Delaying claim of package until its timelock at {} (current height {}), the following outpoints are spent:", package_locktime, cur_height);
-					for outpoint in req.outpoints() {
-						log_info!(logger, "  Outpoint {}", outpoint);
+		// Then try to maximally aggregate `requests`.
+		for i in (1..requests.len()).rev() {
+			for j in 0..i {
+				if requests[i].can_merge_with(&requests[j], cur_height) {
+					let merge = requests.remove(i);
+					if let Err(rejected) = requests[j].merge_package(merge, cur_height) {
+						debug_assert!(false, "Merging package should not be rejected after verifying can_merge_with.");
+						requests.insert(i, rejected);

Review comment: Hmm, removing then inserting at every step is kinda annoying cause it generally requires a vec shift... I'm not entirely convinced by this commit. If we want to reduce the risk of accidental panic introductions with code changes maybe we rename

Reply: Yeah, the re-inserts were only introduced as an alternative to panicking on the The main goal was to push I can remove the commit as well though.

Reply: This is fine. Note that the fixup commit will need to go on the

Reply: Moved the fixup commit into the right place.

+					} else {
+						break;
 					}
-					self.locktimed_packages.entry(package_locktime).or_default().push(req);
-					continue;
 				}
+			}
+		}
 
-				log_trace!(logger, "Test if outpoint which our counterparty can spend at {} can be aggregated based on aggregation limit {}", req.counterparty_spendable_height(), cur_height + CLTV_SHARED_CLAIM_BUFFER);
-				if req.counterparty_spendable_height() <= cur_height + CLTV_SHARED_CLAIM_BUFFER || !req.aggregable() {
-					preprocessed_requests.push(req);
-				} else if aggregated_request.is_none() {
-					aggregated_request = Some(req);
-				} else {
-					aggregated_request.as_mut().unwrap().merge_package(req);
+		// Finally, split requests into timelocked ones and immediately-spendable ones.
+		let mut preprocessed_requests = Vec::with_capacity(requests.len());
+		for req in requests {
+			let package_locktime = req.package_locktime(cur_height);
+			if package_locktime > cur_height + 1 {
+				log_info!(logger, "Delaying claim of package until its timelock at {} (current height {}), the following outpoints are spent:", package_locktime, cur_height);
+				for outpoint in req.outpoints() {

Review comment: Pre-existing, but it looks like

Reply: I took a quick stab at it, but it did not turn out to be entirely straightforward. Is it okay to postpone to another PR?

+					log_info!(logger, "  Outpoint {}", outpoint);
+				}
+				self.locktimed_packages.entry(package_locktime).or_default().push(req);
+			} else {
+				preprocessed_requests.push(req);
 			}
 		}
-		if let Some(req) = aggregated_request {
-			preprocessed_requests.push(req);
-		}
 
-		// Claim everything up to and including `cur_height`
+		// Claim everything up to and including `cur_height`.
 		let remaining_locked_packages = self.locktimed_packages.split_off(&(cur_height + 1));
 		if !self.locktimed_packages.is_empty() {
 			log_debug!(logger,
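The review thread above debates the remove-and-reinsert step and the choice to hand a rejected package back from `merge_package` rather than panic. Here is a rough, self-contained sketch of that pairwise aggregation pattern; the `Claim` type, its fields, and its merge rule are illustrative assumptions, not LDK's actual `PackageTemplate` logic:

```rust
/// Toy stand-in for a claim package; fields and merge rules are assumptions for illustration.
#[derive(Debug)]
struct Claim {
    outpoints: Vec<u32>,
    locktime: u32,
    aggregable: bool,
}

impl Claim {
    /// Mirrors the shape of `can_merge_with`: a read-only compatibility check.
    fn can_merge_with(&self, other: &Claim, _cur_height: u32) -> bool {
        self.aggregable && other.aggregable && self.locktime == other.locktime
    }

    /// Mirrors the shape of the fallible `merge_package`: the merged-in claim is handed
    /// back on failure instead of being dropped or causing a panic.
    fn merge_package(&mut self, other: Claim, cur_height: u32) -> Result<(), Claim> {
        if !self.can_merge_with(&other, cur_height) {
            return Err(other);
        }
        self.outpoints.extend(other.outpoints);
        Ok(())
    }
}

/// The same quadratic merge pattern as the new loop in `update_claims_view_from_requests`.
fn aggregate(mut requests: Vec<Claim>, cur_height: u32) -> Vec<Claim> {
    for i in (1..requests.len()).rev() {
        for j in 0..i {
            if requests[i].can_merge_with(&requests[j], cur_height) {
                let merge = requests.remove(i);
                if let Err(rejected) = requests[j].merge_package(merge, cur_height) {
                    // Unreachable if `can_merge_with` and `merge_package` agree; re-insert
                    // so the claim is not lost if that invariant is ever violated.
                    debug_assert!(false, "merge rejected after can_merge_with");
                    requests.insert(i, rejected);
                } else {
                    break;
                }
            }
        }
    }
    requests
}

fn main() {
    let requests = vec![
        Claim { outpoints: vec![0], locktime: 100, aggregable: true },
        Claim { outpoints: vec![1], locktime: 100, aggregable: true },
        Claim { outpoints: vec![2], locktime: 100, aggregable: false },
    ];
    let aggregated = aggregate(requests, 100);
    // The two aggregable claims collapse into one package; the non-aggregable one stays alone.
    assert_eq!(aggregated.len(), 2);
    assert_eq!(aggregated[0].outpoints, vec![0, 1]);
    println!("{:?}", aggregated);
}
```

In the real code the `Err` arm is expected to be unreachable because `can_merge_with` is checked first; per the discussion above, the re-insert is a non-panicking fallback so a claim is never silently dropped.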
@@ -1088,7 +1106,7 @@ impl<ChannelSigner: EcdsaChannelSigner> OnchainTxHandler<ChannelSigner> {
 				OnchainEvent::ContentiousOutpoint { package } => {
 					if let Some(pending_claim) = self.claimable_outpoints.get(package.outpoints()[0]) {
 						if let Some(request) = self.pending_claim_requests.get_mut(&pending_claim.0) {
-							request.merge_package(package);
+							assert!(request.merge_package(package, height).is_ok());
 							// Using a HashMap guarantee us than if we have multiple outpoints getting
 							// resurrected only one bump claim tx is going to be broadcast
 							bump_candidates.insert(pending_claim.clone(), request.clone());
Review comment: TIL you can do this in our MSRV... lol we've got some code cleanup to do...
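The changed line in the hunk above asserts that re-merging the resurrected package succeeds. One possible reason to write `assert!(res.is_ok())` rather than `.unwrap()` (an assumption on my part, not something stated in the PR) is that it does not require the error type to implement `Debug`, while still panicking loudly if this must-succeed merge ever fails. A minimal sketch with a hypothetical `Package` type and `merge_package` function:

```rust
// Hypothetical stand-in for a package type that does not implement `Debug`.
struct Package;

// Mirrors the shape of the now-fallible merge: the rejected package is returned on error.
fn merge_package(compatible: bool, pkg: Package) -> Result<(), Package> {
    if compatible { Ok(()) } else { Err(pkg) }
}

fn main() {
    let res = merge_package(true, Package);
    // `res.unwrap()` would need `Package: Debug` to format the error; `is_ok()` does not.
    assert!(res.is_ok());
}
```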