Skip to content

Commit 395b30d

Browse files
committed
Batch MPP claims into single ChannelMonitorUpdate
Claiming multiple MPP parts on the same channel was partially sequential, requiring the claimee to claim the first part and wait for the peer to respond again before other parts could be claimed. This results in claim latency: time spent waiting on channel monitor updates and a full round-trip (RAA/CS) per HTLC fulfillment. This change optimizes the process by batching these claims into a single update and a single commitment_signed message. - Introduce UpdateFulfillsCommitFetch enum and the get_update_fulfill_htlcs_and_commit method to Channel. - Update ChannelManager to group claimable HTLCs by counterparty and channel ID before delegation. - Refactor chanmon_update_fail_tests.rs and payment_tests.rs to align with the new atomic batching semantics. Tests have been updated to reflect this new batching of MPP claims - `test_single_channel_multiple_mpp` - `auto_retry_partial_failure` - `test_keysend_dup_hash_partial_mpp`
1 parent 9f73a98 commit 395b30d

File tree

4 files changed

+426
-306
lines changed

4 files changed

+426
-306
lines changed

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 36 additions & 202 deletions
Original file line numberDiff line numberDiff line change
@@ -4654,34 +4654,20 @@ fn test_claim_to_closed_channel_blocks_claimed_event() {
46544654
}
46554655

