Skip to content

Commit e60c66e

Browse files
Start adding more flightsql functionality
1 parent f897723 commit e60c66e

5 files changed

Lines changed: 85 additions & 6 deletions

File tree

crates/datafusion-app/src/flightsql.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717

1818
use std::sync::Arc;
1919

20-
use arrow_flight::sql::client::FlightSqlServiceClient;
20+
use arrow_flight::{
21+
decode::FlightRecordBatchStream, sql::client::FlightSqlServiceClient, FlightInfo,
22+
};
2123
#[cfg(feature = "flightsql")]
2224
use base64::engine::{general_purpose::STANDARD, Engine as _};
2325
use datafusion::{
2426
error::{DataFusionError, Result as DFResult},
2527
physical_plan::stream::RecordBatchStreamAdapter,
2628
sql::parser::DFParser,
2729
};
28-
use log::{error, info, warn};
30+
use log::{debug, error, info, warn};
2931

3032
use color_eyre::eyre::{self, Result};
3133
use tokio::sync::Mutex;
@@ -203,4 +205,43 @@ impl FlightSQLContext {
203205
return Err(DataFusionError::External("Missing client".into()));
204206
}
205207
}
208+
209+
pub async fn get_catalogs_flight_info(&self) -> DFResult<FlightInfo> {
210+
let client = self.client.clone();
211+
let mut guard = client.lock().await;
212+
if let Some(client) = guard.as_mut() {
213+
client
214+
.get_catalogs()
215+
.await
216+
.map_err(|e| DataFusionError::ArrowError(e, None))
217+
} else {
218+
Err(DataFusionError::External(
219+
"No FlightSQL client configured. Add one in `~/.config/dft/config.toml`".into(),
220+
))
221+
}
222+
}
223+
224+
pub async fn do_get(&self, flight_info: FlightInfo) -> DFResult<Vec<FlightRecordBatchStream>> {
225+
let client = self.client.clone();
226+
let mut guard = client.lock().await;
227+
if let Some(client) = guard.as_mut() {
228+
let mut streams = Vec::new();
229+
for endpoint in flight_info.endpoint {
230+
if let Some(ticket) = endpoint.ticket {
231+
let stream = client
232+
.do_get(ticket.into_request())
233+
.await
234+
.map_err(|e| DataFusionError::ArrowError(e, None))?;
235+
streams.push(stream);
236+
} else {
237+
debug!("No ticket for endpoint: {endpoint}");
238+
}
239+
}
240+
Ok(streams)
241+
} else {
242+
Err(DataFusionError::External(
243+
"No FlightSQL client configured. Add one in `~/.config/dft/config.toml`".into(),
244+
))
245+
}
246+
}
206247
}

src/args.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ impl DftArgs {
130130
#[derive(Clone, Debug, Subcommand)]
131131
pub enum FlightSqlCommand {
132132
/// Executes `GetFlightInfo` and `DoGet` on the provided SQL query
133-
Query { sql: String },
133+
Statement { sql: String },
134+
/// Executes `GetCatalogsFlightInfo` and `DoGet`
135+
Catalogs,
134136
}
135137

136138
#[derive(Clone, Debug, Subcommand)]

src/cli/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use std::path::{Path, PathBuf};
4141
#[cfg(feature = "flightsql")]
4242
use {
4343
crate::args::FlightSqlCommand,
44+
arrow_flight::FlightInfo,
4445
datafusion_app::{
4546
config::{AuthConfig, FlightSQLConfig},
4647
flightsql::FlightSQLContext,
@@ -87,7 +88,15 @@ impl CliApp {
8788
#[cfg(feature = "flightsql")]
8889
async fn handle_flightsql_command(&self, command: FlightSqlCommand) -> color_eyre::Result<()> {
8990
match command {
90-
FlightSqlCommand::Query { sql } => self.exec_from_flightsql(sql, 0).await,
91+
FlightSqlCommand::Statement { sql } => self.exec_from_flightsql(sql, 0).await,
92+
FlightSqlCommand::Catalogs => {
93+
let flight_info = self
94+
.app_execution
95+
.flightsql_ctx()
96+
.get_catalogs_flight_info()
97+
.await?;
98+
Ok(())
99+
}
91100
}
92101
}
93102

src/server/flightsql/service.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ use arrow_flight::encode::FlightDataEncoderBuilder;
2020
use arrow_flight::error::FlightError;
2121
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
2222
use arrow_flight::sql::server::FlightSqlService;
23-
use arrow_flight::sql::{Any, CommandStatementQuery, SqlInfo, TicketStatementQuery};
23+
use arrow_flight::sql::{
24+
Any, CommandGetCatalogs, CommandStatementQuery, SqlInfo, TicketStatementQuery,
25+
};
2426
use arrow_flight::{FlightDescriptor, FlightEndpoint, FlightInfo, Ticket};
2527
use color_eyre::Result;
2628
use datafusion::logical_expr::LogicalPlan;
@@ -218,6 +220,31 @@ impl FlightSqlServiceImpl {
218220
impl FlightSqlService for FlightSqlServiceImpl {
219221
type FlightService = FlightSqlServiceImpl;
220222

223+
async fn get_flight_info_catalogs(
224+
&self,
225+
_query: CommandGetCatalogs,
226+
request: Request<FlightDescriptor>,
227+
) -> Result<Response<FlightInfo>, Status> {
228+
counter!("requests", "endpoint" => "get_flight_info").increment(1);
229+
let start = Timestamp::now();
230+
let request_id = uuid::Uuid::new_v4();
231+
let query = "SELECT DISTINCT table_catalog FROM information_schema.tables".to_string();
232+
let res = self
233+
.get_flight_info_statement_handler(query, request_id, request)
234+
.await;
235+
236+
// TODO: Move recording to after response is sent to not impact response latency
237+
self.record_request(
238+
start,
239+
Some(request_id.to_string()),
240+
res.as_ref().err(),
241+
"/get_flight_info_catalogs".to_string(),
242+
"get_flight_info_catalogs_latency_ms",
243+
)
244+
.await;
245+
res
246+
}
247+
221248
async fn get_flight_info_statement(
222249
&self,
223250
query: CommandStatementQuery,

tests/extension_cases/flightsql.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ async fn test_flightsql_query_command() {
645645
Command::cargo_bin("dft")
646646
.unwrap()
647647
.arg("flightsql")
648-
.arg("query")
648+
.arg("statement")
649649
.arg(sql.clone())
650650
.timeout(Duration::from_secs(5))
651651
.assert()

0 commit comments

Comments
 (0)