Skip to content

Commit 2578059

Browse files
authored
feat(query): add clustering depth percentiles and optimize overlap an… (#20023)
* feat(query): add clustering depth percentiles and optimize overlap analysis * fix * fix
1 parent c30bbf4 commit 2578059

15 files changed

Lines changed: 298 additions & 98 deletions

File tree

src/query/expression/src/kernels/sort.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,19 @@ impl DataBlock {
110110
}
111111
}
112112

113-
pub fn compare_scalars(rows: Vec<Vec<Scalar>>, data_types: &[DataType]) -> Result<Vec<u32>> {
113+
/// Sort scalar rows without taking ownership of the row values.
114+
pub fn compare_scalars<T: AsRef<[Scalar]>>(
115+
rows: &[T],
116+
data_types: &[DataType],
117+
) -> Result<Vec<u32>> {
114118
let length = rows.len();
115119
let mut columns = data_types
116120
.iter()
117121
.map(|ty| ColumnBuilder::with_capacity(ty, length))
118122
.collect::<Vec<_>>();
119123

120-
for row in rows.into_iter() {
121-
for (field, column) in row.into_iter().zip(columns.iter_mut()) {
124+
for row in rows {
125+
for (field, column) in row.as_ref().iter().zip(columns.iter_mut()) {
122126
column.push(field.as_ref());
123127
}
124128
}

src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ use databend_common_expression::ColumnRef;
2323
use databend_common_expression::DataBlock;
2424
use databend_common_expression::Expr;
2525
use databend_common_expression::Scalar;
26-
use databend_common_expression::TableSchema;
27-
use databend_common_expression::TableSchemaRef;
2826
use databend_common_expression::types::DataType;
2927
use databend_common_expression::types::NumberDataType;
3028
use databend_common_storages_fuse::FuseBlockPartInfo;
@@ -285,7 +283,7 @@ async fn target_select_segment_locations_with_mode(
285283
max_segments: usize,
286284
mode: ReclusterMode,
287285
) -> anyhow::Result<(usize, u64, ReclusterParts)> {
288-
let schema = TableSchemaRef::new(TableSchema::empty());
286+
let schema = TestFixture::default_table_schema();
289287
let segment_locations = create_segment_location_vector(segment_locations, None);
290288
let compact_segments = FuseTable::segment_pruning(
291289
&ctx,
@@ -457,7 +455,7 @@ async fn test_recluster_mutator_selects_multiple_segment_windows() -> anyhow::Re
457455
)
458456
.await?;
459457

460-
let schema = TableSchemaRef::new(TableSchema::empty());
458+
let schema = TestFixture::default_table_schema();
461459
let ctx: Arc<dyn TableContext> = ctx.clone();
462460
let segment_locations = create_segment_location_vector(segment_locations, None);
463461
let compact_segments = FuseTable::segment_pruning(

src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ use crate::operations::mutation::BlockIndex;
7070
use crate::operations::mutation::SegmentIndex;
7171
use crate::statistics::VirtualColumnAccumulator;
7272
use crate::statistics::get_min_max_stats;
73+
use crate::statistics::prepare_cluster_key_exprs;
7374
use crate::statistics::reducers::merge_statistics_mut;
7475
use crate::statistics::reducers::reduce_block_metas;
7576
use crate::statistics::sort_by_cluster_stats;
@@ -925,12 +926,12 @@ fn fill_missing_segment_cluster_stats(
925926
return;
926927
}
927928

929+
let prepared_cluster_key_exprs = prepare_cluster_key_exprs(cluster_key_exprs, schema);
928930
let (min, max) = get_min_max_stats(
929-
cluster_key_exprs,
931+
&prepared_cluster_key_exprs,
930932
&summary.col_stats,
931933
None,
932934
Some(cluster_key_id),
933-
schema,
934935
);
935936
summary.cluster_stats = Some(ClusterStatistics::new(cluster_key_id, min, max, 0, None));
936937
}

src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ use crate::SegmentLocation;
6464
use crate::io::MetaReaders;
6565
use crate::operations::ReclusterMode;
6666
use crate::operations::common::BlockMetaIndex as BlockIndex;
67+
use crate::statistics::PreparedClusterKeyExpr;
6768
use crate::statistics::get_min_max_stats;
69+
use crate::statistics::prepare_cluster_key_exprs;
6870
use crate::statistics::reducers::merge_statistics_mut;
6971

7072
/// Maximum recluster depth allowed when only two blocks remain.
@@ -162,7 +164,7 @@ pub struct ReclusterMutator {
162164
pub(crate) schema: TableSchemaRef,
163165
pub(crate) max_tasks: usize,
164166
pub(crate) memory_threshold: usize,
165-
pub(crate) cluster_key_exprs: Vec<Expr<usize>>,
167+
pub(crate) prepared_cluster_key_exprs: Vec<PreparedClusterKeyExpr>,
166168
pub(crate) cluster_key_types: Vec<DataType>,
167169
}
168170

@@ -199,6 +201,8 @@ impl ReclusterMutator {
199201
.iter()
200202
.map(|v| v.data_type().clone())
201203
.collect::<Vec<_>>();
204+
let prepared_cluster_key_exprs =
205+
prepare_cluster_key_exprs(&cluster_key_exprs, schema.as_ref());
202206

203207
Ok(Self {
204208
ctx,
@@ -209,7 +213,7 @@ impl ReclusterMutator {
209213
cluster_key_id,
210214
max_tasks,
211215
memory_threshold,
212-
cluster_key_exprs,
216+
prepared_cluster_key_exprs,
213217
cluster_key_types,
214218
})
215219
}
@@ -234,6 +238,8 @@ impl ReclusterMutator {
234238
.iter()
235239
.map(|expr| expr.data_type().clone())
236240
.collect();
241+
let prepared_cluster_key_exprs =
242+
prepare_cluster_key_exprs(&cluster_key_exprs, schema.as_ref());
237243
let memory_threshold = ctx
238244
.get_settings()
239245
.get_recluster_block_size()
@@ -248,7 +254,7 @@ impl ReclusterMutator {
248254
cluster_key_id,
249255
max_tasks,
250256
memory_threshold,
251-
cluster_key_exprs,
257+
prepared_cluster_key_exprs,
252258
cluster_key_types,
253259
}
254260
}
@@ -870,7 +876,7 @@ impl ReclusterMutator {
870876
// flush, only bounded anchors remain eligible for the next window.
871877
let mut seen_in_hot_range = IndexSet::new();
872878
let (keys, values): (Vec<_>, Vec<_>) = segment_points.into_iter().unzip();
873-
let sorted_indices = compare_scalars(keys, &self.cluster_key_types)?;
879+
let sorted_indices = compare_scalars(&keys, &self.cluster_key_types)?;
874880

875881
for idx in sorted_indices {
876882
let start = &values[idx as usize].0;
@@ -1046,11 +1052,10 @@ impl ReclusterMutator {
10461052
}
10471053

10481054
let (min_stats, max_stats) = get_min_max_stats(
1049-
&self.cluster_key_exprs,
1055+
&self.prepared_cluster_key_exprs,
10501056
col_stats,
10511057
cluster_stats,
10521058
Some(self.cluster_key_id),
1053-
self.schema.as_ref(),
10541059
);
10551060

10561061
ClusterStatistics::new(self.cluster_key_id, min_stats, max_stats, 0, None)
@@ -1180,7 +1185,7 @@ impl ReclusterMutator {
11801185
let mut point_overlaps: Vec<Vec<usize>> = Vec::with_capacity(points_map.len());
11811186
let mut unfinished_intervals = BTreeMap::new();
11821187
let (keys, values): (Vec<_>, Vec<_>) = points_map.into_iter().unzip();
1183-
let indices = compare_scalars(keys, &self.cluster_key_types)?;
1188+
let indices = compare_scalars(&keys, &self.cluster_key_types)?;
11841189
for (i, idx) in indices.into_iter().enumerate() {
11851190
let start = &values[idx as usize].0;
11861191
let end = &values[idx as usize].1;

0 commit comments

Comments
 (0)