Skip to content

Commit 1a90da0

Browse files
Get tables
1 parent 25540d7 commit 1a90da0

7 files changed

Lines changed: 412 additions & 60 deletions

File tree

crates/datafusion-app/src/catalog/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub fn create_app_catalog(
3838
) -> Result<Arc<dyn CatalogProvider>> {
3939
let catalog = MemoryCatalogProvider::new();
4040
let meta_schema = Arc::new(MemorySchemaProvider::new());
41-
catalog.register_schema("meta", meta_schema.clone())?;
41+
catalog.register_schema("meta", Arc::<MemorySchemaProvider>::clone(&meta_schema))?;
4242
let versions_table = try_create_meta_versions_table(app_name, app_version)?;
4343
meta_schema.register_table("versions".to_string(), versions_table)?;
4444
Ok(Arc::new(catalog))
@@ -56,7 +56,7 @@ fn try_create_meta_versions_table(app_name: &str, app_version: &str) -> Result<A
5656
let datafusion_version_arr = StringArray::from(vec![DATAFUSION_VERSION]);
5757
let datafusion_app_version_arr = StringArray::from(vec![env!("CARGO_PKG_VERSION")]);
5858
let batches = RecordBatch::try_new(
59-
schema.clone(),
59+
Arc::<Schema>::clone(&schema),
6060
vec![
6161
Arc::new(app_version_arr),
6262
Arc::new(datafusion_version_arr),

crates/datafusion-app/src/flightsql.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::sync::Arc;
1919

2020
use arrow_flight::{
2121
decode::FlightRecordBatchStream,
22-
sql::{client::FlightSqlServiceClient, CommandGetDbSchemas},
22+
sql::{client::FlightSqlServiceClient, CommandGetDbSchemas, CommandGetTables},
2323
FlightInfo,
2424
};
2525
#[cfg(feature = "flightsql")]
@@ -226,14 +226,14 @@ impl FlightSQLContext {
226226
pub async fn get_db_schemas_flight_info(
227227
&self,
228228
catalog: Option<String>,
229-
schema_pattern: Option<String>,
229+
schema_filter_pattern: Option<String>,
230230
) -> DFResult<FlightInfo> {
231231
let client = Arc::clone(&self.client);
232232
let mut guard = client.lock().await;
233233
if let Some(client) = guard.as_mut() {
234234
let cmd = CommandGetDbSchemas {
235235
catalog,
236-
db_schema_filter_pattern: schema_pattern,
236+
db_schema_filter_pattern: schema_filter_pattern,
237237
};
238238
client
239239
.get_db_schemas(cmd)
@@ -246,6 +246,35 @@ impl FlightSQLContext {
246246
}
247247
}
248248

249+
pub async fn get_tables_flight_info(
250+
&self,
251+
catalog: Option<String>,
252+
schema_filter_pattern: Option<String>,
253+
table_name_filter_pattern: Option<String>,
254+
table_types: Vec<String>,
255+
include_schema: bool,
256+
) -> DFResult<FlightInfo> {
257+
let client = Arc::clone(&self.client);
258+
let mut guard = client.lock().await;
259+
if let Some(client) = guard.as_mut() {
260+
let cmd = CommandGetTables {
261+
catalog,
262+
db_schema_filter_pattern: schema_filter_pattern,
263+
table_name_filter_pattern,
264+
table_types,
265+
include_schema,
266+
};
267+
client
268+
.get_tables(cmd)
269+
.await
270+
.map_err(|e| DataFusionError::ArrowError(e, None))
271+
} else {
272+
Err(DataFusionError::External(
273+
"No FlightSQL client configured. Add one in `~/.config/dft/config.toml`".into(),
274+
))
275+
}
276+
}
277+
249278
pub async fn do_get(&self, flight_info: FlightInfo) -> DFResult<Vec<FlightRecordBatchStream>> {
250279
let client = Arc::clone(&self.client);
251280
let mut guard = client.lock().await;

crates/datafusion-app/src/observability/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl ObservabilityContext {
6161
}
6262

6363
pub fn schema(&self) -> Arc<dyn SchemaProvider> {
64-
self.schema.clone()
64+
Arc::<dyn SchemaProvider>::clone(&self.schema)
6565
}
6666

6767
/// Attempts to insert request details in `requests` table. No verification is performed on

src/args.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,22 +127,40 @@ impl DftArgs {
127127
}
128128
}
129129

130+
/// Parameters for each command match to exactly how they are defined in specification (https://arrow.apache.org/docs/format/FlightSql.html#protocol-buffer-definitions)
130131
#[derive(Clone, Debug, Subcommand)]
131132
pub enum FlightSqlCommand {
132133
/// Executes `CommandStatementQuery` and `DoGet` to return results
133134
StatementQuery {
134135
/// The query to execute
136+
#[clap(long)]
135137
sql: String,
136138
},
137139
/// Executes `CommandGetCatalogs` and `DoGet` to return results
138140
GetCatalogs,
139141
/// Executes `CommandGetDbSchemas` and `DoGet` to return results
140142
GetDbSchemas {
141143
/// The catalog to retrieve schemas
142-
#[clap(long, short)]
144+
#[clap(long)]
143145
catalog: Option<String>,
144-
#[clap(long, short)]
145-
schema_pattern: Option<String>,
146+
/// Schema filter pattern to apply
147+
#[clap(long)]
148+
db_schema_filter_pattern: Option<String>,
149+
},
150+
/// Executes `CommandGetDbSchemas` and `DoGet` to return results
151+
GetTables {
152+
/// The catalog to retrieve schemas
153+
#[clap(long)]
154+
catalog: Option<String>,
155+
/// Schema filter pattern to apply
156+
#[clap(long)]
157+
db_schema_filter_pattern: Option<String>,
158+
/// Table name filter pattern to apply
159+
#[clap(long)]
160+
table_name_filter_pattern: Option<String>,
161+
/// Specific table types to return
162+
#[clap(long)]
163+
table_types: Option<Vec<String>>,
146164
},
147165
}
148166

src/cli/mod.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,39 @@ impl CliApp {
106106
}
107107
FlightSqlCommand::GetDbSchemas {
108108
catalog,
109-
schema_pattern,
109+
db_schema_filter_pattern,
110110
} => {
111111
let flight_info = self
112112
.app_execution
113113
.flightsql_ctx()
114-
.get_db_schemas_flight_info(catalog, schema_pattern)
114+
.get_db_schemas_flight_info(catalog, db_schema_filter_pattern)
115+
.await?;
116+
let streams = self
117+
.app_execution
118+
.flightsql_ctx()
119+
.do_get(flight_info)
120+
.await?;
121+
let flight_batch_stream = stream::select_all(streams);
122+
self.print_any_stream(flight_batch_stream).await;
123+
Ok(())
124+
}
125+
126+
FlightSqlCommand::GetTables {
127+
catalog,
128+
db_schema_filter_pattern,
129+
table_name_filter_pattern,
130+
table_types,
131+
} => {
132+
let flight_info = self
133+
.app_execution
134+
.flightsql_ctx()
135+
.get_tables_flight_info(
136+
catalog,
137+
db_schema_filter_pattern,
138+
table_name_filter_pattern,
139+
table_types.unwrap_or_default(),
140+
false,
141+
)
115142
.await?;
116143
let streams = self
117144
.app_execution

src/server/flightsql/service.rs

Lines changed: 119 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ use arrow_flight::error::FlightError;
2121
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
2222
use arrow_flight::sql::server::FlightSqlService;
2323
use arrow_flight::sql::{
24-
Any, CommandGetCatalogs, CommandGetDbSchemas, CommandStatementQuery, SqlInfo,
24+
Any, CommandGetCatalogs, CommandGetDbSchemas, CommandGetTables, CommandStatementQuery, SqlInfo,
2525
TicketStatementQuery,
2626
};
2727
use arrow_flight::{FlightDescriptor, FlightEndpoint, FlightInfo, Ticket};
2828
use color_eyre::Result;
2929
use datafusion::logical_expr::LogicalPlan;
30+
use datafusion::prelude::{col, lit};
3031
use datafusion::sql::parser::DFParser;
3132
use datafusion_app::local::ExecutionContext;
3233
use datafusion_app::observability::ObservabilityRequestDetails;
@@ -138,11 +139,44 @@ impl FlightSqlServiceImpl {
138139
histogram!(latency_metric).record(duration.get_milliseconds() as f64);
139140
}
140141

142+
async fn create_flight_info_for_logical_plan(
143+
&self,
144+
logical_plan: LogicalPlan,
145+
request_id: Uuid,
146+
_request: Request<FlightDescriptor>,
147+
) -> Result<Response<FlightInfo>, Status> {
148+
let schema = logical_plan.schema();
149+
let ticket = TicketStatementQuery {
150+
statement_handle: request_id.to_string().into(),
151+
};
152+
debug!("created ticket handle: {:?}", ticket.statement_handle);
153+
let mut bytes: Vec<u8> = Vec::new();
154+
if ticket.encode(&mut bytes).is_ok() {
155+
let info = FlightInfo::new()
156+
.try_with_schema(schema.as_arrow())
157+
.unwrap()
158+
.with_endpoint(FlightEndpoint::new().with_ticket(Ticket::new(bytes.clone())))
159+
.with_descriptor(FlightDescriptor::new_cmd(bytes));
160+
debug!("created flight info: {:?}", info);
161+
162+
let mut guard = self
163+
.requests
164+
.lock()
165+
.map_err(|_| Status::internal("failed to acquire lock on requests"))?;
166+
guard.insert(request_id, logical_plan);
167+
168+
Ok(Response::new(info))
169+
} else {
170+
error!("error encoding ticket");
171+
Err(Status::internal("error encoding ticket"))
172+
}
173+
}
174+
141175
async fn create_flight_info(
142176
&self,
143177
query: String,
144178
request_id: Uuid,
145-
_request: Request<FlightDescriptor>,
179+
request: Request<FlightDescriptor>,
146180
) -> Result<Response<FlightInfo>, Status> {
147181
debug!(
148182
"creating flight info for request id {request_id} with query: {:?}",
@@ -154,44 +188,15 @@ impl FlightSqlServiceImpl {
154188
let statement = statements[0].clone();
155189
let start = std::time::Instant::now();
156190

157-
let logical_plan = self.execution.statement_to_logical_plan(statement).await;
158-
159-
match logical_plan {
160-
Ok(logical_plan) => {
161-
debug!("logical planning took: {:?}", start.elapsed());
162-
let schema = logical_plan.schema();
163-
164-
let ticket = TicketStatementQuery {
165-
statement_handle: request_id.to_string().into(),
166-
};
167-
debug!("created ticket handle: {:?}", ticket.statement_handle);
168-
let mut bytes: Vec<u8> = Vec::new();
169-
if ticket.encode(&mut bytes).is_ok() {
170-
let info = FlightInfo::new()
171-
.try_with_schema(schema.as_arrow())
172-
.unwrap()
173-
.with_endpoint(
174-
FlightEndpoint::new().with_ticket(Ticket::new(bytes)),
175-
)
176-
.with_descriptor(FlightDescriptor::new_cmd(query));
177-
debug!("flight info: {:?}", info);
178-
179-
let mut guard = self.requests.lock().map_err(|_| {
180-
Status::internal("failed to acquire lock on requests")
181-
})?;
182-
guard.insert(request_id, logical_plan);
183-
184-
Ok(Response::new(info))
185-
} else {
186-
error!("error encoding ticket");
187-
Err(Status::internal("error encoding ticket"))
188-
}
189-
}
190-
Err(e) => {
191-
error!("error planning SQL query: {:?}", e);
192-
Err(Status::internal("error planning SQL query"))
193-
}
194-
}
191+
let logical_plan = self
192+
.execution
193+
.statement_to_logical_plan(statement)
194+
.await
195+
.map_err(|e| Status::internal(e.to_string()))?;
196+
197+
debug!("logical planning took: {:?}", start.elapsed());
198+
self.create_flight_info_for_logical_plan(logical_plan, request_id, request)
199+
.await
195200
}
196201
Err(e) => {
197202
error!("error parsing SQL query: {:?}", e);
@@ -228,7 +233,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
228233
_query: CommandGetCatalogs,
229234
request: Request<FlightDescriptor>,
230235
) -> Result<Response<FlightInfo>, Status> {
231-
counter!("requests", "endpoint" => "get_flight_info").increment(1);
236+
counter!("requests", "endpoint" => "get_flight_info_catalogs").increment(1);
232237
let start = Timestamp::now();
233238
let request_id = uuid::Uuid::new_v4();
234239
let query = "SELECT DISTINCT table_catalog FROM information_schema.tables".to_string();
@@ -251,7 +256,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
251256
command: CommandGetDbSchemas,
252257
request: Request<FlightDescriptor>,
253258
) -> Result<Response<FlightInfo>, Status> {
254-
counter!("requests", "endpoint" => "get_flight_info").increment(1);
259+
counter!("requests", "endpoint" => "get_flight_info_schemas").increment(1);
255260
let start = Timestamp::now();
256261
let request_id = uuid::Uuid::new_v4();
257262
let CommandGetDbSchemas {
@@ -264,16 +269,85 @@ impl FlightSqlService for FlightSqlServiceImpl {
264269
(Some(catalog), None) => format!("SELECT DISTINCT table_catalog, table_schema FROM information_schema.tables WHERE table_catalog = '{catalog}' ORDER BY table_catalog, table_schema"),
265270
(None, None) => "SELECT DISTINCT table_catalog, table_schema FROM information_schema.tables ORDER BY table_catalog, table_schema".to_string()
266271
};
267-
println!("QUERY: {query}");
268272
let res = self.create_flight_info(query, request_id, request).await;
269273

270274
// TODO: Move recording to after response is sent to not impact response latency
271275
self.record_request(
272276
start,
273277
Some(request_id.to_string()),
274278
res.as_ref().err(),
275-
"/get_flight_info_catalogs".to_string(),
276-
"get_flight_info_catalogs_latency_ms",
279+
"/get_flight_info_schemas".to_string(),
280+
"get_flight_info_schemas_latency_ms",
281+
)
282+
.await;
283+
res
284+
}
285+
286+
async fn get_flight_info_tables(
287+
&self,
288+
command: CommandGetTables,
289+
request: Request<FlightDescriptor>,
290+
) -> Result<Response<FlightInfo>, Status> {
291+
counter!("requests", "endpoint" => "get_flight_info_schemas").increment(1);
292+
let start = Timestamp::now();
293+
let CommandGetTables {
294+
catalog,
295+
db_schema_filter_pattern,
296+
table_name_filter_pattern,
297+
table_types,
298+
include_schema: _include_schema,
299+
} = command;
300+
let base_query = "SELECT * FROM information_schema.tables";
301+
302+
let mut df = self
303+
.execution
304+
.session_ctx()
305+
.sql(base_query)
306+
.await
307+
.map_err(|e| Status::internal(e.to_string()))?;
308+
309+
if let Some(catalog) = catalog {
310+
df = df
311+
.filter(col("table_catalog").eq(lit(catalog)))
312+
.map_err(|e| Status::internal(e.to_string()))?;
313+
};
314+
if let Some(schema_filter) = db_schema_filter_pattern {
315+
df = df
316+
.filter(col("table_schema").ilike(lit(format!("%{schema_filter}%"))))
317+
.map_err(|e| Status::internal(e.to_string()))?;
318+
};
319+
if let Some(table_filter) = table_name_filter_pattern {
320+
df = df
321+
.filter(col("table_name").ilike(lit(format!("%{table_filter}%"))))
322+
.map_err(|e| Status::internal(e.to_string()))?;
323+
};
324+
if !table_types.is_empty() {
325+
let table_exprs = table_types.iter().map(lit).collect();
326+
df = df
327+
.filter(col("table_type").in_list(table_exprs, false))
328+
.map_err(|e| Status::internal(e.to_string()))?;
329+
};
330+
df = df
331+
.sort(vec![
332+
col("table_catalog").sort(true, false),
333+
col("table_schema").sort(true, false),
334+
col("table_name").sort(true, false),
335+
])
336+
.map_err(|e| Status::internal(e.to_string()))?;
337+
let logical_plan = df.into_unoptimized_plan();
338+
339+
let request_id = uuid::Uuid::new_v4();
340+
let res = self
341+
.create_flight_info_for_logical_plan(logical_plan, request_id, request)
342+
.await;
343+
344+
// TODO: Move recording to after response is sent to not impact response latency
345+
self.record_request(
346+
start,
347+
Some(request_id.to_string()),
348+
res.as_ref().err(),
349+
"/get_flight_info_tables".to_string(),
350+
"get_flight_info_tables_latency_ms",
277351
)
278352
.await;
279353
res

0 commit comments

Comments
 (0)