Skip to content

Commit db55143

Browse files
author
wiedld
committed
chore: change variable naming, and update documentation to make clear how the datasource repartitioning is configured and performed
1 parent 20681fd commit db55143

6 files changed

Lines changed: 71 additions & 23 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -632,14 +632,21 @@ config_namespace! {
632632
/// long-running execution, all types of joins may encounter out-of-memory errors.
633633
pub allow_symmetric_joins_without_pruning: bool, default = true
634634

635-
/// When set to `true`, file groups will be repartitioned to achieve maximum parallelism.
636-
/// Currently Parquet and CSV formats are supported.
635+
/// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
636+
/// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
637637
///
638-
/// If set to `true`, all files will be repartitioned evenly (i.e., a single large file
638+
/// For FileSources, only Parquet and CSV formats are currently supported.
639+
///
640+
/// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
639641
/// might be partitioned into smaller chunks) for parallel scanning.
640-
/// If set to `false`, different files will be read in parallel, but repartitioning won't
642+
/// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
641643
/// happen within a single file.
642-
pub repartition_file_scans: bool, default = true
644+
///
645+
/// If set to `true` for an in-memory source, all of the MemTable's partitions will have their batches
646+
/// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
647+
/// the total number of partitions and batches per partition, but does not slice the initial
648+
/// record batches provided to the MemTable on creation.
649+
pub repartition_datasource_scans: bool, default = true
643650

644651
/// Should DataFusion repartition data using the partitions keys to execute window
645652
/// functions in parallel using the provided `target_partitions` level

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ fn ensure_distribution_helper(
364364
let mut config = ConfigOptions::new();
365365
config.execution.target_partitions = target_partitions;
366366
config.optimizer.enable_round_robin_repartition = true;
367-
config.optimizer.repartition_file_scans = false;
367+
config.optimizer.repartition_datasource_scans = false;
368368
config.optimizer.repartition_file_min_size = 1024;
369369
config.optimizer.prefer_existing_sort = prefer_existing_sort;
370370
ensure_distribution(distribution_context, &config).map(|item| item.data.plan)
@@ -396,7 +396,7 @@ fn test_suite_default_config_options() -> ConfigOptions {
396396
config.optimizer.prefer_existing_union = false;
397397

398398
// By default, will not repartition file scans.
399-
config.optimizer.repartition_file_scans = false;
399+
config.optimizer.repartition_datasource_scans = false;
400400
config.optimizer.repartition_file_min_size = 1024;
401401

402402
// By default, set query execution concurrency to 10.
@@ -449,7 +449,7 @@ impl TestConfig {
449449
/// If preferred, will repartition file scans.
450450
/// Accepts a minimum file size to repartition.
451451
fn with_prefer_repartition_file_scans(mut self, file_min_size: usize) -> Self {
452-
self.config.optimizer.repartition_file_scans = true;
452+
self.config.optimizer.repartition_datasource_scans = true;
453453
self.config.optimizer.repartition_file_min_size = file_min_size;
454454
self
455455
}
@@ -3483,7 +3483,7 @@ async fn test_distribute_sort_parquet() -> Result<()> {
34833483
let test_config: TestConfig =
34843484
TestConfig::default().with_prefer_repartition_file_scans(1000);
34853485
assert!(
3486-
test_config.config.optimizer.repartition_file_scans,
3486+
test_config.config.optimizer.repartition_datasource_scans,
34873487
"should enable scans to be repartitioned"
34883488
);
34893489

@@ -3524,7 +3524,7 @@ async fn test_distribute_sort_memtable() -> Result<()> {
35243524
let test_config: TestConfig =
35253525
TestConfig::default().with_prefer_repartition_file_scans(1000);
35263526
assert!(
3527-
test_config.config.optimizer.repartition_file_scans,
3527+
test_config.config.optimizer.repartition_datasource_scans,
35283528
"should enable scans to be repartitioned"
35293529
);
35303530

datafusion/datasource/src/memory.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,9 @@ impl MemoryExec {
356356
/// Data source configuration for reading in-memory batches of data
357357
#[derive(Clone, Debug)]
358358
pub struct MemorySourceConfig {
359-
/// The partitions to query
359+
/// The partitions to query.
360+
///
361+
/// Each partition is a `Vec<RecordBatch>`.
360362
partitions: Vec<Vec<RecordBatch>>,
361363
/// Schema representing the data before projection
362364
schema: SchemaRef,
@@ -448,14 +450,18 @@ impl DataSource for MemorySourceConfig {
448450
}
449451
}
450452

453+
/// If possible, redistribute batches across partitions according to their size.
454+
///
455+
/// Returns `Ok(None)` if unable to repartition. Preserves the output ordering if one exists.
456+
/// Refer to [`DataSource::repartitioned`] for further details.
451457
fn repartitioned(
452458
&self,
453459
target_partitions: usize,
454460
_repartition_file_min_size: usize,
455461
output_ordering: Option<LexOrdering>,
456462
) -> Result<Option<Arc<dyn DataSource>>> {
457463
if self.partitions.is_empty() || self.partitions.len() >= target_partitions
458-
// if already have more partitions than desired, do not merge
464+
// if there are no partitions, or there are already at least as many partitions as desired, do not repartition
459465
{
460466
return Ok(None);
461467
}
@@ -758,7 +764,9 @@ impl MemorySourceConfig {
758764

759765
/// Repartition while preserving order.
760766
///
761-
/// Returns None if cannot fulfill requested repartitioning.
767+
/// Returns `Ok(None)` if it cannot fulfill the requested repartitioning, such
768+
/// as having too few batches to fulfill the `target_partitions` or if unable
769+
/// to preserve output ordering.
762770
fn repartition_preserving_order(
763771
&self,
764772
target_partitions: usize,
@@ -778,6 +786,8 @@ impl MemorySourceConfig {
778786

779787
let cnt_to_repartition = target_partitions - self.partitions.len();
780788

789+
// Label the current partitions and their order.
790+
// This way, when we later split up the partitions into smaller sizes, we maintain the order.
781791
let to_repartition = self
782792
.partitions
783793
.iter()
@@ -789,11 +799,15 @@ impl MemorySourceConfig {
789799
})
790800
.collect_vec();
791801

792-
// split the largest partitions
802+
// Put all of the partitions into a heap ordered by `RePartition::partial_cmp`, which sizes
803+
// by count of rows.
793804
let mut max_heap = BinaryHeap::with_capacity(target_partitions);
794805
for rep in to_repartition {
795806
max_heap.push(rep);
796807
}
808+
809+
// Split the largest partitions into smaller partitions, maintaining the output
810+
// order of the partitions & newly created partitions.
797811
let mut cannot_split_further = Vec::with_capacity(target_partitions);
798812
for _ in 0..cnt_to_repartition {
799813
loop {
@@ -814,6 +828,8 @@ impl MemorySourceConfig {
814828
}
815829
let mut partitions = max_heap.drain().collect_vec();
816830
partitions.extend(cannot_split_further);
831+
832+
// Finally, sort all partitions by the output ordering.
817833
partitions.sort_by_key(|p| p.idx);
818834
let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
819835

@@ -824,7 +840,8 @@ impl MemorySourceConfig {
824840
/// Repartition into evenly sized chunks (as much as possible without batch splitting),
825841
/// disregarding any ordering.
826842
///
827-
/// Returns None if cannot fulfill requested repartitioning.
843+
/// Returns `Ok(None)` if it cannot fulfill the requested repartitioning, such
844+
/// as having too few batches to fulfill the `target_partitions`.
828845
fn repartition_evenly_by_size(
829846
&self,
830847
target_partitions: usize,
@@ -835,7 +852,7 @@ impl MemorySourceConfig {
835852
return Ok(None);
836853
}
837854

838-
// repartition evenly
855+
// Take all flattened batches (all in 1 partition/vec) and divide evenly into the desired number of `target_partitions`.
839856
let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
840857
let target_partition_size = total_num_rows.div_ceil(target_partitions);
841858
let mut partitions =
@@ -871,8 +888,12 @@ impl MemorySourceConfig {
871888
///
872889
/// Do not implement clone, in order to avoid unnecessary copying during repartitioning.
873890
struct RePartition {
891+
/// Original output ordering for the partition.
874892
idx: usize,
893+
/// Total size of the partition, for use in heap ordering
894+
/// (i.e., so that the largest partitions are split up first).
875895
row_count: usize,
896+
/// A partition containing record batches.
876897
batches: Vec<RecordBatch>,
877898
}
878899

@@ -886,12 +907,12 @@ impl RePartition {
886907
}
887908

888909
let new_0 = RePartition {
889-
idx: self.idx,
910+
idx: self.idx, // output ordering
890911
row_count: 0,
891912
batches: vec![],
892913
};
893914
let new_1 = RePartition {
894-
idx: self.idx + 1,
915+
idx: self.idx + 1, // output ordering +1
895916
row_count: 0,
896917
batches: vec![],
897918
};

datafusion/datasource/src/source.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,15 @@ pub trait DataSource: Send + Sync + Debug {
6060
/// Format this source for display in explain plans
6161
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
6262

63-
/// Return a copy of this DataSource with a new partitioning scheme
63+
/// Return a copy of this DataSource with a new partitioning scheme.
64+
///
65+
/// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
66+
/// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
67+
///
68+
/// Repartitioning should not change the output ordering, if this ordering exists.
69+
/// Refer to [`MemorySourceConfig::repartitioned`](crate::memory::MemorySourceConfig) and the FileSource's
70+
/// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
71+
/// for examples.
6472
fn repartitioned(
6573
&self,
6674
_target_partitions: usize,
@@ -148,6 +156,10 @@ impl ExecutionPlan for DataSourceExec {
148156
Ok(self)
149157
}
150158

159+
/// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
160+
///
161+
/// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
162+
/// to [`ExecutionPlan::repartitioned`] for more details.
151163
fn repartitioned(
152164
&self,
153165
target_partitions: usize,

datafusion/execution/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ impl SessionConfig {
307307

308308
/// Enables or disables the use of repartitioning for file scans
309309
pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
310-
self.options.optimizer.repartition_file_scans = enabled;
310+
self.options.optimizer.repartition_datasource_scans = enabled;
311311
self
312312
}
313313

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,6 +1155,10 @@ fn get_repartition_requirement_status(
11551155
/// operators to satisfy distribution requirements. Since this function
11561156
/// takes care of such requirements, we should avoid manually adding data
11571157
/// exchange operators in other places.
1158+
///
1159+
/// This function is intended to be used in a bottom up traversal, as it
1160+
/// can first repartition (or newly partition) at the datasources -- these
1161+
/// source partitions may be later repartitioned with additional data exchange operators.
11581162
pub fn ensure_distribution(
11591163
dist_context: DistributionContext,
11601164
config: &ConfigOptions,
@@ -1168,7 +1172,7 @@ pub fn ensure_distribution(
11681172
let target_partitions = config.execution.target_partitions;
11691173
// When `false`, round robin repartition will not be added to increase parallelism
11701174
let enable_round_robin = config.optimizer.enable_round_robin_repartition;
1171-
let repartition_file_scans = config.optimizer.repartition_file_scans;
1175+
let repartition_datasource_scans = config.optimizer.repartition_datasource_scans;
11721176
let batch_size = config.execution.batch_size;
11731177
let should_use_estimates = config
11741178
.execution
@@ -1242,9 +1246,13 @@ pub fn ensure_distribution(
12421246
// Unless partitioning increases the partition count, it is not beneficial:
12431247
&& child.plan.output_partitioning().partition_count() < target_partitions;
12441248

1245-
// When `repartition_file_scans` is set, attempt to increase
1249+
// When `repartition_datasource_scans` is set, attempt to increase
12461250
// parallelism at the source.
1247-
if repartition_file_scans && roundrobin_beneficial_stats {
1251+
//
1252+
// If repartitioning is not possible (i.e., `None` is returned from `ExecutionPlan::repartitioned`)
1253+
// then no repartitioning will have occurred. As the default implementation returns None, it is only
1254+
// specific physical plan nodes, such as certain datasources, which are repartitioned.
1255+
if repartition_datasource_scans && roundrobin_beneficial_stats {
12481256
if let Some(new_child) =
12491257
child.plan.repartitioned(target_partitions, config)?
12501258
{

0 commit comments

Comments
 (0)