
Commit 13974c2

askalt authored and 0x501D committed
physical_plan: move metrics to task context
To share physical plans across executions, metrics must live somewhere other than the plan itself. This patch moves them into the task context: when a plan is scanned, its metrics set is registered in the context, keyed by the plan's address. To display a plan with metrics, one must therefore provide the task context in which the metrics associated with that plan are stored. The patch also fixes fmt errors and applies several clippy suggestions.
1 parent 8c80357 commit 13974c2
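
A minimal sketch of the resulting flow, assuming the API shape visible in the hunks below (a `TaskContext::plan_metrics` lookup keyed by plan address, and `DisplayableExecutionPlan::with_metrics` taking the task context); the table name `t` is illustrative:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::*;

async fn show_plan_with_metrics(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("SELECT * FROM t").await?;
    let plan = df.create_physical_plan().await?;

    // Keep a handle to the task context: after this patch it stores the
    // metrics that previously lived inside the plan nodes.
    let task_ctx = ctx.task_ctx();
    let _batches = collect(plan.clone(), Arc::clone(&task_ctx)).await?;

    // Metrics are registered by plan address, so the same task context
    // that executed the plan must be supplied to display them.
    println!(
        "{}",
        DisplayableExecutionPlan::with_metrics(plan.as_ref(), task_ctx).indent(true)
    );
    Ok(())
}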

94 files changed (+716 -742 lines)


benchmarks/src/tpch/run.rs (+3 -2)

@@ -219,11 +219,12 @@ impl RunOpt {
                 displayable(physical_plan.as_ref()).indent(true)
             );
         }
-        let result = collect(physical_plan.clone(), state.task_ctx()).await?;
+        let task_ctx = state.task_ctx();
+        let result = collect(physical_plan.clone(), Arc::clone(&task_ctx)).await?;
         if debug {
             println!(
                 "=== Physical plan with metrics ===\n{}\n",
-                DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
+                DisplayableExecutionPlan::with_metrics(physical_plan.as_ref(), task_ctx)
                     .indent(true)
             );
             if !result.is_empty() {

datafusion-examples/examples/advanced_parquet_index.rs (+1 -1)

@@ -28,6 +28,7 @@ use datafusion::datasource::physical_plan::{
     parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
 };
 use datafusion::datasource::TableProvider;
+use datafusion::execution::metrics::ExecutionPlanMetricsSet;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::parquet::arrow::arrow_reader::{
     ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
@@ -39,7 +40,6 @@ use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}
 use datafusion::parquet::schema::types::ColumnPath;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_optimizer::pruning::PruningPredicate;
-use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use datafusion_common::{

datafusion-examples/examples/csv_opener.rs (+1 -1)

@@ -26,7 +26,7 @@ use datafusion::{
         physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
     },
     error::Result,
-    physical_plan::metrics::ExecutionPlanMetricsSet,
+    execution::metrics::ExecutionPlanMetricsSet,
     test_util::aggr_test_schema,
 };

datafusion-examples/examples/json_opener.rs (+1 -1)

@@ -27,7 +27,7 @@ use datafusion::{
         physical_plan::{FileScanConfig, FileStream, JsonOpener},
     },
     error::Result,
-    physical_plan::metrics::ExecutionPlanMetricsSet,
+    execution::metrics::ExecutionPlanMetricsSet,
 };

 use futures::StreamExt;

datafusion-examples/examples/parquet_exec_visitor.rs (+13 -9)

@@ -21,7 +21,8 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
 use datafusion::datasource::physical_plan::ParquetExec;
 use datafusion::execution::context::SessionContext;
-use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::execution::metrics::MetricValue;
+use datafusion::execution::TaskContext;
 use datafusion::physical_plan::{
     execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
 };
@@ -52,20 +53,22 @@ async fn main() {
     let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
     let plan = df.create_physical_plan().await.unwrap();

-    // Create empty visitor
-    let mut visitor = ParquetExecVisitor {
-        file_groups: None,
-        bytes_scanned: None,
-    };
-
     // Make sure you execute the plan to collect actual execution statistics.
     // For example, in this example the `file_scan_config` is known without executing
     // but the `bytes_scanned` would be None if we did not execute.
-    let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx()).unwrap();
+    let task_ctx = ctx.task_ctx();
+    let mut batch_stream = execute_stream(plan.clone(), Arc::clone(&task_ctx)).unwrap();
     while let Some(batch) = batch_stream.next().await {
         println!("Batch rows: {}", batch.unwrap().num_rows());
     }

+    // Create empty visitor
+    let mut visitor = ParquetExecVisitor {
+        file_groups: None,
+        bytes_scanned: None,
+        ctx: task_ctx,
+    };
+
     visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();

     println!(
@@ -85,6 +88,7 @@ async fn main() {
 struct ParquetExecVisitor {
     file_groups: Option<Vec<Vec<PartitionedFile>>>,
     bytes_scanned: Option<MetricValue>,
+    ctx: Arc<TaskContext>,
 }

 impl ExecutionPlanVisitor for ParquetExecVisitor {
@@ -99,7 +103,7 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
         if let Some(parquet_exec) = maybe_parquet_exec {
             self.file_groups = Some(parquet_exec.base_config().file_groups.clone());

-            let metrics = match parquet_exec.metrics() {
+            let metrics = match self.ctx.plan_metrics(plan.as_any()) {
                 None => return Ok(true),
                 Some(metrics) => metrics,
             };
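
Given the `plan_metrics` accessor used in the visitor above, pulling one named counter out of the context is a small helper. A sketch, assuming the `plan_metrics`/`MetricsSet::sum` signatures implied by the hunks in this commit; the `sum_counter` helper itself is illustrative:

use std::sync::Arc;

use datafusion::execution::metrics::MetricValue;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;

/// Sum a named counter (e.g. "bytes_scanned") recorded for `plan`, using the
/// task context that executed it. Returns None if the node was never scanned.
fn sum_counter(
    ctx: &Arc<TaskContext>,
    plan: &Arc<dyn ExecutionPlan>,
    counter: &str,
) -> Option<usize> {
    // Metrics are looked up by the plan node's address, so `plan` must be the
    // same instance that was executed with `ctx`.
    ctx.plan_metrics(plan.as_any())?
        .sum(|m| matches!(m.value(), MetricValue::Count { name, .. } if name == counter))
        .map(|v| v.as_usize())
}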

datafusion/common/src/column.rs (+8 -6)

@@ -109,21 +109,23 @@ impl Column {
     /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
     pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
         let flat_name = flat_name.into();
-        Self::from_idents(&mut parse_identifiers_normalized(&flat_name, false))
-            .unwrap_or_else(|| Self {
+        Self::from_idents(&mut parse_identifiers_normalized(&flat_name, false)).unwrap_or(
+            Self {
                 relation: None,
                 name: flat_name,
-            })
+            },
+        )
     }

     /// Deserialize a fully qualified name string into a column preserving column text case
     pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
         let flat_name = flat_name.into();
-        Self::from_idents(&mut parse_identifiers_normalized(&flat_name, true))
-            .unwrap_or_else(|| Self {
+        Self::from_idents(&mut parse_identifiers_normalized(&flat_name, true)).unwrap_or(
+            Self {
                 relation: None,
                 name: flat_name,
-            })
+            },
+        )
     }

     /// return the column's name.
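
This hunk is likely clippy's `unnecessary_lazy_evaluations`: when the fallback merely moves data that is already computed, the eager `unwrap_or` is equivalent to `unwrap_or_else` with a closure and reads more directly. A trivial illustration (the names below are hypothetical):

fn pick(parsed: Option<String>, raw: String) -> String {
    // Lazy form the lint replaces: parsed.unwrap_or_else(|| raw)
    // Nothing is computed in the fallback, so the eager form is equivalent.
    parsed.unwrap_or(raw)
}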

datafusion/common/src/hash_utils.rs (+1 -1)

@@ -63,7 +63,7 @@ pub trait HashValue {
     fn hash_one(&self, state: &RandomState) -> u64;
 }

-impl<'a, T: HashValue + ?Sized> HashValue for &'a T {
+impl<T: HashValue + ?Sized> HashValue for &'_ T {
     fn hash_one(&self, state: &RandomState) -> u64 {
         T::hash_one(self, state)
     }
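
This is clippy's `needless_lifetimes`: a lifetime parameter used in only one position can be elided with `'_`. A self-contained before/after sketch (the `HashLike` trait is invented for illustration):

trait HashLike {
    fn hash_one(&self) -> u64;
}

// Before: impl<'a, T: HashLike + ?Sized> HashLike for &'a T { ... }
// After: the named lifetime carries no information, so it is elided.
impl<T: HashLike + ?Sized> HashLike for &'_ T {
    fn hash_one(&self) -> u64 {
        // `self` is `&&T`; deref coercion supplies the `&T` receiver.
        T::hash_one(self)
    }
}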

datafusion/common/src/utils/mod.rs (+3 -3)

@@ -354,7 +354,7 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
     count
 }

-/// Array Utils
+// Array Utils

 /// Wrap an array into a single element `ListArray`.
 /// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
@@ -363,7 +363,7 @@ pub fn array_into_list_array_nullable(arr: ArrayRef) -> ListArray {
     array_into_list_array(arr, true)
 }

-/// Array Utils
+// Array Utils

 /// Wrap an array into a single element `ListArray`.
 /// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
@@ -563,7 +563,7 @@ pub mod datafusion_strsim {

     struct StringWrapper<'a>(&'a str);

-    impl<'a, 'b> IntoIterator for &'a StringWrapper<'b> {
+    impl<'b> IntoIterator for &StringWrapper<'b> {
         type Item = char;
         type IntoIter = Chars<'b>;

datafusion/core/benches/sql_planner.rs (-26)

@@ -27,10 +27,6 @@ use criterion::Bencher;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::SessionContext;
 use datafusion_common::ScalarValue;
-use itertools::Itertools;
-use std::fs::File;
-use std::io::{BufRead, BufReader};
-use std::path::PathBuf;
 use std::sync::Arc;
 use test_utils::tpcds::tpcds_schemas;
 use test_utils::tpch::tpch_schemas;
@@ -95,28 +91,6 @@ fn register_defs(ctx: SessionContext, defs: Vec<TableDef>) -> SessionContext {
     ctx
 }

-fn register_clickbench_hits_table() -> SessionContext {
-    let ctx = SessionContext::new();
-    let rt = Runtime::new().unwrap();
-
-    // use an external table for clickbench benchmarks
-    let path =
-        if PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() {
-            format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")
-        } else {
-            format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}")
-        };
-
-    let sql = format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'");
-
-    rt.block_on(ctx.sql(&sql)).unwrap();
-
-    let count =
-        rt.block_on(async { ctx.table("hits").await.unwrap().count().await.unwrap() });
-    assert!(count > 0);
-    ctx
-}
-
 /// Target of this benchmark: control that placeholders replacing does not get slower,
 /// if the query does not contain placeholders at all.
 fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) {

datafusion/core/src/datasource/file_format/arrow.rs (+1 -1)

@@ -46,10 +46,10 @@ use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
     not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
 };
+use datafusion_execution::metrics::MetricsSet;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
-use datafusion_physical_plan::metrics::MetricsSet;

 use async_trait::async_trait;
 use bytes::Bytes;

datafusion/core/src/datasource/file_format/csv.rs (+1 -1)

@@ -45,9 +45,9 @@ use datafusion_common::file_options::csv_writer::CsvWriterOptions;
 use datafusion_common::{
     exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
 };
+use datafusion_execution::metrics::MetricsSet;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_plan::metrics::MetricsSet;

 use async_trait::async_trait;
 use bytes::{Buf, Bytes};

datafusion/core/src/datasource/file_format/json.rs (+1 -1)

@@ -45,9 +45,9 @@ use arrow_array::RecordBatch;
 use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
+use datafusion_execution::metrics::MetricsSet;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_plan::metrics::MetricsSet;
 use datafusion_physical_plan::ExecutionPlan;

 use async_trait::async_trait;

datafusion/core/src/datasource/file_format/options.rs (+5 -5)

@@ -89,7 +89,7 @@ pub struct CsvReadOptions<'a> {
     pub file_sort_order: Vec<Vec<SortExpr>>,
 }

-impl<'a> Default for CsvReadOptions<'a> {
+impl Default for CsvReadOptions<'_> {
     fn default() -> Self {
         Self::new()
     }
@@ -243,7 +243,7 @@ pub struct ParquetReadOptions<'a> {
     pub file_sort_order: Vec<Vec<SortExpr>>,
 }

-impl<'a> Default for ParquetReadOptions<'a> {
+impl Default for ParquetReadOptions<'_> {
     fn default() -> Self {
         Self {
             file_extension: DEFAULT_PARQUET_EXTENSION,
@@ -312,7 +312,7 @@ pub struct ArrowReadOptions<'a> {
     pub table_partition_cols: Vec<(String, DataType)>,
 }

-impl<'a> Default for ArrowReadOptions<'a> {
+impl Default for ArrowReadOptions<'_> {
     fn default() -> Self {
         Self {
             schema: None,
@@ -357,7 +357,7 @@ pub struct AvroReadOptions<'a> {
     pub table_partition_cols: Vec<(String, DataType)>,
 }

-impl<'a> Default for AvroReadOptions<'a> {
+impl Default for AvroReadOptions<'_> {
     fn default() -> Self {
         Self {
             schema: None,
@@ -409,7 +409,7 @@ pub struct NdJsonReadOptions<'a> {
     pub file_sort_order: Vec<Vec<SortExpr>>,
 }

-impl<'a> Default for NdJsonReadOptions<'a> {
+impl Default for NdJsonReadOptions<'_> {
     fn default() -> Self {
         Self {
             schema: None,

datafusion/core/src/datasource/file_format/parquet.rs (+11 -8)

@@ -52,10 +52,10 @@ use datafusion_common::{
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
+use datafusion_execution::metrics::MetricsSet;
 use datafusion_execution::TaskContext;
 use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_plan::metrics::MetricsSet;

 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
@@ -1269,7 +1269,6 @@ mod tests {
     use super::*;

     use crate::datasource::file_format::parquet::test_util::store_parquet;
-    use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
     use arrow_array::types::Int32Type;
@@ -1283,6 +1282,7 @@ mod tests {
     use datafusion_common::config::ParquetOptions;
     use datafusion_common::ScalarValue;
     use datafusion_common::ScalarValue::Utf8;
+    use datafusion_execution::metrics::MetricValue;
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::runtime_env::RuntimeEnv;
     use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -1773,10 +1773,10 @@ mod tests {
         let task_ctx = ctx.task_ctx();

         let _ = collect(exec.clone(), task_ctx.clone()).await?;
-        let _ = collect(exec_projected.clone(), task_ctx).await?;
+        let _ = collect(exec_projected.clone(), Arc::clone(&task_ctx)).await?;

-        assert_bytes_scanned(exec, 671);
-        assert_bytes_scanned(exec_projected, 73);
+        assert_bytes_scanned(exec, 671, &task_ctx);
+        assert_bytes_scanned(exec_projected, 73, &task_ctx);

         Ok(())
     }
@@ -2182,9 +2182,12 @@ mod tests {
         }
     }

-    fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
-        let actual = exec
-            .metrics()
+    fn assert_bytes_scanned(
+        exec: Arc<dyn ExecutionPlan>,
+        expected: usize,
+        ctx: &Arc<TaskContext>,
+    ) {
+        let actual = ctx.plan_metrics(exec.as_any())
            .expect("Metrics not recorded")
            .sum(|metric| matches!(metric.value(), MetricValue::Count { name, .. } if name == "bytes_scanned"))
            .map(|t| t.as_usize())

datafusion/core/src/datasource/listing/helpers.rs (+1 -1)

@@ -137,7 +137,7 @@ pub fn split_files(
     partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));

     // effectively this is div with rounding up instead of truncating
-    let chunk_size = (partitioned_files.len() + n - 1) / n;
+    let chunk_size = partitioned_files.len().div_ceil(n);
     let mut chunks = Vec::with_capacity(n);
     let mut current_chunk = Vec::with_capacity(chunk_size);
     for file in partitioned_files.drain(..) {
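
`usize::div_ceil` (stable since Rust 1.73) expresses the round-up division directly and, unlike the manual idiom, cannot overflow in the addition. A small sketch (the function name is illustrative):

fn chunk_size(total: usize, n_chunks: usize) -> usize {
    assert!(n_chunks > 0, "cannot split into zero chunks");
    // Same result as (total + n_chunks - 1) / n_chunks, but the manual form
    // wraps when `total` is within n_chunks - 1 of usize::MAX.
    total.div_ceil(n_chunks)
}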

datafusion/core/src/datasource/listing/table.rs (+2 -2)

@@ -1032,8 +1032,8 @@ impl ListingTable {
     /// This method first checks if the statistics for the given file are already cached.
     /// If they are, it returns the cached statistics.
    /// If they are not, it infers the statistics from the file and stores them in the cache.
-    async fn do_collect_statistics<'a>(
-        &'a self,
+    async fn do_collect_statistics(
+        &self,
         ctx: &SessionState,
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,

datafusion/core/src/datasource/listing/url.rs (+1 -1)

@@ -170,7 +170,7 @@ impl ListingTableUrl {
         if ignore_subdirectory {
             segments
                 .next()
-                .map_or(false, |file_name| glob.matches(file_name))
+                .is_some_and(|file_name| glob.matches(file_name))
         } else {
             let stripped = segments.join(DELIMITER);
             glob.matches(&stripped)
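
`Option::is_some_and` (stable since Rust 1.70) reads as "is Some and the predicate holds", replacing the `map_or(false, …)` form that clippy flags. A minimal sketch (names are illustrative):

fn first_segment_matches(first: Option<&str>, needle: &str) -> bool {
    // Before: first.map_or(false, |s| s.contains(needle))
    first.is_some_and(|s| s.contains(needle))
}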

datafusion/core/src/datasource/memory.rs (+1 -1)

@@ -38,8 +38,8 @@ use crate::physical_planner::create_physical_sort_exprs;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
+use datafusion_execution::metrics::MetricsSet;
 use datafusion_execution::TaskContext;
-use datafusion_physical_plan::metrics::MetricsSet;

 use async_trait::async_trait;
 use datafusion_catalog::Session;
