@@ -30,10 +30,11 @@ use object_store::ObjectStore;
30
30
use observability_deps:: tracing:: * ;
31
31
use panic_logging:: SendPanicsToTracing ;
32
32
use parquet_file:: storage:: { ParquetStorage , StorageId } ;
33
+ use socket2:: { Domain , Type } ;
33
34
use std:: { collections:: HashMap , path:: Path , str:: FromStr } ;
34
35
use std:: { num:: NonZeroUsize , sync:: Arc } ;
35
36
use thiserror:: Error ;
36
- use tokio:: net:: TcpListener ;
37
+ use tokio:: net:: TcpListener as TokioTcpListener ;
37
38
use tokio_util:: sync:: CancellationToken ;
38
39
use trace_exporters:: TracingConfig ;
39
40
use trace_http:: ctx:: TraceHeaderParser ;
@@ -460,6 +461,10 @@ pub async fn command(config: Config) -> Result<()> {
460
461
Arc :: clone ( & telemetry_store) ,
461
462
) ?;
462
463
464
+ let sock_addr: std:: net:: SocketAddr = * config. http_bind_address ;
465
+
466
+ let listener = setup_tokio_tcp_listener ( sock_addr) ?;
467
+
463
468
let query_executor = Arc :: new ( QueryExecutorImpl :: new (
464
469
write_buffer. catalog ( ) ,
465
470
Arc :: clone ( & write_buffer) ,
@@ -471,10 +476,6 @@ pub async fn command(config: Config) -> Result<()> {
471
476
Arc :: clone ( & telemetry_store) ,
472
477
) ) ;
473
478
474
- let listener = TcpListener :: bind ( * config. http_bind_address )
475
- . await
476
- . map_err ( Error :: BindAddress ) ?;
477
-
478
479
let builder = ServerBuilder :: new ( common_state)
479
480
. max_request_size ( config. max_http_request_size )
480
481
. write_buffer ( write_buffer)
@@ -552,3 +553,27 @@ fn parse_datafusion_config(
552
553
553
554
Ok ( out)
554
555
}
556
+
557
+ #[ cfg( windows) ]
558
+ fn setup_tokio_tcp_listener ( sock_addr : std:: net:: SocketAddr ) -> Result < TokioTcpListener > {
559
+ let socket = socket2:: Socket :: new ( Domain :: IPV4 , Type :: STREAM , None ) . expect ( "create socket" ) ;
560
+ socket. bind ( & sock_addr. into ( ) ) . expect ( "bind socket" ) ;
561
+ socket. listen ( 1 ) . expect ( "listening on socket" ) ;
562
+
563
+ let listener: std:: net:: TcpListener = socket. into ( ) ;
564
+ let listener = TokioTcpListener :: from_std ( listener) ;
565
+ listener. map_err ( Error :: BindAddress )
566
+ }
567
+
568
+ #[ cfg( not( windows) ) ]
569
+ fn setup_tokio_tcp_listener ( sock_addr : std:: net:: SocketAddr ) -> Result < TokioTcpListener > {
570
+ let socket = socket2:: Socket :: new ( Domain :: IPV4 , Type :: STREAM , None ) . expect ( "create socket" ) ;
571
+ socket. bind ( & sock_addr. into ( ) ) . expect ( "bind socket" ) ;
572
+ socket. set_reuse_address ( true ) . expect ( "setup reuse addr" ) ;
573
+ socket. set_reuse_port ( true ) . expect ( "setup reuse port" ) ;
574
+ socket. listen ( 1 ) . expect ( "listening on socket" ) ;
575
+
576
+ let listener: std:: net:: TcpListener = socket. into ( ) ;
577
+ let listener = TokioTcpListener :: from_std ( listener) ;
578
+ listener. map_err ( Error :: BindAddress )
579
+ }
0 commit comments