Skip to content

Commit 824f642

Browse files
authored
perf: don't construct a tokio runtime for each query (#1226)
1 parent 10aef7b commit 824f642

File tree

6 files changed

+53
-44
lines changed

6 files changed

+53
-44
lines changed

src/alerts/alerts_utils.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use tracing::trace;
3131

3232
use crate::{
3333
alerts::AggregateCondition,
34+
parseable::PARSEABLE,
3435
query::{TableScanVisitor, QUERY_SESSION},
3536
rbac::{
3637
map::SessionKey,
@@ -137,8 +138,9 @@ async fn execute_base_query(
137138
AlertError::CustomError(format!("Table name not found in query- {}", original_query))
138139
})?;
139140

141+
let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
140142
query
141-
.get_dataframe(stream_name)
143+
.get_dataframe(time_partition.as_ref())
142144
.await
143145
.map_err(|err| AlertError::CustomError(err.to_string()))
144146
}

src/alerts/mod.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use ulid::Ulid;
3737
pub mod alerts_utils;
3838
pub mod target;
3939

40-
use crate::parseable::PARSEABLE;
40+
use crate::parseable::{StreamNotFound, PARSEABLE};
4141
use crate::query::{TableScanVisitor, QUERY_SESSION};
4242
use crate::rbac::map::SessionKey;
4343
use crate::storage;
@@ -514,17 +514,16 @@ impl AlertConfig {
514514

515515
// for now proceed in a similar fashion as we do in query
516516
// TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
517-
let stream_name = if let Some(stream_name) = query.first_table_name() {
518-
stream_name
519-
} else {
517+
let Some(stream_name) = query.first_table_name() else {
520518
return Err(AlertError::CustomError(format!(
521519
"Table name not found in query- {}",
522520
self.query
523521
)));
524522
};
525523

524+
let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
526525
let base_df = query
527-
.get_dataframe(stream_name)
526+
.get_dataframe(time_partition.as_ref())
528527
.await
529528
.map_err(|err| AlertError::CustomError(err.to_string()))?;
530529

@@ -704,6 +703,8 @@ pub enum AlertError {
704703
CustomError(String),
705704
#[error("Invalid State Change: {0}")]
706705
InvalidStateChange(String),
706+
#[error("{0}")]
707+
StreamNotFound(#[from] StreamNotFound),
707708
}
708709

709710
impl actix_web::ResponseError for AlertError {
@@ -717,6 +718,7 @@ impl actix_web::ResponseError for AlertError {
717718
Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR,
718719
Self::CustomError(_) => StatusCode::BAD_REQUEST,
719720
Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
721+
Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
720722
}
721723
}
722724

src/handlers/airplane.rs

+4-8
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::handlers::http::query::{into_query, update_schema_when_distributed};
3838
use crate::handlers::livetail::cross_origin_config;
3939
use crate::metrics::QUERY_EXECUTE_TIME;
4040
use crate::parseable::PARSEABLE;
41-
use crate::query::{TableScanVisitor, QUERY_SESSION};
41+
use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
4242
use crate::utils::arrow::flight::{
4343
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
4444
send_to_ingester,
@@ -216,13 +216,9 @@ impl FlightService for AirServiceImpl {
216216
})?;
217217
let time = Instant::now();
218218

219-
let stream_name_clone = stream_name.clone();
220-
let (records, _) =
221-
match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
222-
Ok(Ok((records, fields))) => (records, fields),
223-
Ok(Err(e)) => return Err(Status::internal(e.to_string())),
224-
Err(err) => return Err(Status::internal(err.to_string())),
225-
};
219+
let (records, _) = execute(query, &stream_name)
220+
.await
221+
.map_err(|err| Status::internal(err.to_string()))?;
226222

227223
/*
228224
* INFO: Not returning the schema with the data.

src/handlers/http/query.rs

+6-15
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
22-
use arrow_array::RecordBatch;
2322
use chrono::{DateTime, Utc};
2423
use datafusion::common::tree_node::TreeNode;
2524
use datafusion::error::DataFusionError;
@@ -40,9 +39,9 @@ use crate::handlers::http::fetch_schema;
4039
use crate::event::commit_schema;
4140
use crate::metrics::QUERY_EXECUTE_TIME;
4241
use crate::option::Mode;
43-
use crate::parseable::PARSEABLE;
42+
use crate::parseable::{StreamNotFound, PARSEABLE};
4443
use crate::query::error::ExecuteError;
45-
use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
44+
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
4645
use crate::query::{TableScanVisitor, QUERY_SESSION};
4746
use crate::rbac::Users;
4847
use crate::response::QueryResponse;
@@ -131,7 +130,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
131130

132131
return Ok(HttpResponse::Ok().json(response));
133132
}
134-
let (records, fields) = execute_query(query, table_name.clone()).await?;
133+
134+
let (records, fields) = execute(query, &table_name).await?;
135135

136136
let response = QueryResponse {
137137
records,
@@ -150,17 +150,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
150150
Ok(response)
151151
}
152152

153-
async fn execute_query(
154-
query: LogicalQuery,
155-
stream_name: String,
156-
) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
157-
match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
158-
Ok(Ok(result)) => Ok(result),
159-
Ok(Err(e)) => Err(QueryError::Execute(e)),
160-
Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
161-
}
162-
}
163-
164153
pub async fn get_counts(
165154
req: HttpRequest,
166155
counts_request: Json<CountsRequest>,
@@ -330,6 +319,8 @@ Description: {0}"#
330319
ActixError(#[from] actix_web::Error),
331320
#[error("Error: {0}")]
332321
Anyhow(#[from] anyhow::Error),
322+
#[error("Error: {0}")]
323+
StreamNotFound(#[from] StreamNotFound),
333324
}
334325

335326
impl actix_web::ResponseError for QueryError {

src/parseable/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
2828
use once_cell::sync::Lazy;
2929
pub use staging::StagingError;
3030
use streams::StreamRef;
31-
pub use streams::{StreamNotFound, Streams};
31+
pub use streams::{Stream, StreamNotFound, Streams};
3232
use tracing::error;
3333

3434
#[cfg(feature = "kafka")]

src/query/mod.rs

+32-14
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use std::ops::Bound;
4242
use std::sync::Arc;
4343
use stream_schema_provider::collect_manifest_files;
4444
use sysinfo::System;
45+
use tokio::runtime::Runtime;
4546

4647
use self::error::ExecuteError;
4748
use self::stream_schema_provider::GlobalSchemaProvider;
@@ -60,6 +61,24 @@ use crate::utils::time::TimeRange;
6061
pub static QUERY_SESSION: Lazy<SessionContext> =
6162
Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));
6263

64+
/// Dedicated multi-threaded runtime to run all queries on
65+
pub static QUERY_RUNTIME: Lazy<Runtime> =
66+
Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));
67+
68+
69+
/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU
70+
/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
71+
pub async fn execute(
72+
query: Query,
73+
stream_name: &str,
74+
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
75+
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
76+
QUERY_RUNTIME
77+
.spawn(async move { query.execute(time_partition.as_ref()).await })
78+
.await
79+
.expect("The Join should have been successful")
80+
}
81+
6382
// A query request by client
6483
#[derive(Debug)]
6584
pub struct Query {
@@ -129,15 +148,12 @@ impl Query {
129148
SessionContext::new_with_state(state)
130149
}
131150

132-
#[tokio::main(flavor = "multi_thread")]
133151
pub async fn execute(
134152
&self,
135-
stream_name: String,
153+
time_partition: Option<&String>,
136154
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
137-
let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
138-
139155
let df = QUERY_SESSION
140-
.execute_logical_plan(self.final_logical_plan(&time_partition))
156+
.execute_logical_plan(self.final_logical_plan(time_partition))
141157
.await?;
142158

143159
let fields = df
@@ -153,21 +169,23 @@ impl Query {
153169
}
154170

155171
let results = df.collect().await?;
172+
156173
Ok((results, fields))
157174
}
158175

159-
pub async fn get_dataframe(&self, stream_name: String) -> Result<DataFrame, ExecuteError> {
160-
let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
161-
176+
pub async fn get_dataframe(
177+
&self,
178+
time_partition: Option<&String>,
179+
) -> Result<DataFrame, ExecuteError> {
162180
let df = QUERY_SESSION
163-
.execute_logical_plan(self.final_logical_plan(&time_partition))
181+
.execute_logical_plan(self.final_logical_plan(time_partition))
164182
.await?;
165183

166184
Ok(df)
167185
}
168186

169187
/// return logical plan with all time filters applied through
170-
fn final_logical_plan(&self, time_partition: &Option<String>) -> LogicalPlan {
188+
fn final_logical_plan(&self, time_partition: Option<&String>) -> LogicalPlan {
171189
// see https://github.com/apache/arrow-datafusion/pull/8400
172190
// this can be eliminated in later version of datafusion but with slight caveat
173191
// transform cannot modify stringified plans by itself
@@ -487,7 +505,7 @@ fn transform(
487505
plan: LogicalPlan,
488506
start_time: NaiveDateTime,
489507
end_time: NaiveDateTime,
490-
time_partition: &Option<String>,
508+
time_partition: Option<&String>,
491509
) -> Transformed<LogicalPlan> {
492510
plan.transform(&|plan| match plan {
493511
LogicalPlan::TableScan(table) => {
@@ -545,7 +563,7 @@ fn transform(
545563

546564
fn table_contains_any_time_filters(
547565
table: &datafusion::logical_expr::TableScan,
548-
time_partition: &Option<String>,
566+
time_partition: Option<&String>,
549567
) -> bool {
550568
table
551569
.filters
@@ -559,8 +577,8 @@ fn table_contains_any_time_filters(
559577
})
560578
.any(|expr| {
561579
matches!(&*expr.left, Expr::Column(Column { name, .. })
562-
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
563-
(!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
580+
if (time_partition.is_some_and(|field| field == name) ||
581+
(time_partition.is_none() && name == event::DEFAULT_TIMESTAMP_KEY)))
564582
})
565583
}
566584

0 commit comments

Comments
 (0)