diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs
index e0b038c30cd..8b41c53a635 100644
--- a/fuzz/src/chanmon_consistency.rs
+++ b/fuzz/src/chanmon_consistency.rs
@@ -80,8 +80,9 @@
 use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
 use bitcoin::secp256k1::schnorr;
 use bitcoin::secp256k1::{self, Message, PublicKey, Scalar, Secp256k1, SecretKey};
-use lightning::io::Cursor;
 use lightning::util::dyn_signer::DynSigner;
+
+use std::cell::RefCell;
 use std::cmp::{self, Ordering};
 use std::mem;
 use std::sync::atomic;
@@ -251,7 +252,7 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
             .unwrap_or(&map_entry.persisted_monitor);
         let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::read(
-            &mut Cursor::new(&latest_monitor_data),
+            &mut &latest_monitor_data[..],
             (&*self.keys, &*self.keys),
         )
         .unwrap()
@@ -674,81 +675,100 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) {
         }};
     }
 
-    macro_rules! reload_node {
-        ($ser: expr, $node_id: expr, $old_monitors: expr, $keys_manager: expr, $fee_estimator: expr) => {{
-            let keys_manager = Arc::clone(&$keys_manager);
-            let logger: Arc<dyn Logger> =
-                Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
-            let chain_monitor = Arc::new(TestChainMonitor::new(
-                broadcast.clone(),
-                logger.clone(),
-                $fee_estimator.clone(),
-                Arc::new(TestPersister {
-                    update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
-                }),
-                Arc::clone(&$keys_manager),
-            ));
-
-            let mut config = UserConfig::default();
-            config.channel_config.forwarding_fee_proportional_millionths = 0;
-            config.channel_handshake_config.announce_for_forwarding = true;
-            if anchors {
-                config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
-                config.manually_accept_inbound_channels = true;
-            }
-
-            let mut monitors = new_hash_map();
-            let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
-            for (channel_id, mut prev_state) in old_monitors.drain() {
-                monitors.insert(
-                    channel_id,
-                    <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
-                        &mut Cursor::new(&prev_state.persisted_monitor),
-                        (&*$keys_manager, &*$keys_manager),
-                    )
-                    .expect("Failed to read monitor")
-                    .1,
-                );
-                // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
-                // considering them discarded. LDK should replay these for us as they're stored in
-                // the `ChannelManager`.
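A quick aside on the `Cursor` removal in the hunks above: LDK's `lightning::io::Read`, like `std::io::Read`, is implemented for `&[u8]`, and each read consumes bytes from the front of the slice in place, so `&mut &data[..]` already is a reader and the `Cursor` wrapper (and its import) adds nothing. A minimal sketch of the idea against `std::io`, on the assumption that the `lightning::io` shim mirrors it:

```rust
use std::io::Read;

fn main() {
    let data = vec![1u8, 2, 3, 4];
    // A `&mut &[u8]` is itself a reader: `Read` is implemented for
    // `&[u8]`, and reading advances the slice reference in place.
    let mut reader = &data[..];
    let mut buf = [0u8; 2];
    reader.read_exact(&mut buf).unwrap();
    assert_eq!(buf, [1, 2]);
    assert_eq!(reader, &[3u8, 4][..]); // the remaining, unread bytes
}
```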
-                prev_state.pending_monitors.clear();
-                chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state);
-            }
-            let mut monitor_refs = new_hash_map();
-            for (channel_id, monitor) in monitors.iter() {
-                monitor_refs.insert(*channel_id, monitor);
-            }
+    let default_mon_style = RefCell::new(ChannelMonitorUpdateStatus::Completed);
+    let mon_style = [default_mon_style.clone(), default_mon_style.clone(), default_mon_style];
+
+    let reload_node = |ser: &Vec<u8>,
+                       node_id: u8,
+                       old_monitors: &TestChainMonitor,
+                       mut use_old_mons,
+                       keys,
+                       fee_estimator| {
+        let keys_manager = Arc::clone(keys);
+        let logger: Arc<dyn Logger> =
+            Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone()));
+        let chain_monitor = Arc::new(TestChainMonitor::new(
+            broadcast.clone(),
+            logger.clone(),
+            Arc::clone(fee_estimator),
+            Arc::new(TestPersister {
+                update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
+            }),
+            Arc::clone(keys),
+        ));
+
+        let mut config = UserConfig::default();
+        config.channel_config.forwarding_fee_proportional_millionths = 0;
+        config.channel_handshake_config.announce_for_forwarding = true;
+        if anchors {
+            config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
+            config.manually_accept_inbound_channels = true;
+        }
 
-            let read_args = ChannelManagerReadArgs {
-                entropy_source: keys_manager.clone(),
-                node_signer: keys_manager.clone(),
-                signer_provider: keys_manager.clone(),
-                fee_estimator: $fee_estimator.clone(),
-                chain_monitor: chain_monitor.clone(),
-                tx_broadcaster: broadcast.clone(),
-                router: &router,
-                message_router: &router,
-                logger,
-                default_config: config,
-                channel_monitors: monitor_refs,
+        let mut monitors = new_hash_map();
+        let mut old_monitors = old_monitors.latest_monitors.lock().unwrap();
+        for (channel_id, mut prev_state) in old_monitors.drain() {
+            let serialized_mon = if use_old_mons % 3 == 0 {
+                // Reload with the oldest `ChannelMonitor` (the one that we already told
+                // `ChannelManager` we finished persisting).
+                prev_state.persisted_monitor
+            } else if use_old_mons % 3 == 1 {
+                // Reload with the second-oldest `ChannelMonitor`
+                let old_mon = prev_state.persisted_monitor;
+                prev_state.pending_monitors.drain(..).next().map(|(_, v)| v).unwrap_or(old_mon)
+            } else {
+                // Reload with the newest `ChannelMonitor`
+                let old_mon = prev_state.persisted_monitor;
+                prev_state.pending_monitors.pop().map(|(_, v)| v).unwrap_or(old_mon)
             };
+            // Use a different value of `use_old_mons` for the next monitor, if any (only node B
+            // has two), by shifting `use_old_mons` down one base-3 digit.
+            use_old_mons /= 3;
+            let mon = <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
+                &mut &serialized_mon[..],
+                (&**keys, &**keys),
+            )
+            .expect("Failed to read monitor");
+            monitors.insert(channel_id, mon.1);
+            // Update the latest `ChannelMonitor` state to match what we just told LDK.
+            prev_state.persisted_monitor = serialized_mon;
+            // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
+            // considering them discarded. LDK should replay these for us as they're stored in
+            // the `ChannelManager`.
+            prev_state.pending_monitors.clear();
+            chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state);
+        }
+        let mut monitor_refs = new_hash_map();
+        for (channel_id, monitor) in monitors.iter() {
+            monitor_refs.insert(*channel_id, monitor);
+        }
+
+        let read_args = ChannelManagerReadArgs {
+            entropy_source: Arc::clone(&keys_manager),
+            node_signer: Arc::clone(&keys_manager),
+            signer_provider: keys_manager,
+            fee_estimator: Arc::clone(fee_estimator),
+            chain_monitor: chain_monitor.clone(),
+            tx_broadcaster: broadcast.clone(),
+            router: &router,
+            message_router: &router,
+            logger,
+            default_config: config,
+            channel_monitors: monitor_refs,
+        };
 
-            let res = (
-                <(BlockHash, ChanMan)>::read(&mut Cursor::new(&$ser.0), read_args)
-                    .expect("Failed to read manager")
-                    .1,
-                chain_monitor.clone(),
+        let manager =
+            <(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager");
+        let res = (manager.1, chain_monitor.clone());
+        for (channel_id, mon) in monitors.drain() {
+            assert_eq!(
+                chain_monitor.chain_monitor.watch_channel(channel_id, mon),
+                Ok(ChannelMonitorUpdateStatus::Completed)
             );
-            for (channel_id, mon) in monitors.drain() {
-                assert_eq!(
-                    chain_monitor.chain_monitor.watch_channel(channel_id, mon),
-                    Ok(ChannelMonitorUpdateStatus::Completed)
-                );
-            }
-            res
-        }};
-    }
+        }
+        *chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
+        res
+    };
 
     let mut channel_txn = Vec::new();
     macro_rules! make_channel {
@@ -978,12 +998,9 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) {
     let mut bc_events = Vec::new();
     let mut cb_events = Vec::new();
 
-    let mut node_a_ser = VecWriter(Vec::new());
-    nodes[0].write(&mut node_a_ser).unwrap();
-    let mut node_b_ser = VecWriter(Vec::new());
-    nodes[1].write(&mut node_b_ser).unwrap();
-    let mut node_c_ser = VecWriter(Vec::new());
-    nodes[2].write(&mut node_c_ser).unwrap();
+    let mut node_a_ser = nodes[0].encode();
+    let mut node_b_ser = nodes[1].encode();
+    let mut node_c_ser = nodes[2].encode();
 
     macro_rules! test_return {
         () => {{
@@ -1393,28 +1410,22 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) {
             // bit-twiddling mutations to have similar effects. This is probably overkill, but no
             // harm in doing so.
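A note on the `mon_style` cells that the arms below now write to: the fuzzer keeps one `RefCell<ChannelMonitorUpdateStatus>` per node so the selected persistence mode survives a `reload_node` call, which copies the cell into the freshly built persister (the assignment just before `res` above). A minimal single-threaded sketch of that pattern, using a stand-in status enum rather than the fuzzer's actual types:

```rust
use std::cell::RefCell;

#[derive(Clone, Copy, Debug, PartialEq)]
enum UpdateStatus {
    Completed,
    InProgress,
}

fn main() {
    // One cell per node, all starting out in synchronous mode.
    let default_style = RefCell::new(UpdateStatus::Completed);
    let mon_style = [default_style.clone(), default_style.clone(), default_style];

    // A fuzz opcode flips node 1 to asynchronous persistence...
    *mon_style[1].borrow_mut() = UpdateStatus::InProgress;

    // ...and a later reload of node 1 copies the cell into the new
    // persister, so the restarted node keeps the chosen mode.
    let new_persister_update_ret = *mon_style[1].borrow();
    assert_eq!(new_persister_update_ret, UpdateStatus::InProgress);
}
```

A `RefCell` suffices here (rather than a `Mutex`) because the fuzz loop is single-threaded; the persisters themselves still hold their status behind a `Mutex` as before.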
             0x00 => {
-                *monitor_a.persister.update_ret.lock().unwrap() =
-                    ChannelMonitorUpdateStatus::InProgress
+                *mon_style[0].borrow_mut() = ChannelMonitorUpdateStatus::InProgress;
             },
             0x01 => {
-                *monitor_b.persister.update_ret.lock().unwrap() =
-                    ChannelMonitorUpdateStatus::InProgress
+                *mon_style[1].borrow_mut() = ChannelMonitorUpdateStatus::InProgress;
             },
             0x02 => {
-                *monitor_c.persister.update_ret.lock().unwrap() =
-                    ChannelMonitorUpdateStatus::InProgress
+                *mon_style[2].borrow_mut() = ChannelMonitorUpdateStatus::InProgress;
             },
             0x04 => {
-                *monitor_a.persister.update_ret.lock().unwrap() =
-                    ChannelMonitorUpdateStatus::Completed
+                *mon_style[0].borrow_mut() = ChannelMonitorUpdateStatus::Completed;
             },
             0x05 => {
-                *monitor_b.persister.update_ret.lock().unwrap() =
-                    ChannelMonitorUpdateStatus::Completed
+                *mon_style[1].borrow_mut() = ChannelMonitorUpdateStatus::Completed;
             },
             0x06 => {
-                *monitor_c.persister.update_ret.lock().unwrap() =
-                    ChannelMonitorUpdateStatus::Completed
+                *mon_style[2].borrow_mut() = ChannelMonitorUpdateStatus::Completed;
             },
 
             0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_id),
@@ -1503,59 +1514,6 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) {
             0x26 => process_ev_noret!(2, true),
             0x27 => process_ev_noret!(2, false),
 
-            0x2c => {
-                if !chan_a_disconnected {
-                    nodes[1].peer_disconnected(nodes[0].get_our_node_id());
-                    chan_a_disconnected = true;
-                    push_excess_b_events!(
-                        nodes[1].get_and_clear_pending_msg_events().drain(..),
-                        Some(0)
-                    );
-                    ab_events.clear();
-                    ba_events.clear();
-                }
-                let (new_node_a, new_monitor_a) =
-                    reload_node!(node_a_ser, 0, monitor_a, keys_manager_a, fee_est_a);
-                nodes[0] = new_node_a;
-                monitor_a = new_monitor_a;
-            },
-            0x2d => {
-                if !chan_a_disconnected {
-                    nodes[0].peer_disconnected(nodes[1].get_our_node_id());
-                    chan_a_disconnected = true;
-                    nodes[0].get_and_clear_pending_msg_events();
-                    ab_events.clear();
-                    ba_events.clear();
-                }
-                if !chan_b_disconnected {
-                    nodes[2].peer_disconnected(nodes[1].get_our_node_id());
-                    chan_b_disconnected = true;
-                    nodes[2].get_and_clear_pending_msg_events();
-                    bc_events.clear();
-                    cb_events.clear();
-                }
-                let (new_node_b, new_monitor_b) =
-                    reload_node!(node_b_ser, 1, monitor_b, keys_manager_b, fee_est_b);
-                nodes[1] = new_node_b;
-                monitor_b = new_monitor_b;
-            },
-            0x2e => {
-                if !chan_b_disconnected {
-                    nodes[1].peer_disconnected(nodes[2].get_our_node_id());
-                    chan_b_disconnected = true;
-                    push_excess_b_events!(
-                        nodes[1].get_and_clear_pending_msg_events().drain(..),
-                        Some(2)
-                    );
-                    bc_events.clear();
-                    cb_events.clear();
-                }
-                let (new_node_c, new_monitor_c) =
-                    reload_node!(node_c_ser, 2, monitor_c, keys_manager_c, fee_est_c);
-                nodes[2] = new_node_c;
-                monitor_c = new_monitor_c;
-            },
-
             // 1/10th the channel size:
             0x30 => send_noret(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut p_id, &mut p_idx),
             0x31 => send_noret(&nodes[1], &nodes[0], chan_a, 10_000_000, &mut p_id, &mut p_idx),
@@ -1703,6 +1661,65 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) {
                 nodes[2].maybe_propose_quiescence(&nodes[1].get_our_node_id(), &chan_b_id).unwrap()
             },
 
+            0xb0 | 0xb1 | 0xb2 => {
+                // Restart node A, picking among the in-flight `ChannelMonitor`s to use based on
+                // the value of `v` we're matching.
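To make the selection encoding mentioned in the comment above concrete: `reload_node` consumes one base-3 digit of the raw opcode byte per channel (`use_old_mons % 3 == 0` selects the oldest persisted monitor, `1` the second-oldest in-flight one, anything else the newest), dividing by three between channels. Nine consecutive opcodes therefore cover all nine combinations for node B's two channels, which is why it gets `0xb3..=0xbb` below while nodes A and C need only three values each. A rough, illustrative decoding of node B's range:

```rust
fn main() {
    for v in 0xb3u8..=0xbb {
        let mut use_old_mons = v;
        let mut picks = Vec::new();
        // One base-3 digit per channel, mirroring `use_old_mons % 3`
        // followed by `use_old_mons /= 3` in `reload_node`.
        for _channel in 0..2 {
            picks.push(match use_old_mons % 3 {
                0 => "oldest persisted",
                1 => "second-oldest in-flight",
                _ => "newest in-flight",
            });
            use_old_mons /= 3;
        }
        println!("{v:#04x} -> chan 1: {}, chan 2: {}", picks[0], picks[1]);
    }
}
```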
+                if !chan_a_disconnected {
+                    nodes[1].peer_disconnected(nodes[0].get_our_node_id());
+                    chan_a_disconnected = true;
+                    push_excess_b_events!(
+                        nodes[1].get_and_clear_pending_msg_events().drain(..),
+                        Some(0)
+                    );
+                    ab_events.clear();
+                    ba_events.clear();
+                }
+                let (new_node_a, new_monitor_a) =
+                    reload_node(&node_a_ser, 0, &monitor_a, v, &keys_manager_a, &fee_est_a);
+                nodes[0] = new_node_a;
+                monitor_a = new_monitor_a;
+            },
+            0xb3..=0xbb => {
+                // Restart node B, picking among the in-flight `ChannelMonitor`s to use based on
+                // the value of `v` we're matching.
+                if !chan_a_disconnected {
+                    nodes[0].peer_disconnected(nodes[1].get_our_node_id());
+                    chan_a_disconnected = true;
+                    nodes[0].get_and_clear_pending_msg_events();
+                    ab_events.clear();
+                    ba_events.clear();
+                }
+                if !chan_b_disconnected {
+                    nodes[2].peer_disconnected(nodes[1].get_our_node_id());
+                    chan_b_disconnected = true;
+                    nodes[2].get_and_clear_pending_msg_events();
+                    bc_events.clear();
+                    cb_events.clear();
+                }
+                let (new_node_b, new_monitor_b) =
+                    reload_node(&node_b_ser, 1, &monitor_b, v, &keys_manager_b, &fee_est_b);
+                nodes[1] = new_node_b;
+                monitor_b = new_monitor_b;
+            },
+            0xbc | 0xbd | 0xbe => {
+                // Restart node C, picking among the in-flight `ChannelMonitor`s to use based on
+                // the value of `v` we're matching.
+                if !chan_b_disconnected {
+                    nodes[1].peer_disconnected(nodes[2].get_our_node_id());
+                    chan_b_disconnected = true;
+                    push_excess_b_events!(
+                        nodes[1].get_and_clear_pending_msg_events().drain(..),
+                        Some(2)
+                    );
+                    bc_events.clear();
+                    cb_events.clear();
+                }
+                let (new_node_c, new_monitor_c) =
+                    reload_node(&node_c_ser, 2, &monitor_c, v, &keys_manager_c, &fee_est_c);
+                nodes[2] = new_node_c;
+                monitor_c = new_monitor_c;
+            },
+
             0xf0 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_first),
             0xf1 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_second),
             0xf2 => complete_monitor_update(&monitor_a, &chan_1_id, &Vec::pop),
@@ -1722,21 +1739,8 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) {
             0xff => {
                 // Test that no channel is in a stuck state where neither party can send funds even
                 // after we resolve all pending events.
-                // First make sure there are no pending monitor updates and further update
-                // operations complete.
-                *monitor_a.persister.update_ret.lock().unwrap() =
-                    ChannelMonitorUpdateStatus::Completed;
-                *monitor_b.persister.update_ret.lock().unwrap() =
-                    ChannelMonitorUpdateStatus::Completed;
-                *monitor_c.persister.update_ret.lock().unwrap() =
-                    ChannelMonitorUpdateStatus::Completed;
-
-                complete_all_monitor_updates(&monitor_a, &chan_1_id);
-                complete_all_monitor_updates(&monitor_b, &chan_1_id);
-                complete_all_monitor_updates(&monitor_b, &chan_2_id);
-                complete_all_monitor_updates(&monitor_c, &chan_2_id);
-
-                // Next, make sure peers are all connected to each other
+
+                // First, make sure peers are all connected to each other
                 if chan_a_disconnected {
                     let init_1 = Init {
                         features: nodes[1].init_features(),
@@ -1769,42 +1773,65 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) {
                 }
 
                 macro_rules! process_all_events {
-                    () => {
+                    () => { {
+                        let mut last_pass_no_updates = false;
                         for i in 0..std::usize::MAX {
                             if i == 100 {
                                 panic!("It may take many iterations to settle the state, but it should not take forever");
                             }
+                            // Next, make sure no monitor updates are pending
+                            complete_all_monitor_updates(&monitor_a, &chan_1_id);
+                            complete_all_monitor_updates(&monitor_b, &chan_1_id);
+                            complete_all_monitor_updates(&monitor_b, &chan_2_id);
+                            complete_all_monitor_updates(&monitor_c, &chan_2_id);
                             // Then, make sure any current forwards make their way to their destination
                             if process_msg_events!(0, false, ProcessMessages::AllMessages) {
+                                last_pass_no_updates = false;
                                 continue;
                             }
                             if process_msg_events!(1, false, ProcessMessages::AllMessages) {
+                                last_pass_no_updates = false;
                                 continue;
                             }
                             if process_msg_events!(2, false, ProcessMessages::AllMessages) {
+                                last_pass_no_updates = false;
                                 continue;
                             }
                             // ...making sure any pending PendingHTLCsForwardable events are handled and
                             // payments claimed.
                             if process_events!(0, false) {
+                                last_pass_no_updates = false;
                                 continue;
                             }
                             if process_events!(1, false) {
+                                last_pass_no_updates = false;
                                 continue;
                             }
                             if process_events!(2, false) {
+                                last_pass_no_updates = false;
                                 continue;
                             }
-                            break;
+                            if last_pass_no_updates {
+                                // In some cases, we may generate a message to send in
+                                // `process_msg_events`, but block sending until
+                                // `complete_all_monitor_updates` gets called on the next
+                                // iteration.
+                                //
+                                // Thus, we only exit if we manage two iterations with no messages
+                                // or events to process.
+                                break;
+                            }
+                            last_pass_no_updates = true;
                         }
-                    };
+                    } };
                 }
 
-                // At this point, we may be pending quiescence, so we'll process all messages to
-                // ensure we can complete its handshake. We'll then exit quiescence and process all
-                // messages again, to resolve any pending HTLCs (only irrevocably committed ones)
-                // before attempting to send more payments.
+                // We may be pending quiescence, so first process all messages to ensure we can
+                // complete the quiescence handshake.
                 process_all_events!();
+
+                // Then exit quiescence and process all messages again, to resolve any pending
+                // HTLCs (only irrevocably committed ones) before attempting to send more payments.
                 nodes[0].exit_quiescence(&nodes[1].get_our_node_id(), &chan_a_id).unwrap();
                 nodes[1].exit_quiescence(&nodes[0].get_our_node_id(), &chan_a_id).unwrap();
                 nodes[1].exit_quiescence(&nodes[2].get_our_node_id(), &chan_b_id).unwrap();
@@ -1833,16 +1860,13 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) {
         }
 
         if nodes[0].get_and_clear_needs_persistence() == true {
-            node_a_ser.0.clear();
-            nodes[0].write(&mut node_a_ser).unwrap();
+            node_a_ser = nodes[0].encode();
         }
         if nodes[1].get_and_clear_needs_persistence() == true {
-            node_b_ser.0.clear();
-            nodes[1].write(&mut node_b_ser).unwrap();
+            node_b_ser = nodes[1].encode();
        }
         if nodes[2].get_and_clear_needs_persistence() == true {
-            node_c_ser.0.clear();
-            nodes[2].write(&mut node_c_ser).unwrap();
+            node_c_ser = nodes[2].encode();
         }
     }
 }
diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs
index 21fceb09cab..65a8383ca96 100644
--- a/lightning/src/chain/mod.rs
+++ b/lightning/src/chain/mod.rs
@@ -209,6 +209,12 @@ pub enum ChannelMonitorUpdateStatus {
     ///
     /// This includes performing any `fsync()` calls required to ensure the update is guaranteed to
     /// be available on restart even if the application crashes.
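The doc additions that follow pin down a new API contract: a given `Persist`/`Watch` instance must stick to either synchronous (`Completed`) or asynchronous (`InProgress`) persistence for its whole lifetime. Sketched as a toy persister below; the real `Persist` trait has several more methods, so this only shows the shape of the rule:

```rust
#[derive(Clone, Copy)]
enum ChannelMonitorUpdateStatus {
    Completed,
    InProgress,
}

// Decide the persistence style once, at construction, and return the
// matching variant for every update thereafter; switching styles on a
// live instance is what the new documentation forbids.
struct MyPersister {
    asynchronous: bool, // fixed for the lifetime of the process
}

impl MyPersister {
    fn persist_update(&self) -> ChannelMonitorUpdateStatus {
        if self.asynchronous {
            // Queue the write; report completion later via
            // `ChainMonitor::channel_monitor_updated`.
            ChannelMonitorUpdateStatus::InProgress
        } else {
            // Write and fsync inline before returning.
            ChannelMonitorUpdateStatus::Completed
        }
    }
}

fn main() {
    let persister = MyPersister { asynchronous: false };
    let _ = persister.persist_update(); // always `Completed` for this instance
}
```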
+    ///
+    /// If you return this variant, you cannot later return [`InProgress`] from the same instance of
+    /// [`Persist`]/[`Watch`] without first restarting.
+    ///
+    /// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
+    /// [`Persist`]: chainmonitor::Persist
     Completed,
     /// Indicates that the update will happen asynchronously in the background or that a transient
     /// failure occurred which is being retried in the background and will eventually complete.
@@ -234,7 +240,12 @@ pub enum ChannelMonitorUpdateStatus {
     /// reliable, this feature is considered beta, and a handful of edge-cases remain. Until the
     /// remaining cases are fixed, in rare cases, *using this feature may lead to funds loss*.
     ///
+    /// If you return this variant, you cannot later return [`Completed`] from the same instance of
+    /// [`Persist`]/[`Watch`] without first restarting.
+    ///
     /// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
+    /// [`Completed`]: ChannelMonitorUpdateStatus::Completed
+    /// [`Persist`]: chainmonitor::Persist
     InProgress,
     /// Indicates that an update has failed and will not complete at any point in the future.
     ///
diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs
index 1192a14e2b9..019afbd05c8 100644
--- a/lightning/src/ln/chanmon_update_fail_tests.rs
+++ b/lightning/src/ln/chanmon_update_fail_tests.rs
@@ -3341,93 +3341,6 @@ fn test_durable_preimages_on_closed_channel() {
     do_test_durable_preimages_on_closed_channel(false, false, false);
 }
 
-#[test]
-fn test_sync_async_persist_doesnt_hang() {
-    // Previously, we checked if a channel was a candidate for making forward progress based on if
-    // the `MonitorEvent::Completed` matched the channel's latest monitor update id. However, this
-    // could lead to a rare race when `ChannelMonitor`s were being persisted both synchronously and
-    // asynchronously leading to channel hangs.
-    //
-    // To hit this case, we need to generate a `MonitorEvent::Completed` prior to a new channel
-    // update, but which is only processed after the channel update.
-    let chanmon_cfgs = create_chanmon_cfgs(2);
-    let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-    let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
-    let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-
-    let chan_id_ab = create_announced_chan_between_nodes(&nodes, 0, 1).2;
-
-    // Send two payments from A to B, then claim the first, marking the very last
-    // ChannelMonitorUpdate as InProgress...
-    let (payment_preimage_1, payment_hash_1, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
-    let (payment_preimage_2, payment_hash_2, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
-
-    nodes[1].node.claim_funds(payment_preimage_1);
-    check_added_monitors(&nodes[1], 1);
-    expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
-
-    let bs_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id());
-    nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), &bs_updates.update_fulfill_htlcs[0]);
-    expect_payment_sent(&nodes[0], payment_preimage_1, None, false, false);
-    nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_updates.commitment_signed);
-    check_added_monitors(&nodes[0], 1);
-    let (as_raa, as_cs) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
-
-    nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_raa);
-    check_added_monitors(&nodes[1], 1);
-    nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_cs);
-    check_added_monitors(&nodes[1], 1);
-
-    let bs_final_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
-    chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
-    nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_final_raa);
-    check_added_monitors(&nodes[0], 1);
-
-    // Immediately complete the monitor update, but before the ChannelManager has a chance to see
-    // the MonitorEvent::Completed, create a channel update by receiving a claim on the second
-    // payment.
-    let (_, ab_update_id) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id_ab).unwrap().clone();
-    nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(chan_id_ab, ab_update_id).unwrap();
-
-    nodes[1].node.claim_funds(payment_preimage_2);
-    check_added_monitors(&nodes[1], 1);
-    expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
-
-    let bs_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id());
-    nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), &bs_updates.update_fulfill_htlcs[0]);
-    nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_updates.commitment_signed);
-    check_added_monitors(&nodes[0], 1);
-
-    // At this point, we have completed an extra `ChannelMonitorUpdate` but the `ChannelManager`
-    // hasn't yet seen our `MonitorEvent::Completed`. When we call
-    // `get_and_clear_pending_msg_events` here, the `ChannelManager` finally sees that event and
-    // should return the channel to normal operation.
-    let (as_raa, as_cs) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
-
-    // Now that we've completed our test, process the events we have queued up (which we were not
-    // able to check until now as they would have caused the `ChannelManager` to look at the
-    // pending `MonitorEvent`s).
-    let pending_events = nodes[0].node.get_and_clear_pending_events();
-    assert_eq!(pending_events.len(), 2);
-    if let Event::PaymentPathSuccessful { ref payment_hash, .. } = pending_events[1] {
-        assert_eq!(payment_hash.unwrap(), payment_hash_1);
-    } else { panic!(); }
-    if let Event::PaymentSent { ref payment_hash, .. } = pending_events[0] {
-        assert_eq!(*payment_hash, payment_hash_2);
-    } else { panic!(); }
-
-    // Finally, complete the claiming of the second payment
-    nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_raa);
-    check_added_monitors(&nodes[1], 1);
-    nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_cs);
-    check_added_monitors(&nodes[1], 1);
-
-    let bs_final_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
-    nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_final_raa);
-    check_added_monitors(&nodes[0], 1);
-    expect_payment_path_successful!(nodes[0]);
-}
-
 fn do_test_reload_mon_update_completion_actions(close_during_reload: bool) {
     // Test that if a `ChannelMonitorUpdate` completes but a `ChannelManager` isn't serialized
     // before restart we run the monitor update completion action on startup.
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index d0a73e89992..34751bfc96f 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -2569,6 +2569,13 @@ where
     #[cfg(any(test, feature = "_test_utils"))]
     pub(super) per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<SP>>>>,
 
+    /// We only support using one of [`ChannelMonitorUpdateStatus::InProgress`] and
+    /// [`ChannelMonitorUpdateStatus::Completed`] without restarting. Because the API does not
+    /// otherwise directly enforce this, we enforce it in non-test builds here by storing which one
+    /// is in use.
+    #[cfg(not(any(test, feature = "_externalize_tests")))]
+    monitor_update_type: AtomicUsize,
+
     /// The set of events which we need to give to the user to handle. In some cases an event may
     /// require some further action after the user handles it (currently only blocking a monitor
     /// update from being handed to the user to ensure the included changes to the channel state
@@ -3312,11 +3319,19 @@ macro_rules! handle_new_monitor_update {
             panic!("{}", err_str);
         },
         ChannelMonitorUpdateStatus::InProgress => {
+            #[cfg(not(any(test, feature = "_externalize_tests")))]
+            if $self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
+                panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
+            }
             log_debug!($logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
                 $channel_id);
             false
         },
         ChannelMonitorUpdateStatus::Completed => {
+            #[cfg(not(any(test, feature = "_externalize_tests")))]
+            if $self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
+                panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
+            }
             $completed;
             true
         },
@@ -3577,6 +3592,9 @@ where
             per_peer_state: FairRwLock::new(new_hash_map()),
 
+            #[cfg(not(any(test, feature = "_externalize_tests")))]
+            monitor_update_type: AtomicUsize::new(0),
+
             pending_events: Mutex::new(VecDeque::new()),
             pending_events_processor: AtomicBool::new(false),
             pending_background_events: Mutex::new(Vec::new()),
@@ -14747,6 +14765,9 @@ where
             per_peer_state: FairRwLock::new(per_peer_state),
 
+            #[cfg(not(any(test, feature = "_externalize_tests")))]
+            monitor_update_type: AtomicUsize::new(0),
+
             pending_events: Mutex::new(pending_events_read),
             pending_events_processor: AtomicBool::new(false),
             pending_background_events: Mutex::new(pending_background_events),
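For readers skimming the `channelmanager.rs` hunks: the enforcement is a tiny state machine in an `AtomicUsize` (0 = no update handled yet, 1 = `InProgress` seen, 2 = `Completed` seen). `swap` stores the mode just observed and returns the previous one, so observing the opposite non-zero mode means the persister switched styles mid-run. A standalone sketch of just that guard, with a hypothetical `ModeGuard` type that is not LDK API:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct ModeGuard(AtomicUsize);

impl ModeGuard {
    // `swap` records the current mode and hands back the previous one;
    // only the crossover cases (1 -> 2 or 2 -> 1) are fatal, so repeated
    // updates in the same mode stay cheap and panic-free.
    fn record(&self, in_progress: bool) {
        let (new, forbidden_prev) = if in_progress { (1, 2) } else { (2, 1) };
        if self.0.swap(new, Ordering::Relaxed) == forbidden_prev {
            panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
        }
    }
}

fn main() {
    let guard = ModeGuard(AtomicUsize::new(0));
    guard.record(false); // first update completes synchronously: mode 2
    guard.record(false); // same mode again: fine
    // guard.record(true); // would panic: the persister switched 2 -> 1
}
```

`Ordering::Relaxed` suffices here because the guard only needs to detect that both modes were ever returned; it does not order the check against any other memory accesses.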