Skip to content

Commit b5a6307

Browse files
authored
Merge pull request lightningdevkit#1023 from TheBlueMatt/2021-07-par-gossip-processing
2 parents 3cae233 + 46009a5 commit b5a6307

File tree

9 files changed

+747
-429
lines changed

9 files changed

+747
-429
lines changed

lightning-net-tokio/src/lib.rs

+32-5
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,28 @@ struct Connection {
120120
id: u64,
121121
}
122122
impl Connection {
123+
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
124+
CMH: ChannelMessageHandler + 'static + Send + Sync,
125+
RMH: RoutingMessageHandler + 'static + Send + Sync,
126+
L: Logger + 'static + ?Sized + Send + Sync,
127+
UMH: CustomMessageHandler + 'static + Send + Sync {
128+
loop {
129+
if event_receiver.recv().await.is_none() {
130+
return;
131+
}
132+
peer_manager.process_events();
133+
}
134+
}
135+
123136
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
124-
CMH: ChannelMessageHandler + 'static,
125-
RMH: RoutingMessageHandler + 'static,
126-
L: Logger + 'static + ?Sized,
127-
UMH: CustomMessageHandler + 'static {
137+
CMH: ChannelMessageHandler + 'static + Send + Sync,
138+
RMH: RoutingMessageHandler + 'static + Send + Sync,
139+
L: Logger + 'static + ?Sized + Send + Sync,
140+
UMH: CustomMessageHandler + 'static + Send + Sync {
141+
// Create a waker to wake up poll_event_process, above
142+
let (event_waker, event_receiver) = mpsc::channel(1);
143+
tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
144+
128145
// 8KB is nice and big but also should never cause any issues with stack overflowing.
129146
let mut buf = [0; 8192];
130147

@@ -175,7 +192,14 @@ impl Connection {
175192
Err(_) => break Disconnect::PeerDisconnected,
176193
},
177194
}
178-
peer_manager.process_events();
195+
let _ = event_waker.try_send(());
196+
197+
// At this point we've processed a message or two, and reset the ping timer for this
198+
// peer, at least in the "are we still receiving messages" context, if we don't give up
199+
// our timeslice to another task we may just spin on this peer, starving other peers
200+
// and eventually disconnecting them for ping timeouts. Instead, we explicitly yield
201+
// here.
202+
tokio::task::yield_now().await;
179203
};
180204
let writer_option = us.lock().unwrap().writer.take();
181205
if let Some(mut writer) = writer_option {
@@ -443,6 +467,9 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
443467
// pause read given we're now waiting on the remote end to ACK (and in
444468
// accordance with the send_data() docs).
445469
us.read_paused = true;
470+
// Further, to avoid any current pending read causing a `read_event` call, wake
471+
// up the read_waker and restart its loop.
472+
let _ = us.read_waker.try_send(());
446473
return written_len;
447474
},
448475
}

lightning/src/debug_sync.rs

+2
Original file line numberDiff line numberDiff line change
@@ -362,3 +362,5 @@ fn read_write_lockorder_fail() {
362362
let _a = a.write().unwrap();
363363
}
364364
}
// In test/debug builds the lockorder-checking `RwLock` above is used directly: writer
// fairness is a liveness/performance concern only (see `util::fairrwlock`), so no special
// behavior is needed here.
pub type FairRwLock<T> = RwLock<T>;

lightning/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ mod sync {
159159
pub use debug_sync::*;
160160
#[cfg(not(test))]
161161
pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard};
162+
#[cfg(not(test))]
163+
pub use crate::util::fairrwlock::FairRwLock;
162164
}
163165

164166
#[cfg(not(feature = "std"))]

lightning/src/ln/peer_handler.rs

+533-395
Large diffs are not rendered by default.

lightning/src/ln/wire.rs