46564656
#[test]
4657-
#[cfg(all(feature = "std", not(target_os = "windows")))]
46584657
fn test_single_channel_multiple_mpp() {
46594658
use crate::util::config::UserConfig;
4660-
use std::sync::atomic::{AtomicBool, Ordering};
46614659

4662-
// Test what happens when we attempt to claim an MPP with many parts that came to us through
4663-
// the same channel with a synchronous persistence interface which has very high latency.
4664-
//
4665-
// Previously, if a `revoke_and_ack` came in while we were still running in
4666-
// `ChannelManager::claim_payment` we'd end up hanging waiting to apply a
4667-
// `ChannelMonitorUpdate` until after it completed. See the commit which introduced this test
4668-
// for more info.
4660+
// Test that when an MPP payment has many parts arriving on the same channel, all of them are
4661+
// claimed in a single commitment update rather than requiring a round-trip per claim.
46694662
let chanmon_cfgs = create_chanmon_cfgs(9);
46704663
let node_cfgs = create_node_cfgs(9, &chanmon_cfgs);
46714664
let mut config = test_default_channel_config();
4672-
// Set the percentage to the default value at the time this test was written
46734665
config.channel_handshake_config.announced_channel_max_inbound_htlc_value_in_flight_percentage =
46744666
10;
46754667
let configs: [Option<UserConfig>; 9] = core::array::from_fn(|_| Some(config.clone()));
46764668
let node_chanmgrs = create_node_chanmgrs(9, &node_cfgs, &configs);
46774669
let mut nodes = create_network(9, &node_cfgs, &node_chanmgrs);
46784670

4679-
let node_b_id = nodes[1].node.get_our_node_id();
4680-
let node_c_id = nodes[2].node.get_our_node_id();
4681-
let node_d_id = nodes[3].node.get_our_node_id();
4682-
let node_e_id = nodes[4].node.get_our_node_id();
4683-
let node_f_id = nodes[5].node.get_our_node_id();
4684-
let node_g_id = nodes[6].node.get_our_node_id();
46854671
let node_h_id = nodes[7].node.get_our_node_id();
46864672
let node_i_id = nodes[8].node.get_our_node_id();
46874673

@@ -4691,28 +4677,7 @@ fn test_single_channel_multiple_mpp() {
46914677
// 7
46924678
// 8
46934679
//
4694-
// We can in theory reproduce this issue with fewer channels/HTLCs, but getting this test
4695-
// robust is rather challenging. We rely on having the main test thread wait on locks held in
4696-
// the background `claim_funds` thread and unlocking when the `claim_funds` thread completes a
4697-
// single `ChannelMonitorUpdate`.
4698-
// This thread calls `get_and_clear_pending_msg_events()` and `handle_revoke_and_ack()`, both
4699-
// of which require `ChannelManager` locks, but we have to make sure this thread gets a chance
4700-
// to be blocked on the mutexes before we let the background thread wake `claim_funds` so that
4701-
// the mutex can switch to this main thread.
4702-
// This relies on our locks being fair, but also on our threads getting runtime during the test
4703-
// run, which can be pretty competitive. Thus we do a dumb dance to be as conservative as
4704-
// possible - we have a background thread which completes a `ChannelMonitorUpdate` (by sending
4705-
// into the `write_blocker` mpsc) but it doesn't run until a mpsc channel sends from this main
4706-
// thread to the background thread, and then we let it sleep a while before we send the
4707-
// `ChannelMonitorUpdate` unblocker.
4708-
// Further, we give ourselves two chances each time, needing 4 HTLCs just to unlock our two
4709-
// `ChannelManager` calls. We then need a few remaining HTLCs to actually trigger the bug, so
4710-
// we use 6 HTLCs.
4711-
// Finaly, we do not run this test on Winblowz because it, somehow, in 2025, does not implement
4712-
// actual preemptive multitasking and thinks that cooperative multitasking somehow is
4713-
// acceptable in the 21st century, let alone a quarter of the way into it.
4714-
const MAX_THREAD_INIT_TIME: std::time::Duration = std::time::Duration::from_secs(1);
4715-
4680+
// All six parts converge on the same channel (7->8)
47164681
create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0);
47174682
create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 100_000, 0);
47184683
create_announced_chan_between_nodes_with_value(&nodes, 0, 3, 100_000, 0);
@@ -4728,7 +4693,7 @@ fn test_single_channel_multiple_mpp() {
47284693
create_announced_chan_between_nodes_with_value(&nodes, 6, 7, 100_000, 0);
47294694
create_announced_chan_between_nodes_with_value(&nodes, 7, 8, 1_000_000, 0);
47304695

4731-
let (mut route, payment_hash, payment_preimage, payment_secret) =
4696+
let (route, payment_hash, payment_preimage, payment_secret) =
47324697
get_route_and_payment_hash!(&nodes[0], nodes[8], 50_000_000);
47334698

47344699
send_along_route_with_secret(
@@ -4747,177 +4712,46 @@ fn test_single_channel_multiple_mpp() {
47474712
payment_secret,
47484713
);
47494714

4750-
let (do_a_write, blocker) = std::sync::mpsc::sync_channel(0);
4751-
*nodes[8].chain_monitor.write_blocker.lock().unwrap() = Some(blocker);
4752-
4753-
// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
4754-
// We do this by casting a pointer to a `TestChannelManager` to a pointer to a
4755-
// `TestChannelManager` with different (in this case 'static) lifetime.
4756-
// This is even suggested in the second example at
4757-
// https://doc.rust-lang.org/std/mem/fn.transmute.html#examples
4758-
let claim_node: &'static TestChannelManager<'static, 'static> =
4759-
unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) };
4760-
let thrd = std::thread::spawn(move || {
4761-
// Initiate the claim in a background thread as it will immediately block waiting on the
4762-
// `write_blocker` we set above.
4763-
claim_node.claim_funds(payment_preimage);
4764-
});
4765-
4766-
// First unlock one monitor so that we have a pending
4767-
// `update_fulfill_htlc`/`commitment_signed` pair to pass to our counterparty.
4768-
do_a_write.send(()).unwrap();
4769-
4770-
let event_node: &'static TestChannelManager<'static, 'static> =
4771-
unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) };
4772-
let thrd_event = std::thread::spawn(move || {
4773-
let mut have_event = false;
4774-
while !have_event {
4775-
let mut events = event_node.get_and_clear_pending_events();
4776-
assert!(events.len() == 1 || events.len() == 0);
4777-
if events.len() == 1 {
4778-
if let Event::PaymentClaimed { .. } = events[0] {
4779-
} else {
4780-
panic!("Unexpected event {events:?}");
4781-
}
4782-
have_event = true;
4783-
}
4784-
if !have_event {
4785-
std::thread::yield_now();
4786-
}
4787-
}
4788-
});
4789-
4790-
// Then fetch the `update_fulfill_htlc`/`commitment_signed`. Note that the
4791-
// `get_and_clear_pending_msg_events` will immediately hang trying to take a peer lock which
4792-
// `claim_funds` is holding. Thus, we release a second write after a small sleep in the
4793-
// background to give `claim_funds` a chance to step forward, unblocking
4794-
// `get_and_clear_pending_msg_events`.
4795-
let do_a_write_background = do_a_write.clone();
4796-
let block_thrd2 = AtomicBool::new(true);
4797-
let block_thrd2_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd2) };
4798-
let thrd2 = std::thread::spawn(move || {
4799-
while block_thrd2_read.load(Ordering::Acquire) {
4800-
std::thread::yield_now();
4801-
}
4802-
std::thread::sleep(MAX_THREAD_INIT_TIME);
4803-
do_a_write_background.send(()).unwrap();
4804-
std::thread::sleep(MAX_THREAD_INIT_TIME);
4805-
do_a_write_background.send(()).unwrap();
4806-
});
4807-
block_thrd2.store(false, Ordering::Release);
4808-
let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id);
4809-
4810-
// Thread 2 could unblock first, or it could get blocked waiting on us to process a
4811-
// `PaymentClaimed` event. Either way, wait until both have finished.
4812-
thrd2.join().unwrap();
4813-
thrd_event.join().unwrap();
4814-
4815-
// Disconnect node 6 from all its peers so it doesn't bother to fail the HTLCs back
4816-
nodes[7].node.peer_disconnected(node_b_id);
4817-
nodes[7].node.peer_disconnected(node_c_id);
4818-
nodes[7].node.peer_disconnected(node_d_id);
4819-
nodes[7].node.peer_disconnected(node_e_id);
4820-
nodes[7].node.peer_disconnected(node_f_id);
4821-
nodes[7].node.peer_disconnected(node_g_id);
4822-
4823-
let first_update_fulfill = first_updates.update_fulfill_htlcs.remove(0);
4824-
nodes[7].node.handle_update_fulfill_htlc(node_i_id, first_update_fulfill);
4825-
check_added_monitors(&nodes[7], 1);
4826-
expect_payment_forwarded!(nodes[7], nodes[1], nodes[8], Some(1000), false, false);
4827-
nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed);
4828-
check_added_monitors(&nodes[7], 1);
4829-
let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id);
4830-
4831-
// Now, handle the `revoke_and_ack` from node 5. Note that `claim_funds` is still blocked on
4832-
// our peer lock, so we have to release a write to let it process.
4833-
// After this call completes, the channel previously would be locked up and should not be able
4834-
// to make further progress.
4835-
let do_a_write_background = do_a_write.clone();
4836-
let block_thrd3 = AtomicBool::new(true);
4837-
let block_thrd3_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd3) };
4838-
let thrd3 = std::thread::spawn(move || {
4839-
while block_thrd3_read.load(Ordering::Acquire) {
4840-
std::thread::yield_now();
4841-
}
4842-
std::thread::sleep(MAX_THREAD_INIT_TIME);
4843-
do_a_write_background.send(()).unwrap();
4844-
std::thread::sleep(MAX_THREAD_INIT_TIME);
4845-
do_a_write_background.send(()).unwrap();
4846-
});
4847-
block_thrd3.store(false, Ordering::Release);
4848-
nodes[8].node.handle_revoke_and_ack(node_h_id, &raa);
4849-
thrd3.join().unwrap();
4850-
assert!(!thrd.is_finished());
4851-
4852-
let thrd4 = std::thread::spawn(move || {
4853-
do_a_write.send(()).unwrap();
4854-
do_a_write.send(()).unwrap();
4855-
});
4856-
4857-
thrd4.join().unwrap();
4858-
thrd.join().unwrap();
4859-
4860-
// At the end, we should have 7 ChannelMonitorUpdates - 6 for HTLC claims, and one for the
4861-
// above `revoke_and_ack`.
4862-
check_added_monitors(&nodes[8], 7);
4863-
4864-
// Now drive everything to the end, at least as far as node 7 is concerned...
4865-
*nodes[8].chain_monitor.write_blocker.lock().unwrap() = None;
4866-
nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs);
4715+
// All six parts are on the same channel, so claiming should produce a single batched
4716+
// ChannelMonitorUpdate containing all 6 preimages and one commitment.
4717+
nodes[8].node.claim_funds(payment_preimage);
4718+
expect_payment_claimed!(nodes[8], payment_hash, 50_000_000);
48674719
check_added_monitors(&nodes[8], 1);
48684720

