@@ -85,7 +85,6 @@ use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddre
85
85
use lightning:: util:: logger:: Logger ;
86
86
87
87
use std:: task;
88
- use std:: net:: IpAddr ;
89
88
use std:: net:: SocketAddr ;
90
89
use std:: net:: TcpStream as StdTcpStream ;
91
90
use std:: sync:: { Arc , Mutex } ;
@@ -212,6 +211,20 @@ impl Connection {
212
211
}
213
212
}
214
213
214
+ fn get_addr_from_stream ( stream : & StdTcpStream ) -> Option < NetAddress > {
215
+ match stream. peer_addr ( ) {
216
+ Ok ( SocketAddr :: V4 ( sockaddr) ) => Some ( NetAddress :: IPv4 {
217
+ addr : sockaddr. ip ( ) . octets ( ) ,
218
+ port : sockaddr. port ( ) ,
219
+ } ) ,
220
+ Ok ( SocketAddr :: V6 ( sockaddr) ) => Some ( NetAddress :: IPv6 {
221
+ addr : sockaddr. ip ( ) . octets ( ) ,
222
+ port : sockaddr. port ( ) ,
223
+ } ) ,
224
+ Err ( _) => None ,
225
+ }
226
+ }
227
+
215
228
/// Process incoming messages and feed outgoing messages on the provided socket generated by
216
229
/// accepting an incoming connection.
217
230
///
@@ -223,21 +236,12 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
223
236
RMH : RoutingMessageHandler + ' static + Send + Sync ,
224
237
L : Logger + ' static + ?Sized + Send + Sync ,
225
238
UMH : CustomMessageHandler + ' static + Send + Sync {
226
- let ip_addr = stream . peer_addr ( ) . unwrap ( ) ;
239
+ let remote_addr = get_addr_from_stream ( & stream ) ;
227
240
let ( reader, write_receiver, read_receiver, us) = Connection :: new ( stream) ;
228
241
#[ cfg( debug_assertions) ]
229
242
let last_us = Arc :: clone ( & us) ;
230
243
231
- let handle_opt = if let Ok ( _) = peer_manager. new_inbound_connection ( SocketDescriptor :: new ( us. clone ( ) ) , match ip_addr. ip ( ) {
232
- IpAddr :: V4 ( ip) => Some ( NetAddress :: IPv4 {
233
- addr : ip. octets ( ) ,
234
- port : ip_addr. port ( ) ,
235
- } ) ,
236
- IpAddr :: V6 ( ip) => Some ( NetAddress :: IPv6 {
237
- addr : ip. octets ( ) ,
238
- port : ip_addr. port ( ) ,
239
- } ) ,
240
- } ) {
244
+ let handle_opt = if let Ok ( _) = peer_manager. new_inbound_connection ( SocketDescriptor :: new ( us. clone ( ) ) , remote_addr) {
241
245
Some ( tokio:: spawn ( Connection :: schedule_read ( peer_manager, us, reader, read_receiver, write_receiver) ) )
242
246
} else {
243
247
// Note that we will skip socket_disconnected here, in accordance with the PeerManager
@@ -274,20 +278,11 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
274
278
RMH : RoutingMessageHandler + ' static + Send + Sync ,
275
279
L : Logger + ' static + ?Sized + Send + Sync ,
276
280
UMH : CustomMessageHandler + ' static + Send + Sync {
277
- let ip_addr = stream . peer_addr ( ) . unwrap ( ) ;
281
+ let remote_addr = get_addr_from_stream ( & stream ) ;
278
282
let ( reader, mut write_receiver, read_receiver, us) = Connection :: new ( stream) ;
279
283
#[ cfg( debug_assertions) ]
280
284
let last_us = Arc :: clone ( & us) ;
281
- let handle_opt = if let Ok ( initial_send) = peer_manager. new_outbound_connection ( their_node_id, SocketDescriptor :: new ( us. clone ( ) ) , match ip_addr. ip ( ) {
282
- IpAddr :: V4 ( ip) => Some ( NetAddress :: IPv4 {
283
- addr : ip. octets ( ) ,
284
- port : ip_addr. port ( ) ,
285
- } ) ,
286
- IpAddr :: V6 ( ip) => Some ( NetAddress :: IPv6 {
287
- addr : ip. octets ( ) ,
288
- port : ip_addr. port ( ) ,
289
- } ) ,
290
- } ) {
285
+ let handle_opt = if let Ok ( initial_send) = peer_manager. new_outbound_connection ( their_node_id, SocketDescriptor :: new ( us. clone ( ) ) , remote_addr) {
291
286
Some ( tokio:: spawn ( async move {
292
287
// We should essentially always have enough room in a TCP socket buffer to send the
293
288
// initial 10s of bytes. However, tokio running in single-threaded mode will always
@@ -561,6 +556,22 @@ mod tests {
561
556
}
562
557
}
563
558
559
+ fn make_tcp_connection ( ) -> ( std:: net:: TcpStream , std:: net:: TcpStream ) {
560
+ if let Ok ( listener) = std:: net:: TcpListener :: bind ( "127.0.0.1:9735" ) {
561
+ ( std:: net:: TcpStream :: connect ( "127.0.0.1:9735" ) . unwrap ( ) , listener. accept ( ) . unwrap ( ) . 0 )
562
+ } else if let Ok ( listener) = std:: net:: TcpListener :: bind ( "127.0.0.1:19735" ) {
563
+ ( std:: net:: TcpStream :: connect ( "127.0.0.1:19735" ) . unwrap ( ) , listener. accept ( ) . unwrap ( ) . 0 )
564
+ } else if let Ok ( listener) = std:: net:: TcpListener :: bind ( "127.0.0.1:9997" ) {
565
+ ( std:: net:: TcpStream :: connect ( "127.0.0.1:9997" ) . unwrap ( ) , listener. accept ( ) . unwrap ( ) . 0 )
566
+ } else if let Ok ( listener) = std:: net:: TcpListener :: bind ( "127.0.0.1:9998" ) {
567
+ ( std:: net:: TcpStream :: connect ( "127.0.0.1:9998" ) . unwrap ( ) , listener. accept ( ) . unwrap ( ) . 0 )
568
+ } else if let Ok ( listener) = std:: net:: TcpListener :: bind ( "127.0.0.1:9999" ) {
569
+ ( std:: net:: TcpStream :: connect ( "127.0.0.1:9999" ) . unwrap ( ) , listener. accept ( ) . unwrap ( ) . 0 )
570
+ } else if let Ok ( listener) = std:: net:: TcpListener :: bind ( "127.0.0.1:46926" ) {
571
+ ( std:: net:: TcpStream :: connect ( "127.0.0.1:46926" ) . unwrap ( ) , listener. accept ( ) . unwrap ( ) . 0 )
572
+ } else { panic ! ( "Failed to bind to v4 localhost on common ports" ) ; }
573
+ }
574
+
564
575
async fn do_basic_connection_test ( ) {
565
576
let secp_ctx = Secp256k1 :: new ( ) ;
566
577
let a_key = SecretKey :: from_slice ( & [ 1 ; 32 ] ) . unwrap ( ) ;
@@ -600,13 +611,7 @@ mod tests {
600
611
// address. This may not always be the case in containers and the like, so if this test is
601
612
// failing for you check that you have a loopback interface and it is configured with
602
613
// 127.0.0.1.
603
- let ( conn_a, conn_b) = if let Ok ( listener) = std:: net:: TcpListener :: bind ( "127.0.0.1:9735" ) {
604
- ( std:: net:: TcpStream :: connect ( "127.0.0.1:9735" ) . unwrap ( ) , listener. accept ( ) . unwrap ( ) . 0 )
605
- } else if let Ok ( listener) = std:: net:: TcpListener :: bind ( "127.0.0.1:9999" ) {
606
- ( std:: net:: TcpStream :: connect ( "127.0.0.1:9999" ) . unwrap ( ) , listener. accept ( ) . unwrap ( ) . 0 )
607
- } else if let Ok ( listener) = std:: net:: TcpListener :: bind ( "127.0.0.1:46926" ) {
608
- ( std:: net:: TcpStream :: connect ( "127.0.0.1:46926" ) . unwrap ( ) , listener. accept ( ) . unwrap ( ) . 0 )
609
- } else { panic ! ( "Failed to bind to v4 localhost on common ports" ) ; } ;
614
+ let ( conn_a, conn_b) = make_tcp_connection ( ) ;
610
615
611
616
let fut_a = super :: setup_outbound ( Arc :: clone ( & a_manager) , b_pub, conn_a) ;
612
617
let fut_b = super :: setup_inbound ( b_manager, conn_b) ;
@@ -634,8 +639,53 @@ mod tests {
634
639
async fn basic_threaded_connection_test ( ) {
635
640
do_basic_connection_test ( ) . await ;
636
641
}
642
+
637
643
#[ tokio:: test]
638
644
async fn basic_unthreaded_connection_test ( ) {
639
645
do_basic_connection_test ( ) . await ;
640
646
}
647
+
648
+ async fn race_disconnect_accept ( ) {
649
+ // Previously, if we handed an already-disconnected socket to `setup_inbound` we'd panic.
650
+ // This attempts to find other similar races by opening connections and shutting them down
651
+ // while connecting. Sadly in testing this did *not* reproduce the previous issue.
652
+ let secp_ctx = Secp256k1 :: new ( ) ;
653
+ let a_key = SecretKey :: from_slice ( & [ 1 ; 32 ] ) . unwrap ( ) ;
654
+ let b_key = SecretKey :: from_slice ( & [ 2 ; 32 ] ) . unwrap ( ) ;
655
+ let b_pub = PublicKey :: from_secret_key ( & secp_ctx, & b_key) ;
656
+
657
+ let a_manager = Arc :: new ( PeerManager :: new ( MessageHandler {
658
+ chan_handler : Arc :: new ( lightning:: ln:: peer_handler:: ErroringMessageHandler :: new ( ) ) ,
659
+ route_handler : Arc :: new ( lightning:: ln:: peer_handler:: IgnoringMessageHandler { } ) ,
660
+ } , a_key, & [ 1 ; 32 ] , Arc :: new ( TestLogger ( ) ) , Arc :: new ( lightning:: ln:: peer_handler:: IgnoringMessageHandler { } ) ) ) ;
661
+
662
+ // Make two connections, one for an inbound and one for an outbound connection
663
+ let conn_a = {
664
+ let ( conn_a, _) = make_tcp_connection ( ) ;
665
+ conn_a
666
+ } ;
667
+ let conn_b = {
668
+ let ( _, conn_b) = make_tcp_connection ( ) ;
669
+ conn_b
670
+ } ;
671
+
672
+ // Call connection setup inside new tokio tasks.
673
+ let manager_reference = Arc :: clone ( & a_manager) ;
674
+ tokio:: spawn ( async move {
675
+ super :: setup_inbound ( manager_reference, conn_a) . await
676
+ } ) ;
677
+ tokio:: spawn ( async move {
678
+ super :: setup_outbound ( a_manager, b_pub, conn_b) . await
679
+ } ) ;
680
+ }
681
+
682
+ #[ tokio:: test( flavor = "multi_thread" ) ]
683
+ async fn threaded_race_disconnect_accept ( ) {
684
+ race_disconnect_accept ( ) . await ;
685
+ }
686
+
687
+ #[ tokio:: test]
688
+ async fn unthreaded_race_disconnect_accept ( ) {
689
+ race_disconnect_accept ( ) . await ;
690
+ }
641
691
}
0 commit comments