Skip to content

Commit 2896bf3

Browse files
authored
Add metrics to SchemaCastScanExec and VirtualExecutionPlan (#143)
1 parent 36b0d03 commit 2896bf3

3 files changed

Lines changed: 29 additions & 4 deletions

File tree

datafusion-federation/src/schema_cast/mod.rs

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ use datafusion::arrow::datatypes::SchemaRef;
33
use datafusion::common::Statistics;
44
use datafusion::error::{DataFusionError, Result};
55
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
6+
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
67
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
78
use datafusion::physical_plan::{
89
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
@@ -24,6 +25,7 @@ pub struct SchemaCastScanExec {
2425
input: Arc<dyn ExecutionPlan>,
2526
schema: SchemaRef,
2627
properties: PlanProperties,
28+
metrics_set: ExecutionPlanMetricsSet,
2729
}
2830

2931
impl SchemaCastScanExec {
@@ -41,6 +43,7 @@ impl SchemaCastScanExec {
4143
input,
4244
schema,
4345
properties,
46+
metrics_set: ExecutionPlanMetricsSet::new(),
4447
}
4548
}
4649
}
@@ -101,14 +104,20 @@ impl ExecutionPlan for SchemaCastScanExec {
101104
) -> Result<SendableRecordBatchStream> {
102105
let mut stream = self.input.execute(partition, context)?;
103106
let schema = Arc::clone(&self.schema);
107+
let baseline_metrics = BaselineMetrics::new(&self.metrics_set, partition);
104108

105109
Ok(Box::pin(RecordBatchStreamAdapter::new(
106110
Arc::clone(&schema),
107111
{
108112
stream! {
109113
while let Some(batch) = stream.next().await {
114+
let _timer = baseline_metrics.elapsed_compute().timer();
110115
let batch = record_convert::try_cast_to(batch?, Arc::clone(&schema));
111-
yield batch.map_err(|e| { DataFusionError::External(Box::new(e)) });
116+
let batch = batch.map_err(|e| { DataFusionError::External(Box::new(e)) });
117+
if let Ok(ref b) = batch {
118+
baseline_metrics.output_rows().add(b.num_rows());
119+
}
120+
yield batch;
112121
}
113122
}
114123
},
@@ -118,4 +127,8 @@ impl ExecutionPlan for SchemaCastScanExec {
118127
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
119128
self.input.partition_statistics(partition)
120129
}
130+
131+
fn metrics(&self) -> Option<MetricsSet> {
132+
Some(self.metrics_set.clone_inner())
133+
}
121134
}

datafusion-federation/src/sql/executor.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use datafusion::{
55
common::Statistics,
66
error::Result,
77
logical_expr::LogicalPlan,
8-
physical_plan::SendableRecordBatchStream,
8+
physical_plan::{metrics::MetricsSet, SendableRecordBatchStream},
99
sql::{sqlparser::ast, unparser::dialect::Dialect},
1010
};
1111
use std::sync::Arc;
@@ -56,6 +56,11 @@ pub trait SQLExecutor: Sync + Send {
5656

5757
/// Returns the schema of table_name within this [`SQLExecutor`]
5858
async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef>;
59+
60+
/// Returns the execution metrics, if available.
61+
fn metrics(&self) -> Option<MetricsSet> {
62+
None
63+
}
5964
}
6065

6166
impl fmt::Debug for dyn SQLExecutor {

datafusion-federation/src/sql/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,18 @@ use analyzer::RewriteTableScanAnalyzer;
1111
use async_trait::async_trait;
1212
use datafusion::{
1313
arrow::datatypes::{Schema, SchemaRef},
14-
common::tree_node::{Transformed, TreeNode},
15-
common::Statistics,
14+
common::{
15+
tree_node::{Transformed, TreeNode},
16+
Statistics,
17+
},
1618
error::{DataFusionError, Result},
1719
execution::{context::SessionState, TaskContext},
1820
logical_expr::{Extension, LogicalPlan},
1921
optimizer::{optimizer::Optimizer, OptimizerConfig, OptimizerRule},
2022
physical_expr::EquivalenceProperties,
2123
physical_plan::{
2224
execution_plan::{Boundedness, EmissionType},
25+
metrics::MetricsSet,
2326
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
2427
SendableRecordBatchStream,
2528
},
@@ -365,6 +368,10 @@ impl ExecutionPlan for VirtualExecutionPlan {
365368
fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
366369
Ok(self.statistics.clone())
367370
}
371+
372+
fn metrics(&self) -> Option<MetricsSet> {
373+
self.executor.metrics()
374+
}
368375
}
369376

370377
#[cfg(test)]

0 commit comments

Comments
 (0)