+35-9
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,26 @@ pub trait CustomMessageReader {
2828
fn read<R: io::Read>(&self, message_type: u16, buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError>;
2929
}
3030

31+
// TestEq is a dummy trait which requires PartialEq when built in testing — so that test code
// can compare received wire messages against expected ones — and otherwise is
// blanket-implemented for all types (i.e. imposes no bound at all in production builds).
#[cfg(test)]
pub trait TestEq : PartialEq {}
#[cfg(test)]
impl<T: PartialEq> TestEq for T {}

// Non-test builds: empty, crate-private, and satisfied by every type.
#[cfg(not(test))]
pub(crate) trait TestEq {}
#[cfg(not(test))]
impl<T> TestEq for T {}
43+
44+
3145
/// A Lightning message returned by [`read()`] when decoding bytes received over the wire. Each
3246
/// variant contains a message from [`msgs`] or otherwise the message type if unknown.
3347
#[allow(missing_docs)]
3448
#[derive(Debug)]
35-
pub(crate) enum Message<T> where T: core::fmt::Debug + Type {
49+
#[cfg_attr(test, derive(PartialEq))]
50+
pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
3651
Init(msgs::Init),
3752
Error(msgs::ErrorMessage),
3853
Warning(msgs::WarningMessage),
@@ -69,7 +84,7 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type {
6984
Custom(T),
7085
}
7186

72-
impl<T> Message<T> where T: core::fmt::Debug + Type {
87+
impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
7388
/// Returns the type that was used to decode the message payload.
7489
pub fn type_id(&self) -> u16 {
7590
match self {
@@ -252,6 +267,7 @@ mod encode {
252267

253268
pub(crate) use self::encode::Encode;
254269

270+
#[cfg(not(test))]
255271
/// Defines a type identifier for sending messages over the wire.
256272
///
257273
/// Messages implementing this trait specify a type and must be [`Writeable`].
@@ -260,10 +276,24 @@ pub trait Type: core::fmt::Debug + Writeable {
260276
fn type_id(&self) -> u16;
261277
}
262278

279+
/// Defines a type identifier for sending messages over the wire.
///
/// In test builds `PartialEq` is additionally required so that messages can be compared
/// against expected values in tests (mirroring the `TestEq` bound on `Message`).
#[cfg(test)]
pub trait Type: core::fmt::Debug + Writeable + PartialEq {
	fn type_id(&self) -> u16;
}

// `()` stands in as the custom-message type where no custom messages exist; its type id is
// never queried, hence the `unreachable!()`.
#[cfg(any(feature = "_test_utils", fuzzing, test))]
impl Type for () {
	fn type_id(&self) -> u16 { unreachable!(); }
}

// Blanket impls: anything which is `Encode` (has a known `TYPE` constant) is a `Type`. The
// two variants differ only in the extra `PartialEq` bound demanded by the test-only trait.
#[cfg(test)]
impl<T: core::fmt::Debug + Writeable + PartialEq> Type for T where T: Encode {
	fn type_id(&self) -> u16 { T::TYPE }
}

#[cfg(not(test))]
impl<T: core::fmt::Debug + Writeable> Type for T where T: Encode {
	fn type_id(&self) -> u16 { T::TYPE }
}
268298

269299
impl Encode for msgs::Init {
@@ -471,10 +501,6 @@ mod tests {
471501
}
472502
}
473503

474-
impl Type for () {
475-
fn type_id(&self) -> u16 { unreachable!(); }
476-
}
477-
478504
#[test]
479505
fn is_even_message_type() {
480506
let message = Message::<()>::Unknown(42);

lightning/src/sync.rs

+2
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,5 @@ impl<T> RwLock<T> {
113113
Err(())
114114
}
115115
}
// No writer-fairness shimming is needed for this `RwLock` wrapper, so `FairRwLock` is a
// plain alias. (NOTE(review): this appears to be the non-std/debug sync shim, where
// fairness with real OS threads is not a concern — confirm against the build config.)
pub type FairRwLock<T> = RwLock<T>;

lightning/src/util/fairrwlock.rs

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use std::sync::{TryLockResult, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::atomic::{AtomicUsize, Ordering};

/// A wrapper around [`std::sync::RwLock`] providing (rough) writer fairness.
///
/// libstd's `RwLock` makes no fairness guarantees — and on Linux with pthreads underneath,
/// readers trivially and completely starve writers. We often hold read locks across lengthy
/// message processing on several threads, while writes are short but time-sensitive, so such
/// starvation can completely block incoming connections or pings, especially during initial
/// graph sync.
///
/// To avoid that, `read()` checks whether any writer is pending and, if so, queues up behind
/// the writers by acquiring and releasing the write lock before taking the read lock. This is
/// not particularly optimized, but provides reasonable fairness.
pub struct FairRwLock<T> {
	lock: RwLock<T>,
	waiting_writers: AtomicUsize,
}

impl<T> FairRwLock<T> {
	/// Wraps `t` in a new fair lock with no writers waiting.
	pub fn new(t: T) -> Self {
		FairRwLock {
			lock: RwLock::new(t),
			waiting_writers: AtomicUsize::new(0),
		}
	}

	// All atomic accesses in this type are Relaxed: the counter is purely a routing hint for
	// readers, and the underlying RwLock provides all ordering of unrelated memory.

	/// Takes the write lock, advertising the pending writer to `read()` callers while blocked.
	pub fn write(&self) -> LockResult<RwLockWriteGuard<T>> {
		self.waiting_writers.fetch_add(1, Ordering::Relaxed);
		let write_res = self.lock.write();
		self.waiting_writers.fetch_sub(1, Ordering::Relaxed);
		write_res
	}

	/// Attempts to take the write lock without blocking. A failed attempt never counts as a
	/// waiting writer.
	pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<T>> {
		self.lock.try_write()
	}

	/// Takes the read lock. If any writer is currently waiting, first acquires and drops the
	/// write lock so this reader waits its turn behind the pending writer(s).
	pub fn read(&self) -> LockResult<RwLockReadGuard<T>> {
		if self.waiting_writers.load(Ordering::Relaxed) != 0 {
			// Acquire-and-release: we only want to queue behind the writers, not hold it.
			drop(self.lock.write());
		}
		// Note that we don't consider ensuring that an underlying RwLock allowing writers to
		// starve readers doesn't exhibit the same behavior here. I'm not aware of any
		// libstd-backing RwLock which exhibits this behavior, and as documented in the
		// struct-level documentation, it shouldn't pose a significant issue for our current
		// codebase.
		self.lock.read()
	}
}

lightning/src/util/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub mod persist;
2525
pub(crate) mod atomic_counter;
2626
pub(crate) mod byte_utils;
2727
pub(crate) mod chacha20;
28+
#[cfg(feature = "std")]
29+
pub(crate) mod fairrwlock;
2830
#[cfg(fuzzing)]
2931
pub mod zbase32;
3032
#[cfg(not(fuzzing))]

lightning/src/util/test_utils.rs

+89-20
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use chain::channelmonitor::MonitorEvent;
1818
use chain::transaction::OutPoint;
1919
use chain::keysinterface;
2020
use ln::features::{ChannelFeatures, InitFeatures};
21-
use ln::msgs;
21+
use ln::{msgs, wire};
2222
use ln::msgs::OptionalField;
2323
use ln::script::ShutdownScript;
2424
use routing::scoring::FixedPenaltyScorer;
@@ -249,37 +249,106 @@ impl chaininterface::BroadcasterInterface for TestBroadcaster {
249249

250250
/// A test `ChannelMessageHandler` which queues outbound `MessageSendEvent`s and can
/// optionally assert on the exact sequence of wire messages it receives.
pub struct TestChannelMessageHandler {
	pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
	// `None` = not checking received messages; `Some(queue)` = messages must arrive in
	// exactly the order registered via `expect_receive_msg`.
	expected_recv_msgs: Mutex<Option<Vec<wire::Message<()>>>>,
}
253254

254255
impl TestChannelMessageHandler {
	/// Constructs a handler with no pending events and no received-message expectations.
	pub fn new() -> Self {
		TestChannelMessageHandler {
			pending_events: Mutex::new(Vec::new()),
			expected_recv_msgs: Mutex::new(None),
		}
	}

	/// Appends `ev` to the ordered queue of messages we expect to receive. The first call
	/// switches the handler from "not checking" into "checking" mode.
	#[cfg(test)]
	pub(crate) fn expect_receive_msg(&self, ev: wire::Message<()>) {
		let mut expected_msgs = self.expected_recv_msgs.lock().unwrap();
		if expected_msgs.is_none() { *expected_msgs = Some(Vec::new()); }
		expected_msgs.as_mut().unwrap().push(ev);
	}

	/// Invoked by each `ChannelMessageHandler` method with the message it received. If
	/// expectations were registered, panics when a message arrives with none queued; exact
	/// equality is only checked in `cfg(test)` builds, where `Message` derives `PartialEq`.
	fn received_msg(&self, ev: wire::Message<()>) {
		let mut msgs = self.expected_recv_msgs.lock().unwrap();
		if msgs.is_none() { return; }
		assert!(!msgs.as_ref().unwrap().is_empty(), "Received message when we weren't expecting one");
		#[cfg(test)]
		assert_eq!(msgs.as_ref().unwrap()[0], ev);
		msgs.as_mut().unwrap().remove(0);
	}
}
279+
280+
impl Drop for TestChannelMessageHandler {
	fn drop(&mut self) {
		// On drop, verify every expected message was actually received — but skip the check
		// while already panicking so we don't mask the original failure with a double panic.
		let l = self.expected_recv_msgs.lock().unwrap();
		#[cfg(feature = "std")]
		{
			if !std::thread::panicking() {
				assert!(l.is_none() || l.as_ref().unwrap().is_empty());
			}
		}
	}
}
261291

262292
impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
	// Each handler below forwards its message into `received_msg`, which (once expectations
	// have been registered via `expect_receive_msg`) asserts it is the next expected one.
	fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::OpenChannel) {
		self.received_msg(wire::Message::OpenChannel(msg.clone()));
	}
	fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::AcceptChannel) {
		self.received_msg(wire::Message::AcceptChannel(msg.clone()));
	}
	fn handle_funding_created(&self, _their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
		self.received_msg(wire::Message::FundingCreated(msg.clone()));
	}
	fn handle_funding_signed(&self, _their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
		self.received_msg(wire::Message::FundingSigned(msg.clone()));
	}
	fn handle_funding_locked(&self, _their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
		self.received_msg(wire::Message::FundingLocked(msg.clone()));
	}
	fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) {
		self.received_msg(wire::Message::Shutdown(msg.clone()));
	}
	fn handle_closing_signed(&self, _their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
		self.received_msg(wire::Message::ClosingSigned(msg.clone()));
	}
	fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
		self.received_msg(wire::Message::UpdateAddHTLC(msg.clone()));
	}
	fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
		self.received_msg(wire::Message::UpdateFulfillHTLC(msg.clone()));
	}
	fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
		self.received_msg(wire::Message::UpdateFailHTLC(msg.clone()));
	}
	fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
		self.received_msg(wire::Message::UpdateFailMalformedHTLC(msg.clone()));
	}
	fn handle_commitment_signed(&self, _their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
		self.received_msg(wire::Message::CommitmentSigned(msg.clone()));
	}
	fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
		self.received_msg(wire::Message::RevokeAndACK(msg.clone()));
	}
	fn handle_update_fee(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
		self.received_msg(wire::Message::UpdateFee(msg.clone()));
	}
	fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {
		// Don't call `received_msg` here as `TestRoutingMessageHandler` generates these sometimes
	}
	fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
		self.received_msg(wire::Message::AnnouncementSignatures(msg.clone()));
	}
	fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
		self.received_msg(wire::Message::ChannelReestablish(msg.clone()));
	}
	// No per-peer state to update on disconnect in this test handler.
	fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
	fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {
		// Don't bother with `received_msg` for Init as its auto-generated and we don't want to
		// bother re-generating the expected Init message in all tests.
	}
	fn handle_error(&self, _their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
		self.received_msg(wire::Message::Error(msg.clone()));
	}
}
284353

285354
impl events::MessageSendEventsProvider for TestChannelMessageHandler {

0 commit comments

Comments
 (0)