feat(storage): hilbert recluster support stream block writer #17904

Draft: wants to merge 16 commits into base: main
20 changes: 18 additions & 2 deletions src/query/ee/src/hilbert_clustering/handler.rs
@@ -76,6 +76,7 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
let mut checker = ReclusterChecker::new(
cluster_key_id,
hilbert_min_bytes,
block_per_seg,
push_downs.as_ref().is_none_or(|v| v.filters.is_none()),
);
'FOR: for chunk in segment_locations.chunks(chunk_size) {
@@ -139,19 +140,29 @@ struct ReclusterChecker {
hilbert_min_bytes: usize,
total_bytes: usize,

hilbert_min_blocks: usize,
total_blocks: usize,

finished: bool,
// Whether the target segments are at the head of the snapshot.
head_of_snapshot: bool,
}

impl ReclusterChecker {
fn new(default_cluster_id: u32, hilbert_min_bytes: usize, head_of_snapshot: bool) -> Self {
fn new(
default_cluster_id: u32,
hilbert_min_bytes: usize,
hilbert_min_blocks: usize,
head_of_snapshot: bool,
) -> Self {
Self {
segments: vec![],
last_segment: None,
default_cluster_id,
hilbert_min_blocks,
hilbert_min_bytes,
total_bytes: 0,
total_blocks: 0,
finished: false,
head_of_snapshot,
}
@@ -164,10 +175,14 @@ impl ReclusterChecker {

if segment_should_recluster || !self.head_of_snapshot {
self.total_bytes += segment.summary.uncompressed_byte_size as usize;
self.total_blocks += segment.summary.block_count as usize;
self.segments.push((location.clone(), segment.clone()));
}

if !segment_should_recluster || self.total_bytes >= self.hilbert_min_bytes {
if !segment_should_recluster
|| (self.total_bytes >= self.hilbert_min_bytes
&& self.total_blocks >= self.hilbert_min_blocks)
{
if self.check_for_recluster() {
self.finished = true;
return true;
@@ -208,6 +223,7 @@ impl ReclusterChecker {

fn reset(&mut self) {
self.total_bytes = 0;
self.total_blocks = 0;
self.head_of_snapshot = false;
self.segments.clear();
}
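The behavioral change in this file: segment accumulation for a Hilbert recluster now stops only once both the byte threshold and the new block-count threshold are met, instead of bytes alone. A minimal standalone sketch of that gating (the struct and field names below are illustrative stand-ins, not the crate's types):

struct Checker {
    hilbert_min_bytes: usize,
    hilbert_min_blocks: usize,
    total_bytes: usize,
    total_blocks: usize,
}

impl Checker {
    // Accumulate one segment's stats and report whether enough data has
    // been collected to trigger a recluster.
    fn add_segment(&mut self, uncompressed_bytes: usize, block_count: usize) -> bool {
        self.total_bytes += uncompressed_bytes;
        self.total_blocks += block_count;
        // Both thresholds must be met: enough bytes AND enough blocks.
        self.total_bytes >= self.hilbert_min_bytes
            && self.total_blocks >= self.hilbert_min_blocks
    }
}

fn main() {
    let mut checker = Checker {
        hilbert_min_bytes: 1024 * 1024 * 1024, // hypothetical 1 GiB floor
        hilbert_min_blocks: 16,                // hypothetical block_per_seg
        total_bytes: 0,
        total_blocks: 0,
    };
    // A byte-heavy but block-light segment alone no longer qualifies.
    assert!(!checker.add_segment(2 * 1024 * 1024 * 1024, 8));
    // Once the block count also crosses its threshold, accumulation can stop.
    assert!(checker.add_segment(0, 8));
}

This mirrors the combined condition added above; the real checker additionally resets these counters and tracks whether the candidate segments sit at the head of the snapshot.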
51 changes: 50 additions & 1 deletion src/query/expression/src/utils/block_thresholds.rs
@@ -152,7 +152,7 @@ impl BlockThresholds {

let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
// Adjust the number of blocks based on block size thresholds.
let max_bytes_per_block = (4 * self.min_bytes_per_block).min(400 * 1024 * 1024);
let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024);
let min_bytes_per_block = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
let block_nums = if bytes_per_block > max_bytes_per_block {
// Case 1: The block size is too big.
@@ -166,4 +166,53 @@ impl BlockThresholds {
};
total_rows.div_ceil(block_nums.max(1)).max(1)
}

/// Calculates the optimal number of partitions (blocks) based on total data size and row count.
///
/// # Parameters
/// - `total_rows`: The total number of rows in the data.
/// - `total_bytes`: The total uncompressed size of the data in bytes.
/// - `total_compressed`: The total compressed size of the data in bytes.
///
/// # Returns
/// - The calculated number of partitions (blocks) needed.
#[inline]
pub fn calc_partitions_for_recluster(
&self,
total_rows: usize,
total_bytes: usize,
total_compressed: usize,
) -> usize {
// If the data is already compact enough, return a single partition.
if self.check_for_compact(total_rows, total_bytes)
&& total_compressed < 2 * self.min_compressed_per_block
{
return 1;
}

// Estimate the number of blocks based on row count and compressed size.
let by_rows = std::cmp::max(total_rows / self.max_rows_per_block, 1);
let by_compressed = total_compressed / self.max_compressed_per_block;
// If row-based block count is greater, use max rows per block as limit.
if by_rows >= by_compressed {
return by_rows;
}

// Adjust block count based on byte size thresholds.
let bytes_per_block = total_bytes.div_ceil(by_compressed);
let max_bytes = self.max_bytes_per_block.min(400 * 1024 * 1024);
let min_bytes = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
let total_partitions = if bytes_per_block > max_bytes {
// Block size is too large.
total_bytes / max_bytes
} else if bytes_per_block < min_bytes {
// Block size is too small.
total_bytes / min_bytes
} else {
// Block size is acceptable.
by_compressed
};

std::cmp::max(total_partitions, 1)
}
}
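Since the constructor mapping of BlockThresholds is not visible in this diff, a self-contained trace of the same three-branch flow may help; the compact-data early return is omitted for brevity, and none of the constants below are the crate's defaults:

fn partitions_for_recluster(
    total_rows: usize,
    total_bytes: usize,
    total_compressed: usize,
) -> usize {
    // Hypothetical thresholds, for illustration only.
    const MAX_ROWS_PER_BLOCK: usize = 1_000_000;
    const MAX_COMPRESSED_PER_BLOCK: usize = 16 * 1024 * 1024;
    const MAX_BYTES_PER_BLOCK: usize = 128 * 1024 * 1024;
    const MIN_BYTES_PER_BLOCK: usize = 32 * 1024 * 1024;

    let by_rows = (total_rows / MAX_ROWS_PER_BLOCK).max(1);
    let by_compressed = total_compressed / MAX_COMPRESSED_PER_BLOCK;
    if by_rows >= by_compressed {
        // Row count is the binding constraint.
        return by_rows;
    }
    // Otherwise project the uncompressed bytes per block and clamp into range.
    let bytes_per_block = total_bytes.div_ceil(by_compressed);
    let parts = if bytes_per_block > MAX_BYTES_PER_BLOCK {
        total_bytes / MAX_BYTES_PER_BLOCK // projected blocks too large
    } else if bytes_per_block < MIN_BYTES_PER_BLOCK {
        total_bytes / MIN_BYTES_PER_BLOCK // projected blocks too small
    } else {
        by_compressed
    };
    parts.max(1)
}

With these illustrative numbers, 10 GiB uncompressed and 1 GiB compressed give by_compressed = 64 and a projected 160 MiB per block, which exceeds the 128 MiB cap, so the function returns 10 GiB / 128 MiB = 80 partitions.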
32 changes: 29 additions & 3 deletions src/query/expression/tests/it/block_thresholds.rs
@@ -15,7 +15,7 @@
use databend_common_expression::BlockThresholds;

fn default_thresholds() -> BlockThresholds {
BlockThresholds::new(1000, 1_000_000, 100_000, 4)
BlockThresholds::new(1_000, 1_000_000, 100_000, 4)
}

#[test]
@@ -101,8 +101,8 @@ fn test_calc_rows_for_recluster() {
);

// Case 1: The block size is too big.
let result = t.calc_rows_for_recluster(4_000, 30_000_000, 600_000);
assert_eq!(result, 400);
let result = t.calc_rows_for_recluster(4_500, 30_000_000, 600_000);
assert_eq!(result, 300);

// Case 2: The block size is too small.
let result = t.calc_rows_for_recluster(4_000, 2_000_000, 600_000);
@@ -112,3 +112,29 @@
let result = t.calc_rows_for_recluster(4_000, 10_000_000, 600_000);
assert_eq!(result, 667);
}

#[test]
fn test_calc_partitions_for_recluster() {
let t = default_thresholds();

// compact enough to skip further calculations
assert_eq!(t.calc_partitions_for_recluster(1000, 500_000, 100_000), 1);

// The row-based block count exceeds the compressed-based count, so max rows per block sets the limit.
assert_eq!(
t.calc_partitions_for_recluster(10_000, 2_000_000, 100_000),
10
);

// Case 1: The block size is too big.
let result = t.calc_partitions_for_recluster(4_500, 30_000_000, 600_000);
assert_eq!(result, 15);

// Case 2: The block size is too small.
let result = t.calc_partitions_for_recluster(4_000, 800_000, 800_000);
assert_eq!(result, 2);

// Case 3: Use the compressed-based block count.
let result = t.calc_partitions_for_recluster(4_000, 10_000_000, 600_000);
assert_eq!(result, 6);
}
95 changes: 93 additions & 2 deletions src/query/functions/src/scalars/hilbert.rs
@@ -21,23 +21,32 @@ use databend_common_expression::types::BinaryType;
use databend_common_expression::types::DataType;
use databend_common_expression::types::GenericType;
use databend_common_expression::types::NullableType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::NumberType;
use databend_common_expression::types::ReturnType;
use databend_common_expression::types::StringType;
use databend_common_expression::types::ValueType;
use databend_common_expression::types::ALL_NUMERICS_TYPES;
use databend_common_expression::vectorize_with_builder_1_arg;
use databend_common_expression::vectorize_with_builder_2_arg;
use databend_common_expression::with_number_mapped_type;
use databend_common_expression::Column;
use databend_common_expression::FixedLengthEncoding;
use databend_common_expression::Function;
use databend_common_expression::FunctionDomain;
use databend_common_expression::FunctionEval;
use databend_common_expression::FunctionProperty;
use databend_common_expression::FunctionRegistry;
use databend_common_expression::FunctionSignature;
use databend_common_expression::ScalarRef;
use databend_common_expression::Value;
use rand::rngs::SmallRng;
use rand::Rng;
use rand::SeedableRng;

/// Registers Hilbert curve related functions with the function registry.
pub fn register(registry: &mut FunctionRegistry) {
// Register the hilbert_range_index function that calculates Hilbert indices for multi-dimensional data
// Register the hilbert_range_index function that calculates Hilbert indices for multidimensional data
registry.register_function_factory("hilbert_range_index", |_, args_type| {
let args_num = args_type.len();
// The function supports 2, 3, 4, or 5 dimensions (each dimension requires 2 arguments)
@@ -96,7 +105,7 @@ pub fn register(registry: &mut FunctionRegistry) {
points.push(key);
}

// Convert the multi-dimensional point to a Hilbert index
// Convert the multidimensional point to a Hilbert index
// This maps the n-dimensional point to a 1-dimensional value
let points = points
.iter()
@@ -151,6 +160,88 @@ pub fn register(registry: &mut FunctionRegistry) {
builder.push(id);
}),
);

// We use true randomness by appending a random u8 value at the end of the binary key.
// This introduces noise to break tie cases in clustering keys that are not uniformly distributed.
// Although this may slightly affect the accuracy of range_bound estimation,
// it ensures that Hilbert index + scatter will no longer suffer from data skew.
// Moreover, since the noise is added at the tail, the original order of the keys is preserved.
registry.properties.insert(
"add_noise".to_string(),
FunctionProperty::default().non_deterministic(),
);

registry.register_passthrough_nullable_1_arg::<StringType, BinaryType, _, _>(
"add_noise",
|_, _| FunctionDomain::Full,
vectorize_with_builder_1_arg::<StringType, BinaryType>(|val, builder, _| {
let mut bytes = val.as_bytes().to_vec();
let mut rng = SmallRng::from_entropy();
bytes.push(rng.gen::<u8>());
builder.put_slice(&bytes);
builder.commit_row();
}),
);

for ty in ALL_NUMERICS_TYPES {
with_number_mapped_type!(|NUM_TYPE| match ty {
NumberDataType::NUM_TYPE => {
registry
.register_passthrough_nullable_1_arg::<NumberType<NUM_TYPE>, BinaryType, _, _>(
"add_noise",
|_, _| FunctionDomain::Full,
vectorize_with_builder_1_arg::<NumberType<NUM_TYPE>, BinaryType>(
|val, builder, _| {
let mut encoded = val.encode().to_vec();
let mut rng = SmallRng::from_entropy();
encoded.push(rng.gen::<u8>());
builder.put_slice(&encoded);
builder.commit_row();
},
),
);
}
})
}

registry.register_passthrough_nullable_2_arg::<StringType, NumberType<u64>, BinaryType, _, _>(
"add_noise",
|_, _, _| FunctionDomain::Full,
vectorize_with_builder_2_arg::<StringType, NumberType<u64>, BinaryType>(
|val, level, builder, _| {
let mut bytes = val.as_bytes().to_vec();
let mut rng = SmallRng::from_entropy();
for _ in 0..level {
bytes.push(rng.gen::<u8>());
}
builder.put_slice(&bytes);
builder.commit_row();
},
),
);

for ty in ALL_NUMERICS_TYPES {
with_number_mapped_type!(|NUM_TYPE| match ty {
NumberDataType::NUM_TYPE => {
registry
.register_passthrough_nullable_2_arg::<NumberType<NUM_TYPE>, NumberType<u64>, BinaryType, _, _>(
"add_noise",
|_, _, _| FunctionDomain::Full,
vectorize_with_builder_2_arg::<NumberType<NUM_TYPE>, NumberType<u64>, BinaryType>(
|val, level, builder, _| {
let mut encoded = val.encode().to_vec();
let mut rng = SmallRng::from_entropy();
for _ in 0..level {
encoded.push(rng.gen::<u8>());
}
builder.put_slice(&encoded);
builder.commit_row();
},
),
);
}
})
}
}
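The tail-noise property the comment above relies on is easy to check in isolation: keys that already differ before the appended byte keep their relative order, while equal keys get a random tie-break. A minimal sketch, independent of the registry machinery (the rand calls mirror the kernels above):

use rand::rngs::SmallRng;
use rand::Rng;
use rand::SeedableRng;

// Append one random byte to a key, as the one-argument add_noise kernel does.
fn add_noise(key: &str, rng: &mut SmallRng) -> Vec<u8> {
    let mut bytes = key.as_bytes().to_vec();
    bytes.push(rng.gen::<u8>());
    bytes
}

fn main() {
    let mut rng = SmallRng::from_entropy();
    // Keys that differ before the noise byte keep their order.
    assert!(add_noise("apple", &mut rng) < add_noise("banana", &mut rng));
    // Equal keys now compare unequal (with high probability), so ties in a
    // skewed distribution scatter randomly instead of landing in one range.
    let (a, b) = (add_noise("apple", &mut rng), add_noise("apple", &mut rng));
    println!("{:?} vs {:?}", a, b);
}

Note that for variable-length string keys the order guarantee holds only when neither key is a strict prefix of the other; the fixed-length numeric encodings used by the numeric kernels avoid that edge case.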

/// Calculates the partition ID for a value based on range boundaries.
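The body of the function this doc comment belongs to is collapsed in the diff view. Purely as an illustration of the usual shape of range-boundary partitioning (not the collapsed implementation), a partition ID is typically a binary search over sorted bounds:

// Hypothetical illustration only: map `value` to a partition ID given
// sorted upper bounds; this is not the function body collapsed above.
fn partition_id<T: Ord>(value: &T, bounds: &[T]) -> usize {
    // partition_point returns how many bounds compare less than `value`,
    // i.e. the index of the first range whose upper bound is >= value.
    bounds.partition_point(|b| b < value)
}

fn main() {
    let bounds = vec![10, 20, 30];
    assert_eq!(partition_id(&5, &bounds), 0);  // below the first bound
    assert_eq!(partition_id(&25, &bounds), 2); // between 20 and 30
    assert_eq!(partition_id(&99, &bounds), 3); // above every bound
}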