Skip to content

Commit ee2eefa

Browse files
authored
New crate datafusion-flightsql (spiceai#10201)
* add datafusion-flightsql * formatting * clippy * implement do_put; handle handshake/sessions
1 parent 2c2c43c commit ee2eefa

26 files changed

Lines changed: 3521 additions & 0 deletions

Cargo.lock

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ members = [
3535
"crates/runtime-api-types",
3636
"crates/runtime-auth",
3737
"crates/runtime-async",
38+
"crates/datafusion-flightsql",
3839
"crates/runtime-datafusion",
3940
"crates/runtime-datafusion-index",
4041
"crates/runtime-datafusion-udfs",
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
[package]
2+
name = "datafusion-flightsql"
3+
version = "0.1.0"
4+
edition.workspace = true
5+
rust-version = "1.93.1"
6+
license = "Apache-2.0"
7+
description = "Arrow Flight SQL service backed by an arbitrary DataFusion SessionContext"
8+
9+
[dependencies]
10+
arrow = { workspace = true }
11+
arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
12+
arrow-ipc = { workspace = true }
13+
arrow-schema = { workspace = true }
14+
arrow_tools = { path = "../arrow_tools" }
15+
async-stream = { workspace = true }
16+
bytes = { workspace = true }
17+
datafusion = { workspace = true }
18+
futures = { workspace = true }
19+
http = { workspace = true }
20+
moka = { workspace = true, features = ["sync"] }
21+
postcard = { workspace = true }
22+
prost = { workspace = true }
23+
serde = { workspace = true, features = ["derive"] }
24+
snafu = { workspace = true }
25+
tokio-stream = { workspace = true }
26+
tonic = { workspace = true }
27+
tracing = { workspace = true }
28+
uuid = { workspace = true, features = ["v7"] }
29+
util = { path = "../util" }
30+
31+
[dev-dependencies]
32+
tokio = { workspace = true }
33+
tracing-subscriber = { workspace = true }
34+
35+
[[example]]
36+
name = "serve_and_query"
37+
required-features = []
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
Copyright 2026 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
//! Starts a Flight SQL server backed by an in-memory `DataFusion`
18+
//! [`SessionContext`] and blocks until Ctrl-C.
19+
//!
20+
//! ```text
21+
//! cargo run -p datafusion-flightsql --example serve_and_query
22+
//! ```
23+
//!
24+
//! Once running, connect with any Arrow Flight SQL client. For example,
25+
//! using the Python `flightsql-dbapi` library:
26+
//!
27+
//! ```python
28+
//! from flightsql import FlightSQLClient
29+
//! client = FlightSQLClient(host="localhost", port=50051, insecure=True)
30+
//! reader = client.execute("SELECT * FROM employees ORDER BY salary DESC")
31+
//! print(reader.read_all().to_pandas())
32+
//! ```
33+
//!
34+
//! Or with the `arrow-flight` Rust client:
35+
//!
36+
//! ```rust,no_run
37+
//! use arrow_flight::sql::client::FlightSqlServiceClient;
38+
//! use futures::TryStreamExt;
39+
//! use tonic::transport::Channel;
40+
//!
41+
//! #[tokio::main]
42+
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
43+
//! let channel = Channel::from_static("http://localhost:50051").connect().await?;
44+
//! let mut client = FlightSqlServiceClient::new(channel);
45+
//! client.handshake("", "").await?;
46+
//!
47+
//! let mut stmt = client.prepare("SELECT * FROM employees".to_string(), None).await?;
48+
//! let info = stmt.execute().await?;
49+
//! for endpoint in info.endpoint {
50+
//! if let Some(ticket) = endpoint.ticket {
51+
//! let batches: Vec<_> = client.do_get(ticket).await?.try_collect().await?;
52+
//! println!("{}", arrow::util::pretty::pretty_format_batches(&batches)?);
53+
//! }
54+
//! }
55+
//! Ok(())
56+
//! }
57+
//! ```
58+
59+
use std::sync::Arc;
60+
61+
use arrow::array::{Int32Array, StringArray};
62+
use arrow::datatypes::{DataType, Field, Schema};
63+
use arrow::record_batch::RecordBatch;
64+
use datafusion::prelude::SessionContext;
65+
use datafusion_flightsql::FlightSqlService;
66+
use tonic::transport::Server;
67+
68+
const ADDR: &str = "0.0.0.0:50051";
69+
70+
#[tokio::main]
71+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
72+
tracing_subscriber::fmt()
73+
.with_env_filter("datafusion_flightsql=debug,info")
74+
.init();
75+
76+
// ── Build the SessionContext ──────────────────────────────────────────────
77+
78+
let ctx = Arc::new(SessionContext::new());
79+
80+
let batch = RecordBatch::try_new(
81+
Arc::new(Schema::new(vec![
82+
Field::new("id", DataType::Int32, false),
83+
Field::new("name", DataType::Utf8, false),
84+
Field::new("department", DataType::Utf8, false),
85+
Field::new("salary", DataType::Int32, false),
86+
])),
87+
vec![
88+
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
89+
Arc::new(StringArray::from(vec![
90+
"Alice", "Bob", "Carol", "Dave", "Eve",
91+
])),
92+
Arc::new(StringArray::from(vec![
93+
"Engineering",
94+
"Marketing",
95+
"Engineering",
96+
"HR",
97+
"Engineering",
98+
])),
99+
Arc::new(Int32Array::from(vec![
100+
120_000, 85_000, 110_000, 75_000, 130_000,
101+
])),
102+
],
103+
)?;
104+
105+
ctx.register_batch("employees", batch)?;
106+
tracing::info!("Registered 'employees' table");
107+
108+
// You can register additional tables, views, or UDFs on `ctx` here before
109+
// the server starts. Everything registered on the context is immediately
110+
// queryable by connected clients.
111+
112+
// ── Start the Flight SQL server ───────────────────────────────────────────
113+
114+
let addr = ADDR.parse()?;
115+
tracing::info!("Flight SQL server listening on {addr} (press Ctrl-C to stop)");
116+
117+
Server::builder()
118+
.add_service(FlightSqlService::new(ctx).into_server())
119+
.serve_with_shutdown(addr, shutdown_signal())
120+
.await?;
121+
122+
tracing::info!("Server stopped");
123+
Ok(())
124+
}
125+
126+
async fn shutdown_signal() {
127+
tokio::signal::ctrl_c()
128+
.await
129+
.expect("failed to listen for Ctrl-C");
130+
tracing::info!("Received Ctrl-C, shutting down");
131+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright 2026 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
//! `DoAction` / `ListActions` for Flight SQL prepared statements.
18+
//!
19+
//! Supported actions:
20+
//! - `CreatePreparedStatement`
21+
//! - `ClosePreparedStatement`
22+
23+
use std::sync::Arc;
24+
25+
use arrow_flight::{
26+
Action, ActionType,
27+
flight_service_server::FlightService,
28+
sql::{self, Any, ProstMessageExt},
29+
};
30+
use datafusion::prelude::SessionContext;
31+
use futures::stream::BoxStream;
32+
use prost::Message;
33+
use tonic::{Request, Response, Status};
34+
35+
use crate::{FlightSqlService, flightsql::prepared_statement_query, to_tonic_err};
36+
37+
pub(crate) async fn do_action(
38+
ctx: Arc<SessionContext>,
39+
request: Request<Action>,
40+
) -> Result<Response<<FlightSqlService as FlightService>::DoActionStream>, Status> {
41+
let action = request.into_inner();
42+
tracing::trace!("do_action: type={}", action.r#type);
43+
44+
let msg: Any = Any::decode(&*action.body).map_err(to_tonic_err)?;
45+
46+
match sql::Command::try_from(msg).map_err(to_tonic_err)? {
47+
sql::Command::ActionCreatePreparedStatementRequest(stmt) => {
48+
let result =
49+
prepared_statement_query::do_action_create_prepared_statement(ctx, stmt).await?;
50+
let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
51+
body: result.as_any().encode_to_vec().into(),
52+
})]);
53+
Ok(Response::new(
54+
Box::pin(output) as <FlightSqlService as FlightService>::DoActionStream
55+
))
56+
}
57+
sql::Command::ActionClosePreparedStatementRequest(handle) => {
58+
tracing::trace!("close_prepared_statement: {:?}", handle);
59+
// Prepared statement handles are self-contained serialised blobs;
60+
// there is nothing server-side to clean up.
61+
let output = futures::stream::empty();
62+
Ok(Response::new(
63+
Box::pin(output) as <FlightSqlService as FlightService>::DoActionStream
64+
))
65+
}
66+
cmd => Err(Status::invalid_argument(format!(
67+
"unsupported action command: {cmd:?}"
68+
))),
69+
}
70+
}
71+
72+
pub(crate) fn list() -> Response<BoxStream<'static, Result<ActionType, Status>>> {
73+
let actions = vec![
74+
ActionType {
75+
r#type: "CreatePreparedStatement".to_string(),
76+
description: "Creates a reusable prepared statement resource on the server. \
77+
Returns a handle identifying the prepared statement."
78+
.to_string(),
79+
},
80+
ActionType {
81+
r#type: "ClosePreparedStatement".to_string(),
82+
description: "Closes a prepared statement resource on the server.".to_string(),
83+
},
84+
];
85+
Response::new(Box::pin(futures::stream::iter(actions.into_iter().map(Ok))))
86+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
Copyright 2026 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
use std::sync::Arc;
18+
19+
use arrow_flight::{
20+
Ticket,
21+
flight_service_server::FlightService,
22+
sql::{Any, Command},
23+
};
24+
use datafusion::prelude::SessionContext;
25+
use prost::Message;
26+
use tonic::{Request, Response, Status};
27+
28+
use crate::{FlightSqlService, flightsql, to_tonic_err};
29+
30+
pub(crate) async fn handle(
31+
ctx: Arc<SessionContext>,
32+
request: Request<Ticket>,
33+
) -> Result<Response<<FlightSqlService as FlightService>::DoGetStream>, Status> {
34+
let msg: Any = match Message::decode(&*request.get_ref().ticket) {
35+
Ok(msg) => msg,
36+
Err(_) => return do_get_simple(ctx, request).await,
37+
};
38+
39+
match Command::try_from(msg).map_err(to_tonic_err)? {
40+
Command::CommandStatementQuery(cmd) => flightsql::statement_query::do_get(ctx, cmd).await,
41+
Command::CommandPreparedStatementQuery(cmd) => {
42+
Box::pin(flightsql::prepared_statement_query::do_get(ctx, cmd)).await
43+
}
44+
Command::CommandPreparedStatementUpdate(cmd) => {
45+
Box::pin(flightsql::prepared_statement_update::do_get(ctx, cmd)).await
46+
}
47+
Command::CommandGetCatalogs(cmd) => flightsql::get_catalogs::do_get(&ctx, cmd),
48+
Command::CommandGetDbSchemas(cmd) => flightsql::get_schemas::do_get(&ctx, cmd),
49+
Command::CommandGetTables(cmd) => flightsql::get_tables::do_get(ctx, cmd).await,
50+
Command::CommandGetPrimaryKeys(cmd) => Ok(flightsql::get_primary_keys::do_get(&cmd)),
51+
Command::CommandGetTableTypes(cmd) => flightsql::get_table_types::do_get(cmd),
52+
Command::CommandGetSqlInfo(cmd) => flightsql::get_sql_info::do_get(cmd),
53+
Command::CommandGetXdbcTypeInfo(cmd) => flightsql::get_xdbc_type_info::do_get(cmd),
54+
Command::CommandStatementUpdate(_) => Err(Status::invalid_argument(
55+
"CommandStatementUpdate should be sent via DoPut, not DoGet",
56+
)),
57+
cmd => Err(Status::unimplemented(format!(
58+
"DoGet not implemented for command: {cmd:?}"
59+
))),
60+
}
61+
}
62+
63+
async fn do_get_simple(
64+
ctx: Arc<SessionContext>,
65+
request: Request<Ticket>,
66+
) -> Result<Response<<FlightSqlService as FlightService>::DoGetStream>, Status> {
67+
let ticket = request.into_inner();
68+
let sql = std::str::from_utf8(&ticket.ticket)
69+
.map_err(|e| Status::invalid_argument(format!("invalid ticket: {e}")))?;
70+
71+
tracing::trace!("do_get (plain SQL): {sql}");
72+
73+
let output = FlightSqlService::sql_to_flight_stream(ctx, sql, None).await?;
74+
Ok(Response::new(
75+
Box::pin(output) as <FlightSqlService as FlightService>::DoGetStream
76+
))
77+
}

0 commit comments

Comments
 (0)