1717
1818pub mod service;
1919
20- use crate :: args:: DftArgs ;
20+ use crate :: args:: { Command , DftArgs } ;
2121use crate :: config:: AppConfig ;
2222use crate :: execution:: AppExecution ;
2323use color_eyre:: { eyre:: eyre, Result } ;
@@ -26,7 +26,7 @@ use datafusion_app::extensions::DftSessionStateBuilder;
2626use datafusion_app:: local:: ExecutionContext ;
2727use log:: info;
2828use service:: FlightSqlServiceImpl ;
29- use std:: net:: SocketAddr ;
29+ use std:: net:: { IpAddr , Ipv4Addr , SocketAddr } ;
3030use std:: time:: Duration ;
3131use tokio:: net:: TcpListener ;
3232use tokio:: sync:: oneshot;
@@ -38,7 +38,6 @@ use tower_http::validate_request::ValidateRequestHeaderLayer;
3838use super :: try_start_metrics_server;
3939
4040const DEFAULT_TIMEOUT_SECONDS : u64 = 60 ;
41- const DEFAULT_SERVER_ADDRESS : & str = "127.0.0.1:50051" ;
4241
4342pub fn create_server_handle (
4443 config : & AppConfig ,
@@ -121,8 +120,8 @@ impl FlightSqlApp {
121120 pub async fn try_new (
122121 app_execution : AppExecution ,
123122 config : & AppConfig ,
124- addr : & str ,
125- metrics_addr : & str ,
123+ addr : SocketAddr ,
124+ metrics_addr : SocketAddr ,
126125 ) -> Result < Self > {
127126 info ! ( "Listening to FlightSQL on {addr}" ) ;
128127 let flightsql = service:: FlightSqlServiceImpl :: new ( app_execution) ;
@@ -132,7 +131,6 @@ impl FlightSqlApp {
132131 let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
133132 let handle = create_server_handle ( config, flightsql, listener, rx) ?;
134133
135- let metrics_addr: SocketAddr = metrics_addr. parse ( ) ?;
136134 try_start_metrics_server ( metrics_addr) ?;
137135
138136 let app = Self {
@@ -188,13 +186,41 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
188186 execution_ctx. execute_ddl ( ) . await ;
189187 }
190188 let app_execution = AppExecution :: new ( execution_ctx) ;
191- let app = FlightSqlApp :: try_new (
192- app_execution,
193- & config,
194- & cli. host . unwrap_or ( DEFAULT_SERVER_ADDRESS . to_string ( ) ) ,
195- & config. flightsql_server . server_metrics_port ,
196- )
197- . await ?;
189+
190+ let ( addr, metrics_addr) = if let Some ( cmd) = cli. command . clone ( ) {
191+ match cmd {
192+ Command :: ServeFlightSql {
193+ addr : Some ( addr) ,
194+ metrics_addr : Some ( metrics_addr) ,
195+ ..
196+ } => ( addr, metrics_addr) ,
197+ Command :: ServeFlightSql {
198+ addr : Some ( addr) ,
199+ metrics_addr : None ,
200+ ..
201+ } => ( addr, config. flightsql_server . server_metrics_addr ) ,
202+ Command :: ServeFlightSql {
203+ addr : None ,
204+ metrics_addr : Some ( metrics_addr) ,
205+ ..
206+ } => (
207+ SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: new ( 127 , 0 , 0 , 1 ) ) , 50051 ) ,
208+ metrics_addr,
209+ ) ,
210+
211+ _ => (
212+ SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: new ( 127 , 0 , 0 , 1 ) ) , 50051 ) ,
213+ config. flightsql_server . server_metrics_addr ,
214+ ) ,
215+ }
216+ } else {
217+ (
218+ SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: new ( 127 , 0 , 0 , 1 ) ) , 50051 ) ,
219+ config. flightsql_server . server_metrics_addr ,
220+ )
221+ } ;
222+
223+ let app = FlightSqlApp :: try_new ( app_execution, & config, addr, metrics_addr) . await ?;
198224 app. run ( ) . await ;
199225 Ok ( ( ) )
200226}
0 commit comments