Skip to content

Commit 29a7437

Browse files
Expose Substrait execution metadata
1 parent 5d24f57 commit 29a7437

3 files changed

Lines changed: 240 additions & 17 deletions

File tree

quickwit/quickwit-df-core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub use data_source::{
4343
};
4444
pub use datafusion::execution::SendableRecordBatchStream;
4545
pub use datafusion_distributed::{Worker, WorkerResolver};
46-
pub use service::DataFusionService;
46+
pub use service::{DataFusionService, SubstraitExecution, SubstraitExecutionMetadata};
4747
pub use session::DataFusionSessionBuilder;
4848
pub use task_estimator::DataSourceExecPartitionEstimator;
4949
pub use worker::build_worker;

quickwit/quickwit-df-core/src/service.rs

Lines changed: 154 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,41 @@
4444
4545
use std::collections::HashMap;
4646
use std::sync::Arc;
47+
use std::time::{Duration, Instant};
4748

4849
use datafusion::error::{DataFusionError, Result as DFResult};
4950
use datafusion::execution::SendableRecordBatchStream;
51+
use datafusion::physical_plan::ExecutionPlan;
52+
use datafusion::physical_plan::display::DisplayableExecutionPlan;
5053
use datafusion::prelude::SessionContext;
5154
use datafusion::sql::sqlparser::dialect::dialect_from_str;
5255
use datafusion::sql::sqlparser::tokenizer::{Token, Tokenizer};
5356

5457
use crate::session::DataFusionSessionBuilder;
5558

