Skip to content

Commit 6a4d14f

Browse files
Test working
1 parent d60f666 commit 6a4d14f

6 files changed

Lines changed: 79 additions & 14 deletions

File tree

crates/datafusion-app/src/local.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use futures::TryFutureExt;
2727
use log::{debug, error, info};
2828

2929
use crate::catalog::create_app_catalog;
30-
use crate::config::ExecutionConfig;
30+
use crate::config::{ExecutionConfig, ObservabilityConfig};
3131
use crate::{ExecOptions, ExecResult};
3232
use color_eyre::eyre::{self, Result};
3333
use datafusion::common::Result as DFResult;
@@ -72,6 +72,24 @@ pub struct ExecutionContext {
7272
observability: ObservabilityContext,
7373
}
7474

75+
impl Default for ExecutionContext {
76+
fn default() -> Self {
77+
let cfg = SessionConfig::new().with_information_schema(true);
78+
let session_ctx = SessionContext::new_with_config(cfg);
79+
#[cfg(feature = "observability")]
80+
let observability =
81+
ObservabilityContext::try_new(ObservabilityConfig::default(), "test").unwrap();
82+
Self {
83+
config: ExecutionConfig::default(),
84+
session_ctx,
85+
ddl_path: None,
86+
executor: None,
87+
#[cfg(feature = "observability")]
88+
observability,
89+
}
90+
}
91+
}
92+
7593
impl std::fmt::Debug for ExecutionContext {
7694
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
7795
f.debug_struct("ExecutionContext").finish()

src/args.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ pub enum FlightSqlCommand {
132132
/// Executes `GetFlightInfo` and `DoGet` on the provided SQL query
133133
StatementQuery { sql: String },
134134
/// Executes `GetCatalogsFlightInfo` and `DoGet`
135-
Catalogs,
135+
GetCatalogs,
136136
}
137137

138138
#[derive(Clone, Debug, Subcommand)]

src/cli/mod.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
// under the License.
1717
//! [`CliApp`]: Command Line User Interface
1818
19-
use crate::args::Command;
2019
use crate::config::AppConfig;
2120
use crate::db::register_db;
2221
use crate::{args::DftArgs, execution::AppExecution};
@@ -40,8 +39,7 @@ use std::io::Write;
4039
use std::path::{Path, PathBuf};
4140
#[cfg(feature = "flightsql")]
4241
use {
43-
crate::args::FlightSqlCommand,
44-
arrow_flight::FlightInfo,
42+
crate::args::{Command, FlightSqlCommand},
4543
datafusion_app::{
4644
config::{AuthConfig, FlightSQLConfig},
4745
flightsql::FlightSQLContext,
@@ -87,14 +85,24 @@ impl CliApp {
8785

8886
#[cfg(feature = "flightsql")]
8987
async fn handle_flightsql_command(&self, command: FlightSqlCommand) -> color_eyre::Result<()> {
88+
use futures::stream;
89+
9090
match command {
9191
FlightSqlCommand::StatementQuery { sql } => self.exec_from_flightsql(sql, 0).await,
92-
FlightSqlCommand::Catalogs => {
92+
FlightSqlCommand::GetCatalogs => {
9393
let flight_info = self
9494
.app_execution
9595
.flightsql_ctx()
9696
.get_catalogs_flight_info()
9797
.await?;
98+
let streams = self
99+
.app_execution
100+
.flightsql_ctx()
101+
.do_get(flight_info)
102+
.await?;
103+
let flight_batch_stream = stream::select_all(streams);
104+
self.print_any_stream(flight_batch_stream).await;
105+
98106
Ok(())
99107
}
100108
}

src/main.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,14 @@ async fn app_entry_point(cli: DftArgs) -> Result<()> {
6767
}
6868

6969
#[cfg(feature = "flightsql")]
70-
if let Some(Command::ServeFlightSql { .. }) = cli.command {
71-
server::flightsql::try_run(cli.clone(), cfg.clone()).await?;
72-
return Ok(());
70+
{
71+
if matches!(cli.command, Some(Command::FlightSql { .. })) {
72+
cli::try_run(cli, cfg).await?;
73+
return Ok(());
74+
} else if let Some(Command::ServeFlightSql { .. }) = cli.command {
75+
server::flightsql::try_run(cli.clone(), cfg.clone()).await?;
76+
return Ok(());
77+
}
7378
}
7479
#[cfg(feature = "http")]
7580
{
@@ -91,10 +96,7 @@ async fn app_entry_point(cli: DftArgs) -> Result<()> {
9196
}
9297
}
9398

94-
if !cli.files.is_empty()
95-
|| !cli.commands.is_empty()
96-
|| matches!(cli.command, Some(Command::FlightSql { .. }))
97-
{
99+
if !cli.files.is_empty() || !cli.commands.is_empty() {
98100
cli::try_run(cli, cfg).await?;
99101
} else {
100102
tui::try_run(cli, cfg).await?;

src/server/flightsql/service.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
229229
let start = Timestamp::now();
230230
let request_id = uuid::Uuid::new_v4();
231231
let query = "SELECT DISTINCT table_catalog FROM information_schema.tables".to_string();
232+
// let query = "SELECT DISTINCT table_catalog FROM information_schema.tables;".to_string();
232233
let res = self
233234
.get_flight_info_statement_handler(query, request_id, request)
234235
.await;

tests/extension_cases/flightsql.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@
1818
use std::{io::Read, time::Duration};
1919

2020
use assert_cmd::Command;
21-
use datafusion_dft::test_utils::fixture::{TestFixture, TestFlightSqlServiceImpl};
21+
use datafusion_app::local::ExecutionContext;
22+
use datafusion_dft::{
23+
execution::AppExecution,
24+
server::flightsql::service::FlightSqlServiceImpl,
25+
test_utils::fixture::{TestFixture, TestFlightSqlServiceImpl},
26+
};
2227

2328
use crate::{
2429
cli_cases::{contains_str, sql_in_file},
@@ -665,3 +670,34 @@ async fn test_flightsql_query_command() {
665670

666671
fixture.shutdown_and_wait().await;
667672
}
673+
674+
#[tokio::test]
675+
async fn test_flightsql_get_catalogs() {
676+
let ctx = ExecutionContext::default();
677+
let exec = AppExecution::new(ctx);
678+
let test_server = FlightSqlServiceImpl::new(exec);
679+
let fixture = TestFixture::new(test_server.service(), "127.0.0.1:50051").await;
680+
681+
let assert = tokio::task::spawn_blocking(|| {
682+
Command::cargo_bin("dft")
683+
.unwrap()
684+
.arg("flightsql")
685+
.arg("get-catalogs")
686+
.timeout(Duration::from_secs(5))
687+
.assert()
688+
.success()
689+
})
690+
.await
691+
.unwrap();
692+
693+
let expected = r#"
694+
+---------------+
695+
| table_catalog |
696+
+---------------+
697+
| datafusion |
698+
+---------------+"#;
699+
700+
assert.stdout(contains_str(expected));
701+
702+
fixture.shutdown_and_wait().await;
703+
}

0 commit comments

Comments
 (0)