Skip to content

Commit 57147ad

Browse files
author
wiedld
committed
fix: maintain ordering within partition, when a single partition
1 parent daac003 commit 57147ad

1 file changed

Lines changed: 69 additions & 5 deletions

File tree

datafusion/datasource/src/memory.rs

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -774,8 +774,6 @@ impl MemorySourceConfig {
774774
) -> Result<Option<Vec<Vec<RecordBatch>>>> {
775775
if !self.eq_properties().ordering_satisfy(&output_ordering) {
776776
Ok(None)
777-
} else if self.partitions.len() == 1 {
778-
self.repartition_evenly_by_size(target_partitions)
779777
} else {
780778
let total_num_batches =
781779
self.partitions.iter().map(|b| b.len()).sum::<usize>();
@@ -1298,6 +1296,8 @@ mod tests {
12981296
.try_with_sort_information(sort_information)
12991297
}
13001298

1299+
/// Batches of different sizes, with batches ordered by size (100_000, 10_000, 100, 1)
1300+
/// in the Memtable partition (a.k.a. vector of batches).
13011301
fn memorysrcconfig_1_partition_with_different_sized_batches(
13021302
sort_information: Vec<LexOrdering>,
13031303
) -> Result<MemorySourceConfig> {
@@ -1306,6 +1306,17 @@ mod tests {
13061306
.try_with_sort_information(sort_information)
13071307
}
13081308

1309+
/// Same as [`memorysrcconfig_1_partition_with_different_sized_batches`],
1310+
/// but the batches are ordered differently (not by size)
1311+
/// in the Memtable partition (a.k.a. vector of batches).
1312+
fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
1313+
sort_information: Vec<LexOrdering>,
1314+
) -> Result<MemorySourceConfig> {
1315+
let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
1316+
MemorySourceConfig::try_new(&partitions, schema(), None)?
1317+
.try_with_sort_information(sort_information)
1318+
}
1319+
13091320
fn memorysrcconfig_2_partition_with_different_sized_batches(
13101321
sort_information: Vec<LexOrdering>,
13111322
) -> Result<MemorySourceConfig> {
@@ -1474,9 +1485,14 @@ mod tests {
14741485
assert_partitioning(partitioned_datasrc.clone(), Some(2));
14751486
// Starting = batch(100_000), batch(10_000), batch(100), batch(1).
14761487
// It should have split as p1=batch(100_000), p2=[batch(10_000), batch(100), batch(1)]
1477-
let repartitioned_raw_batches = mem_src_config
1478-
.repartition_evenly_by_size(target_partitions)?
1479-
.unwrap();
1488+
let partitioned_datasrc = partitioned_datasrc.unwrap();
1489+
let Some(mem_src_config) = partitioned_datasrc
1490+
.as_any()
1491+
.downcast_ref::<MemorySourceConfig>()
1492+
else {
1493+
unreachable!()
1494+
};
1495+
let repartitioned_raw_batches = mem_src_config.partitions.clone();
14801496
assert_eq!(repartitioned_raw_batches.len(), 2);
14811497
let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
14821498
unreachable!()
@@ -1645,6 +1661,54 @@ mod tests {
16451661
assert_eq!(p3.len(), 2);
16461662
assert_eq!(p3[0].num_rows(), 2_000);
16471663
assert_eq!(p3[1].num_rows(), 20);
1664+
1665+
Ok(())
1666+
}
1667+
1668+
#[test]
1669+
fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
1670+
let schema = schema();
1671+
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
1672+
expr: col("c", &schema).unwrap(),
1673+
options: SortOptions::default(),
1674+
}]);
1675+
let has_sort = vec![sort_key.clone()];
1676+
let output_ordering = Some(sort_key);
1677+
1678+
// src has 1 partition with many batches of lopsided sizes
1679+
// note that the input vector of batches are not ordered by decreasing size
1680+
let target_partitions = 2;
1681+
let mem_src_config =
1682+
memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
1683+
let partitioned_datasrc = mem_src_config.clone().repartitioned(
1684+
target_partitions,
1685+
usize::MAX,
1686+
output_ordering,
1687+
)?;
1688+
assert_partitioning(partitioned_datasrc.clone(), Some(2));
1689+
// Starting = batch(100_000), batch(1), batch(100), batch(10_000).
1690+
// It should have split as p1=batch(100_000), p2=[batch(1), batch(100), batch(10_000)]
1691+
let partitioned_datasrc = partitioned_datasrc.unwrap();
1692+
let Some(mem_src_config) = partitioned_datasrc
1693+
.as_any()
1694+
.downcast_ref::<MemorySourceConfig>()
1695+
else {
1696+
unreachable!()
1697+
};
1698+
let repartitioned_raw_batches = mem_src_config.partitions.clone();
1699+
assert_eq!(repartitioned_raw_batches.len(), 2);
1700+
let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1701+
unreachable!()
1702+
};
1703+
// p1=batch(100_000)
1704+
assert_eq!(p1.len(), 1);
1705+
assert_eq!(p1[0].num_rows(), 100_000);
1706+
// p2=[batch(1), batch(100), batch(10_000)] -- **this is preserving the partition order**
1707+
assert_eq!(p2.len(), 3);
1708+
assert_eq!(p2[0].num_rows(), 1);
1709+
assert_eq!(p2[1].num_rows(), 100);
1710+
assert_eq!(p2[2].num_rows(), 10_000);
1711+
16481712
Ok(())
16491713
}
16501714
}

0 commit comments

Comments
 (0)