@@ -294,6 +294,24 @@ const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
294
294
/// the peer.
295
295
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP : usize = 20 ;
296
296
297
+ /// If we've sent a ping, and are still awaiting a response, we (or our peer) may need to churn our
298
+ /// (or their) way through the socket receive buffer before receiving the ping.
299
+ ///
300
+ /// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
301
+ /// including any network delays or outbound traffic.
302
+ ///
303
+ /// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
304
+ /// to respond to a ping, as long as they send us at least one message during each tick or if we
305
+ /// sent a lot of messages, ensuring we aren't actually just disconnected. With a timer tick
306
+ /// interval of five seconds, this translates to about 30 seconds.
307
+ pub const MAX_BUFFER_DRAIN_TICK_INTERVALS : i8 = 6 ;
308
+
309
+ /// This is the minimum number of messages we expect a peer to be able to handle within one timer
310
+ /// tick. Once we have sent this many messages since the last ping, we send a ping right away to
311
+ /// ensures we don't just fill up our send buffer and leave the peer with too many messages to
312
+ /// process before the next ping.
313
+ pub const BUFFER_DRAIN_MSGS_PER_TICK : usize = 32 ;
314
+
297
315
struct Peer {
298
316
channel_encryptor : PeerChannelEncryptor ,
299
317
their_node_id : Option < PublicKey > ,
@@ -309,7 +327,9 @@ struct Peer {
309
327
310
328
sync_status : InitSyncTracker ,
311
329
312
- awaiting_pong : bool ,
330
+ msgs_sent_since_pong : usize ,
331
+ awaiting_pong_tick_intervals : i8 ,
332
+ received_message_since_timer_tick : bool ,
313
333
}
314
334
315
335
impl Peer {
@@ -551,7 +571,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
551
571
552
572
sync_status : InitSyncTracker :: NoSyncRequested ,
553
573
554
- awaiting_pong : false ,
574
+ msgs_sent_since_pong : 0 ,
575
+ awaiting_pong_tick_intervals : 0 ,
576
+ received_message_since_timer_tick : false ,
555
577
} ) . is_some ( ) {
556
578
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
557
579
} ;
@@ -589,7 +611,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
589
611
590
612
sync_status : InitSyncTracker :: NoSyncRequested ,
591
613
592
- awaiting_pong : false ,
614
+ msgs_sent_since_pong : 0 ,
615
+ awaiting_pong_tick_intervals : 0 ,
616
+ received_message_since_timer_tick : false ,
593
617
} ) . is_some ( ) {
594
618
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
595
619
} ;
@@ -598,7 +622,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
598
622
599
623
fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
600
624
while !peer. awaiting_write_event {
601
- if peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
625
+ if peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
602
626
match peer. sync_status {
603
627
InitSyncTracker :: NoSyncRequested => { } ,
604
628
InitSyncTracker :: ChannelsSyncing ( c) if c < 0xffff_ffff_ffff_ffff => {
@@ -643,6 +667,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
643
667
} ,
644
668
}
645
669
}
670
+ if peer. msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
671
+ self . maybe_send_extra_ping ( peer) ;
672
+ }
646
673
647
674
if {
648
675
let next_buff = match peer. pending_outbound_buffer . front ( ) {
@@ -725,6 +752,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
725
752
let encoded_message = buffer. 0 ;
726
753
727
754
log_trace ! ( self . logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
755
+ peer. msgs_sent_since_pong += 1 ;
728
756
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
729
757
}
730
758
@@ -919,6 +947,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
919
947
message : wire:: Message < <<CMH as core:: ops:: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage >
920
948
) -> Result < Option < wire:: Message < <<CMH as core:: ops:: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > > , MessageHandlingError > {
921
949
log_trace ! ( self . logger, "Received message {:?} from {}" , message, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
950
+ peer. received_message_since_timer_tick = true ;
922
951
923
952
// Need an Init as first message
924
953
if let wire:: Message :: Init ( _) = message {
@@ -982,7 +1011,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
982
1011
}
983
1012
} ,
984
1013
wire:: Message :: Pong ( _msg) => {
985
- peer. awaiting_pong = false ;
1014
+ peer. awaiting_pong_tick_intervals = 0 ;
1015
+ peer. msgs_sent_since_pong = 0 ;
986
1016
} ,
987
1017
988
1018
// Channel messages:
@@ -1103,7 +1133,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1103
1133
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1104
1134
continue
1105
1135
}
1106
- if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1136
+ if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 {
1107
1137
log_trace ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1108
1138
continue ;
1109
1139
}
@@ -1115,6 +1145,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1115
1145
continue ;
1116
1146
}
1117
1147
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1148
+ peer. msgs_sent_since_pong += 1 ;
1118
1149
}
1119
1150
} ,
1120
1151
wire:: Message :: NodeAnnouncement ( ref msg) => {
@@ -1126,7 +1157,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1126
1157
!peer. should_forward_node_announcement ( msg. contents . node_id ) {
1127
1158
continue
1128
1159
}
1129
- if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1160
+ if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 {
1130
1161
log_trace ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1131
1162
continue ;
1132
1163
}
@@ -1137,6 +1168,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1137
1168
continue ;
1138
1169
}
1139
1170
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1171
+ peer. msgs_sent_since_pong += 1 ;
1140
1172
}
1141
1173
} ,
1142
1174
wire:: Message :: ChannelUpdate ( ref msg) => {
@@ -1148,14 +1180,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1148
1180
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1149
1181
continue
1150
1182
}
1151
- if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1183
+ if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 {
1152
1184
log_trace ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1153
1185
continue ;
1154
1186
}
1155
1187
if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
1156
1188
continue ;
1157
1189
}
1158
1190
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1191
+ peer. msgs_sent_since_pong += 1 ;
1159
1192
}
1160
1193
} ,
1161
1194
_ => debug_assert ! ( false , "We shouldn't attempt to forward anything but gossip messages" ) ,
@@ -1431,6 +1464,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1431
1464
}
1432
1465
}
1433
1466
1467
+ /// This is called when we're blocked on sending additional gossip messages until we receive a
1468
+ /// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
1469
+ /// `awaiting_pong_tick_intervals` to a special flag value to indicate this).
1470
+ fn maybe_send_extra_ping ( & self , peer : & mut Peer ) {
1471
+ if peer. awaiting_pong_tick_intervals == 0 {
1472
+ peer. awaiting_pong_tick_intervals = -1 ;
1473
+ let ping = msgs:: Ping {
1474
+ ponglen : 0 ,
1475
+ byteslen : 64 ,
1476
+ } ;
1477
+ self . enqueue_message ( peer, & ping) ;
1478
+ }
1479
+ }
1480
+
1434
1481
/// Send pings to each peer and disconnect those which did not respond to the last round of
1435
1482
/// pings.
1436
1483
///
@@ -1451,7 +1498,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1451
1498
let mut descriptors_needing_disconnect = Vec :: new ( ) ;
1452
1499
1453
1500
peers. retain ( |descriptor, peer| {
1454
- if peer. awaiting_pong {
1501
+ if !peer. channel_encryptor . is_ready_for_encryption ( ) {
1502
+ // The peer needs to complete its handshake before we can exchange messages
1503
+ return true ;
1504
+ }
1505
+
1506
+ if ( peer. awaiting_pong_tick_intervals > 0 && !peer. received_message_since_timer_tick )
1507
+ || peer. awaiting_pong_tick_intervals > MAX_BUFFER_DRAIN_TICK_INTERVALS
1508
+ {
1455
1509
descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1456
1510
match peer. their_node_id {
1457
1511
Some ( node_id) => {
@@ -1468,21 +1522,26 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1468
1522
return false ;
1469
1523
}
1470
1524
1471
- if !peer. channel_encryptor . is_ready_for_encryption ( ) {
1472
- // The peer needs to complete its handshake before we can exchange messages
1525
+ peer. received_message_since_timer_tick = false ;
1526
+ if peer. awaiting_pong_tick_intervals == -1 {
1527
+ // Magic value set in `maybe_send_extra_ping`.
1528
+ peer. awaiting_pong_tick_intervals = 1 ;
1529
+ return true ;
1530
+ }
1531
+
1532
+ if peer. awaiting_pong_tick_intervals > 0 {
1533
+ peer. awaiting_pong_tick_intervals += 1 ;
1473
1534
return true ;
1474
1535
}
1475
1536
1537
+ peer. awaiting_pong_tick_intervals = 1 ;
1476
1538
let ping = msgs:: Ping {
1477
1539
ponglen : 0 ,
1478
1540
byteslen : 64 ,
1479
1541
} ;
1480
1542
self . enqueue_message ( peer, & ping) ;
1543
+ self . do_attempt_write_data ( & mut ( descriptor. clone ( ) ) , & mut * peer) ;
1481
1544
1482
- let mut descriptor_clone = descriptor. clone ( ) ;
1483
- self . do_attempt_write_data ( & mut descriptor_clone, peer) ;
1484
-
1485
- peer. awaiting_pong = true ;
1486
1545
true
1487
1546
} ) ;
1488
1547
@@ -1641,11 +1700,14 @@ mod tests {
1641
1700
// than can fit into a peer's buffer).
1642
1701
let ( mut fd_a, mut fd_b) = establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
1643
1702
1644
- // Make each peer to read the messages that the other peer just wrote to them.
1645
- peers[ 0 ] . process_events ( ) ;
1646
- peers[ 1 ] . read_event ( & mut fd_b, & fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ) . unwrap ( ) ;
1647
- peers[ 1 ] . process_events ( ) ;
1648
- peers[ 0 ] . read_event ( & mut fd_a, & fd_b. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ) . unwrap ( ) ;
1703
+ // Make each peer to read the messages that the other peer just wrote to them. Note that
1704
+ // due to the max-messagse-before-bing limits this may take a few iterations to complete.
1705
+ for _ in 0 ..10 {
1706
+ peers[ 0 ] . process_events ( ) ;
1707
+ peers[ 1 ] . read_event ( & mut fd_b, & fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ) . unwrap ( ) ;
1708
+ peers[ 1 ] . process_events ( ) ;
1709
+ peers[ 0 ] . read_event ( & mut fd_a, & fd_b. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ) . unwrap ( ) ;
1710
+ }
1649
1711
1650
1712
// Check that each peer has received the expected number of channel updates and channel
1651
1713
// announcements.
0 commit comments