4869-
let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id);
4870-
4871-
nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0));
4872-
expect_payment_forwarded!(nodes[7], nodes[2], nodes[8], Some(1000), false, false);
4873-
nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0));
4874-
expect_payment_forwarded!(nodes[7], nodes[3], nodes[8], Some(1000), false, false);
4875-
let mut next_source = 4;
4876-
if let Some(update) = updates.update_fulfill_htlcs.get(0) {
4877-
nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone());
4878-
expect_payment_forwarded!(nodes[7], nodes[4], nodes[8], Some(1000), false, false);
4879-
next_source += 1;
4721+
let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id);
4722+
assert_eq!(first_updates.update_fulfill_htlcs.len(), 6);
4723+
4724+
// Disconnect node 7 from intermediate nodes so it doesn't bother forwarding back.
4725+
nodes[7].node.peer_disconnected(nodes[1].node.get_our_node_id());
4726+
nodes[7].node.peer_disconnected(nodes[2].node.get_our_node_id());
4727+
nodes[7].node.peer_disconnected(nodes[3].node.get_our_node_id());
4728+
nodes[7].node.peer_disconnected(nodes[4].node.get_our_node_id());
4729+
nodes[7].node.peer_disconnected(nodes[5].node.get_our_node_id());
4730+
nodes[7].node.peer_disconnected(nodes[6].node.get_our_node_id());
4731+
4732+
// Deliver all 6 fulfills to node 7 before handling the commitment_signed.
4733+
// Each handle_update_fulfill_htlc triggers claim_funds_internal on node 7's upstream
4734+
// channels (which are disconnected), generating a preimage monitor update + PaymentForwarded.
4735+
for fulfill in first_updates.update_fulfill_htlcs.drain(..) {
4736+
nodes[7].node.handle_update_fulfill_htlc(node_i_id, fulfill);
4737+
check_added_monitors(&nodes[7], 1);
48804738
}
4881-
4882-
nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &updates.commitment_signed);
4883-
nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);
4884-
if updates.update_fulfill_htlcs.get(0).is_some() {
4885-
check_added_monitors(&nodes[7], 5);
4886-
} else {
4887-
check_added_monitors(&nodes[7], 4);
4739+
let events = nodes[7].node.get_and_clear_pending_events();
4740+
assert_eq!(events.len(), 6);
4741+
for event in events {
4742+
match event {
4743+
Event::PaymentForwarded { .. } => {},
4744+
_ => panic!("Unexpected event {:?}", event),
4745+
}
48884746
}
4889-
4747+
nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed);
4748+
check_added_monitors(&nodes[7], 1);
48904749
let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id);
48914750

