Skip to content

Commit b86ea0b

Browse files
TPCH queries (#322)
1 parent 55f066e commit b86ea0b

5 files changed

Lines changed: 776 additions & 25 deletions

File tree

src/server/http/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
mod router;
19+
mod tpch;
1920

2021
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2122

src/server/http/router.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@ use log::error;
3232
use serde::{Deserialize, Serialize};
3333
use tokio_stream::StreamExt;
3434
use tower_http::{timeout::TimeoutLayer, trace::TraceLayer};
35-
use tracing::info;
35+
use tracing::debug;
3636

3737
use crate::{config::HttpServerConfig, execution::AppExecution};
3838

39+
use super::tpch;
40+
3941
#[derive(Debug)]
4042
struct ExecRequest {
4143
path: String,
@@ -67,6 +69,7 @@ pub fn create_router(execution: AppExecution, config: HttpServerConfig) -> Route
6769
)
6870
.route("/sql", post(post_sql_handler))
6971
.route("/catalog", get(get_catalog_handler))
72+
.route("/tpch/:number", get(get_tpch_query_handler))
7073
.route("/table/:catalog/:schema/:table", get(get_table_handler))
7174
.layer((
7275
TraceLayer::new_for_http(),
@@ -167,12 +170,34 @@ async fn get_table_handler(
167170
create_response(&state, req, opts).await
168171
}
169172

173+
#[derive(Deserialize, Serialize)]
174+
struct GetTpchPathParams {
175+
number: usize,
176+
}
177+
178+
async fn get_tpch_query_handler(
179+
state: State<ExecutionState>,
180+
Path(path): Path<GetTpchPathParams>,
181+
OriginalUri(uri): OriginalUri,
182+
) -> Response {
183+
if let Some(sql) = tpch::sql_for_tpch_query(path.number) {
184+
let req = ExecRequest {
185+
path: uri.path().to_string(),
186+
sql: sql.to_string(),
187+
};
188+
let opts = ExecOptions::new(None, false);
189+
create_response(&state, req, opts).await
190+
} else {
191+
(StatusCode::BAD_REQUEST, "Unknown TPC-H query number").into_response()
192+
}
193+
}
194+
170195
async fn response_for_sql(
171196
State(state): &State<ExecutionState>,
172197
sql: String,
173198
opts: ExecOptions,
174199
) -> (Response, ResponseDetails) {
175-
info!("Executing sql: {sql}");
200+
debug!("Executing sql: {sql}");
176201
match state.execution.execute_sql_with_opts(&sql, opts).await {
177202
Ok(ExecResult::RecordBatchStream(stream)) => batch_stream_to_response(stream).await,
178203
Ok(_) => {

0 commit comments

Comments
 (0)