59+
pub struct SubstraitExecution {
60+
pub stream: SendableRecordBatchStream,
61+
pub metadata: SubstraitExecutionMetadata,
62+
}
63+
64+
pub struct SubstraitExecutionMetadata {
65+
pub substrait_plan_json: String,
66+
pub logical_plan: String,
67+
pub physical_plan: Arc<dyn ExecutionPlan>,
68+
pub substrait_decode_duration: Duration,
69+
pub substrait_to_logical_duration: Duration,
70+
pub logical_to_physical_duration: Duration,
71+
pub stream_creation_duration: Duration,
72+
}
73+
74+
impl SubstraitExecutionMetadata {
75+
pub fn physical_plan_display(&self) -> String {
76+
DisplayableExecutionPlan::with_metrics(self.physical_plan.as_ref())
77+
.indent(true)
78+
.to_string()
79+
}
80+
}
81+
5682
/// Split a SQL string into top-level statements using the configured dialect's
5783
/// tokenizer rather than raw string splitting.
5884
pub fn split_sql_statements(ctx: &SessionContext, sql: &str) -> DFResult<Vec<String>> {
@@ -144,6 +170,17 @@ impl DataFusionService {
144170
plan_bytes: &[u8],
145171
properties: &HashMap<String, String>,
146172
) -> DFResult<SendableRecordBatchStream> {
173+
Ok(self
174+
.execute_substrait_with_metadata(plan_bytes, properties)
175+
.await?
176+
.stream)
177+
}
178+
179+
pub async fn execute_substrait_with_metadata(
180+
&self,
181+
plan_bytes: &[u8],
182+
properties: &HashMap<String, String>,
183+
) -> DFResult<SubstraitExecution> {
147184
tracing::info!(
148185
plan_bytes_len = plan_bytes.len(),
149186
num_properties = properties.len(),
@@ -152,10 +189,23 @@ impl DataFusionService {
152189
use datafusion_substrait::substrait::proto::Plan;
153190
use prost::Message;
154191

192+
let substrait_decode_start = Instant::now();
155193
let plan = Plan::decode(plan_bytes)
156194
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
195+
let substrait_decode_duration = substrait_decode_start.elapsed();
196+
let substrait_plan_json = serde_json::to_string(&plan).map_err(|e| {
197+
datafusion::error::DataFusionError::Plan(format!(
198+
"failed to serialize Substrait plan JSON: {e}"
199+
))
200+
})?;
157201

158-
self.execute_substrait_plan(&plan, properties).await
202+
self.execute_substrait_plan_with_metadata(
203+
&plan,
204+
properties,
205+
substrait_plan_json,
206+
substrait_decode_duration,
207+
)
208+
.await
159209
}
160210

161211
/// Execute a Substrait plan from its proto3 JSON representation.
@@ -171,27 +221,60 @@ impl DataFusionService {
171221
plan_json: &str,
172222
properties: &HashMap<String, String>,
173223
) -> DFResult<SendableRecordBatchStream> {
224+
Ok(self
225+
.execute_substrait_json_with_metadata(plan_json, properties)
226+
.await?
227+
.stream)
228+
}
229+
230+
pub async fn execute_substrait_json_with_metadata(
231+
&self,
232+
plan_json: &str,
233+
properties: &HashMap<String, String>,
234+
) -> DFResult<SubstraitExecution> {
174235
use datafusion_substrait::substrait::proto::Plan;
175236

237+
let substrait_decode_start = Instant::now();
176238
let plan: Plan = serde_json::from_str(plan_json).map_err(|e| {
177239
datafusion::error::DataFusionError::Plan(format!("invalid Substrait plan JSON: {e}"))
178240
})?;
241+
let substrait_decode_duration = substrait_decode_start.elapsed();
179242

180-
self.execute_substrait_plan(&plan, properties).await
243+
self.execute_substrait_plan_with_metadata(
244+
&plan,
245+
properties,
246+
plan_json.to_string(),
247+
substrait_decode_duration,
248+
)
249+
.await
181250
}
182251

183-
async fn execute_substrait_plan(
252+
async fn execute_substrait_plan_with_metadata(
184253
&self,
185254
plan: &datafusion_substrait::substrait::proto::Plan,
186255
properties: &HashMap<String, String>,
187-
) -> DFResult<SendableRecordBatchStream> {
256+
substrait_plan_json: String,
257+
substrait_decode_duration: Duration,
258+
) -> DFResult<SubstraitExecution> {
188259
let ctx = self.builder.build_session_with_properties(properties)?;
189-
crate::substrait::execute_substrait_plan_streaming(
260+
let prepared = crate::substrait::execute_substrait_plan_streaming_with_metadata(
190261
plan,
191262
&ctx,
192263
self.builder.substrait_extensions(),
193264
)
194-
.await
265+
.await?;
266+
Ok(SubstraitExecution {
267+
stream: prepared.stream,
268+
metadata: SubstraitExecutionMetadata {
269+
substrait_plan_json,
270+
logical_plan: prepared.metadata.logical_plan,
271+
physical_plan: prepared.metadata.physical_plan,
272+
substrait_decode_duration,
273+
substrait_to_logical_duration: prepared.metadata.substrait_to_logical_duration,
274+
logical_to_physical_duration: prepared.metadata.logical_to_physical_duration,
275+
stream_creation_duration: prepared.metadata.stream_creation_duration,
276+
},
277+
})
195278
}
196279

197280
/// Like [`execute_substrait`], but returns the EXPLAIN output instead of
@@ -202,11 +285,37 @@ impl DataFusionService {
202285
plan_bytes: &[u8],
203286
properties: &HashMap<String, String>,
204287
) -> DFResult<SendableRecordBatchStream> {
288+
Ok(self
289+
.explain_substrait_with_metadata(plan_bytes, properties)
290+
.await?
291+
.stream)
292+
}
293+
294+
pub async fn explain_substrait_with_metadata(
295+
&self,
296+
plan_bytes: &[u8],
297+
properties: &HashMap<String, String>,
298+
) -> DFResult<SubstraitExecution> {
205299
use datafusion_substrait::substrait::proto::Plan;
206300
use prost::Message;
301+
302+
let substrait_decode_start = Instant::now();
207303
let plan = Plan::decode(plan_bytes)
208304
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
209-
self.explain_substrait_plan(&plan, properties).await
305+
let substrait_decode_duration = substrait_decode_start.elapsed();
306+
let substrait_plan_json = serde_json::to_string(&plan).map_err(|e| {
307+
datafusion::error::DataFusionError::Plan(format!(
308+
"failed to serialize Substrait plan JSON: {e}"
309+
))
310+
})?;
311+
312+
self.explain_substrait_plan_with_metadata(
313+
&plan,
314+
properties,
315+
substrait_plan_json,
316+
substrait_decode_duration,
317+
)
318+
.await
210319
}
211320

212321
/// JSON variant of [`explain_substrait`] — same semantics, proto3-JSON input.
@@ -215,25 +324,58 @@ impl DataFusionService {
215324
plan_json: &str,
216325
properties: &HashMap<String, String>,
217326
) -> DFResult<SendableRecordBatchStream> {
327+
Ok(self
328+
.explain_substrait_json_with_metadata(plan_json, properties)
329+
.await?
330+
.stream)
331+
}
332+
333+
pub async fn explain_substrait_json_with_metadata(
334+
&self,
335+
plan_json: &str,
336+
properties: &HashMap<String, String>,
337+
) -> DFResult<SubstraitExecution> {
218338
use datafusion_substrait::substrait::proto::Plan;
339+
let substrait_decode_start = Instant::now();
219340
let plan: Plan = serde_json::from_str(plan_json).map_err(|e| {
220341
datafusion::error::DataFusionError::Plan(format!("invalid Substrait plan JSON: {e}"))
221342
})?;
222-
self.explain_substrait_plan(&plan, properties).await
343+
let substrait_decode_duration = substrait_decode_start.elapsed();
344+
self.explain_substrait_plan_with_metadata(
345+
&plan,
346+
properties,
347+
plan_json.to_string(),
348+
substrait_decode_duration,
349+
)
350+
.await
223351
}
224352

225-
async fn explain_substrait_plan(
353+
async fn explain_substrait_plan_with_metadata(
226354
&self,
227355
plan: &datafusion_substrait::substrait::proto::Plan,
228356
properties: &HashMap<String, String>,
229-
) -> DFResult<SendableRecordBatchStream> {
357+
substrait_plan_json: String,
358+
substrait_decode_duration: Duration,
359+
) -> DFResult<SubstraitExecution> {
230360
let ctx = self.builder.build_session_with_properties(properties)?;
231-
crate::substrait::explain_substrait_plan_streaming(
361+
let prepared = crate::substrait::explain_substrait_plan_streaming_with_metadata(
232362
plan,
233363
&ctx,
234364
self.builder.substrait_extensions(),
235365
)
236-
.await
366+
.await?;
367+
Ok(SubstraitExecution {
368+
stream: prepared.stream,
369+
metadata: SubstraitExecutionMetadata {
370+
substrait_plan_json,
371+
logical_plan: prepared.metadata.logical_plan,
372+
physical_plan: prepared.metadata.physical_plan,
373+
substrait_decode_duration,
374+
substrait_to_logical_duration: prepared.metadata.substrait_to_logical_duration,
375+
logical_to_physical_duration: prepared.metadata.logical_to_physical_duration,
376+
stream_creation_duration: prepared.metadata.stream_creation_duration,
377+
},
378+
})
237379
}
238380

239381
/// Execute one or more SQL statements from a single SQL string.

0 commit comments

Comments
 (0)