48924751
nodes[8].node.handle_revoke_and_ack(node_h_id, &raa);
4752+
check_added_monitors(&nodes[8], 1);
48934753
nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs);
4894-
check_added_monitors(&nodes[8], 2);
4895-
4896-
let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id);
4897-
4898-
nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0));
4899-
expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
4900-
next_source += 1;
4901-
nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0));
4902-
expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
4903-
next_source += 1;
4904-
if let Some(update) = updates.update_fulfill_htlcs.get(0) {
4905-
nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone());
4906-
expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
4907-
}
4908-
4909-
nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &updates.commitment_signed);
4910-
nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);
4911-
if updates.update_fulfill_htlcs.get(0).is_some() {
4912-
check_added_monitors(&nodes[7], 5);
4913-
} else {
4914-
check_added_monitors(&nodes[7], 4);
4915-
}
4916-
4917-
let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id);
4918-
nodes[8].node.handle_revoke_and_ack(node_h_id, &raa);
4919-
nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs);
4920-
check_added_monitors(&nodes[8], 2);
4754+
check_added_monitors(&nodes[8], 1);
49214755

49224756
let raa = get_event_msg!(nodes[8], MessageSendEvent::SendRevokeAndACK, node_h_id);
49234757
nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);

0 commit comments

Comments
 (0)