feat: add aggregate-stats foundation types and helpers #8112
Code Review
This pull request introduces infrastructure for aggregate statistics, including new data structures like FileStatsItem and StatsCandidateFile to support efficient query execution by leveraging Parquet metadata. The changes also include updates to the RowGroupPruningStatistics logic to handle various data types more robustly. The review identified a potential compilation issue with the use of is_none_or in older Rust versions and suggested refactoring the duplicated logic between aggr_stats.rs and predicate/stats.rs into a shared helper function to improve maintainability.
| let should_replace = best.as_ref().is_none_or(|current| { | ||
| value | ||
| .partial_cmp(current) | ||
| .is_some_and(|ordering| ordering == target) | ||
| }); |
Option::is_none_or was only stabilized in Rust 1.82, so this call fails to compile on older toolchains. map_or(true, |...|) or a match statement gives the same result on any supported version.
| let should_replace = best.as_ref().is_none_or(|current| { | |
| value | |
| .partial_cmp(current) | |
| .is_some_and(|ordering| ordering == target) | |
| }); | |
| let should_replace = best.as_ref().map_or(true, |current| { | |
| value | |
| .partial_cmp(current) | |
| .is_some_and(|ordering| ordering == target) | |
| }); |
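The replacement rule in the suggestion above can be tried in isolation. The following is a minimal sketch, not the PR's code: `pick_bound` is a hypothetical helper, and plain `f64` values stand in for the scalar type being compared. It uses the `map_or(true, ..)` form so that it builds on toolchains older than 1.82.

```rust
use std::cmp::Ordering;

/// Hypothetical helper: returns the extreme value of `values` under
/// `partial_cmp`. `Ordering::Less` selects the minimum and
/// `Ordering::Greater` selects the maximum.
fn pick_bound(values: &[f64], target: Ordering) -> Option<f64> {
    let mut best: Option<f64> = None;
    for &value in values {
        // "No current best" counts as "replace". This is the case that
        // `is_none_or` covers on newer toolchains.
        let should_replace = best.as_ref().map_or(true, |current| {
            value
                .partial_cmp(current)
                // An incomparable pair (for example one side is NaN)
                // yields `None`, so the existing best is kept.
                .is_some_and(|ordering| ordering == target)
        });
        if should_replace {
            best = Some(value);
        }
    }
    best
}
```

With `Ordering::Less` as the target, a value replaces the current best only when it compares strictly smaller, so ties keep the first value seen.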
| fn parquet_bound_scalar( | ||
| stats: &ParquetStats, | ||
| target: Ordering, | ||
| arrow_type: &DataType, | ||
| ) -> Option<ScalarValue> { | ||
| let use_min = target == Ordering::Less; | ||
| | ||
| match stats { | ||
| ParquetStats::Boolean(stats) => { | ||
| if !matches!(arrow_type, DataType::Boolean) { | ||
| return None; | ||
| } | ||
| Some(ScalarValue::Boolean(Some(if use_min { | ||
| *stats.min_opt()? | ||
| } else { | ||
| *stats.max_opt()? | ||
| }))) | ||
| } | ||
| ParquetStats::Int32(stats) => { | ||
| let raw = if use_min { | ||
| *stats.min_opt()? | ||
| } else { | ||
| *stats.max_opt()? | ||
| }; | ||
| match arrow_type { | ||
| DataType::Int32 | ||
| | DataType::UInt32 | ||
| | DataType::Int16 | ||
| | DataType::UInt16 | ||
| | DataType::Int8 | ||
| | DataType::UInt8 | ||
| | DataType::Time32(_) => Some(ScalarValue::Int32(Some(raw))), | ||
| DataType::Date32 => Some(ScalarValue::Date32(Some(raw))), | ||
| _ => None, | ||
| } | ||
| } | ||
| ParquetStats::Int64(stats) => { | ||
| let raw = if use_min { | ||
| *stats.min_opt()? | ||
| } else { | ||
| *stats.max_opt()? | ||
| }; | ||
| match arrow_type { | ||
| DataType::Int64 | DataType::UInt64 => Some(ScalarValue::Int64(Some(raw))), | ||
| DataType::Timestamp(TimeUnit::Second, _) => { | ||
| Some(ScalarValue::TimestampSecond(Some(raw), None)) | ||
| } | ||
| DataType::Timestamp(TimeUnit::Millisecond, _) => { | ||
| Some(ScalarValue::TimestampMillisecond(Some(raw), None)) | ||
| } | ||
| DataType::Timestamp(TimeUnit::Microsecond, _) => { | ||
| Some(ScalarValue::TimestampMicrosecond(Some(raw), None)) | ||
| } | ||
| DataType::Timestamp(TimeUnit::Nanosecond, _) => { | ||
| Some(ScalarValue::TimestampNanosecond(Some(raw), None)) | ||
| } | ||
| DataType::Date64 => Some(ScalarValue::Date64(Some(raw))), | ||
| DataType::Duration(_) => Some(ScalarValue::Int64(Some(raw))), | ||
| _ => None, | ||
| } | ||
| } | ||
| ParquetStats::Int96(_) => None, | ||
| ParquetStats::Float(stats) => { | ||
| if !matches!(arrow_type, DataType::Float32) { | ||
| return None; | ||
| } | ||
| Some(ScalarValue::Float32(Some(if use_min { | ||
| *stats.min_opt()? | ||
| } else { | ||
| *stats.max_opt()? | ||
| }))) | ||
| } | ||
| ParquetStats::Double(stats) => { | ||
| if !matches!(arrow_type, DataType::Float64) { | ||
| return None; | ||
| } | ||
| Some(ScalarValue::Float64(Some(if use_min { | ||
| *stats.min_opt()? | ||
| } else { | ||
| *stats.max_opt()? | ||
| }))) | ||
| } | ||
| ParquetStats::ByteArray(stats) => { | ||
| let bytes = if use_min { | ||
| stats.min_bytes_opt()? | ||
| } else { | ||
| stats.max_bytes_opt()? | ||
| }; | ||
| match arrow_type { | ||
| DataType::Utf8 | DataType::LargeUtf8 => String::from_utf8(bytes.to_owned()) | ||
| .ok() | ||
| .map(|s| ScalarValue::Utf8(Some(s))), | ||
| _ => None, | ||
| } | ||
| } | ||
| ParquetStats::FixedLenByteArray(_) => None, | ||
| } | ||
| } |
The logic in parquet_bound_scalar for converting Parquet statistics to a datafusion::scalar::ScalarValue is nearly identical to the logic inside the impl_min_max_values! macro in src/table/src/predicate/stats.rs. To improve maintainability and reduce code duplication, refactor this logic into a shared helper function.
pub fn parquet_stats_to_scalar(
stats: &ParquetStats,
data_type: &DataType,
target: Ordering,
) -> Option<ScalarValue>
References
- Refactor duplicated logic into shared helper functions to improve maintainability and reduce code duplication.
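One way to picture the shared helper proposed in this comment is shown below. It is a sketch under loud assumptions: `PhysicalStats`, `LogicalType`, and `Scalar` are simplified stand-ins invented for illustration, not the parquet, Arrow, or DataFusion types, and only a few variants are modeled. The point is that the physical-to-logical mapping lives in one function that both call sites could use.

```rust
use std::cmp::Ordering;

// Hypothetical stand-in for parquet physical statistics.
#[derive(Debug, Clone, PartialEq)]
enum PhysicalStats {
    Int32 { min: Option<i32>, max: Option<i32> },
    Int64 { min: Option<i64>, max: Option<i64> },
}

// Hypothetical stand-in for the Arrow logical type of the column.
#[derive(Debug, Clone, Copy, PartialEq)]
enum LogicalType {
    Int32,
    Date32,
    Int64,
    TimestampMillisecond,
    Utf8,
}

// Hypothetical stand-in for a typed scalar value.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Int32(i32),
    Date32(i32),
    Int64(i64),
    TimestampMillisecond(i64),
}

/// Single conversion point: picks min or max from the physical stats and
/// wraps it according to the logical type. Returns `None` when the bound
/// is missing or the physical and logical types do not match.
fn stats_to_scalar(
    stats: &PhysicalStats,
    data_type: LogicalType,
    target: Ordering,
) -> Option<Scalar> {
    let use_min = target == Ordering::Less;
    match stats {
        PhysicalStats::Int32 { min, max } => {
            let raw = if use_min { (*min)? } else { (*max)? };
            match data_type {
                LogicalType::Int32 => Some(Scalar::Int32(raw)),
                LogicalType::Date32 => Some(Scalar::Date32(raw)),
                _ => None,
            }
        }
        PhysicalStats::Int64 { min, max } => {
            let raw = if use_min { (*min)? } else { (*max)? };
            match data_type {
                LogicalType::Int64 => Some(Scalar::Int64(raw)),
                LogicalType::TimestampMillisecond => {
                    Some(Scalar::TimestampMillisecond(raw))
                }
                _ => None,
            }
        }
    }
}
```

Under this shape, the aggregate-stats path and the pruning path would each reduce to a call with `Ordering::Less` or `Ordering::Greater`, so a new logical type would only need to be added in one place.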
270466f to
40011c5
- Add SupportedStatAggr, StatsAwareFallbackReason, StatsAwareFileDecision, StatsAwareFileDecisionSnapshot, FileStatsItem, RowGroupStatsItem, and SendableFileStatsStream to store-api as pure data types
- Add aggr_stats foundation module to common-query with requirement matching, partition filtering, and column-stats collection from parquet metadata
- Add RowGroupPruningStatistics to table::predicate for parquet row-group pruning
- Promote table predicate stats from cfg(test) to pub(crate)
- Drive parquet physical-to-logical scalar conversion with Arrow DataType
- Add bounds checks for schema-evolved parquet columns

This is PR1 of a multi-PR split: data types and foundation only. No scanner runtime, optimizer rewrites, or sqlness.

Signed-off-by: discord9 <discord9@163.com>
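The column-stats collection and the bounds check for schema-evolved columns mentioned in this commit message can be illustrated with a small fold over row groups. This is a sketch only: `ColumnChunkStats`, `RowGroup`, `FileColumnSummary`, and `summarize_column` are hypothetical names, values are fixed to `i64`, and every row group is assumed to carry a min and max, which real parquet metadata does not guarantee.

```rust
/// Hypothetical per-column statistics for one row group.
#[derive(Debug, Clone, PartialEq)]
struct ColumnChunkStats {
    null_count: Option<u64>,
    min: Option<i64>,
    max: Option<i64>,
}

/// Hypothetical row group: a row count plus one entry per stored column.
struct RowGroup {
    num_rows: u64,
    columns: Vec<ColumnChunkStats>,
}

/// Hypothetical file-level summary for a single column.
#[derive(Debug, Clone, PartialEq)]
struct FileColumnSummary {
    num_rows: u64,
    null_count: u64,
    min: i64,
    max: i64,
}

/// Folds the stats of `column_index` across all row groups.
/// Returns `None` if there are no row groups, if any row group lacks the
/// column, or if any statistic is missing, so the caller can fall back
/// to scanning rows.
fn summarize_column(
    row_groups: &[RowGroup],
    column_index: usize,
) -> Option<FileColumnSummary> {
    let mut summary: Option<FileColumnSummary> = None;
    for rg in row_groups {
        // Bounds check: an older file may not store a column that was
        // added later, so `get` is used instead of indexing.
        let stats = rg.columns.get(column_index)?;
        let (nulls, min, max) = (stats.null_count?, stats.min?, stats.max?);
        summary = Some(match summary {
            None => FileColumnSummary {
                num_rows: rg.num_rows,
                null_count: nulls,
                min,
                max,
            },
            Some(s) => FileColumnSummary {
                num_rows: s.num_rows + rg.num_rows,
                null_count: s.null_count + nulls,
                min: s.min.min(min),
                max: s.max.max(max),
            },
        });
    }
    summary
}
```

From such a summary, a row count is `num_rows`, a non-null count is `num_rows - null_count`, and the min and max are read directly, which is the kind of answer the stats-based path is meant to give without reading row data.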
40011c5 to
9b379a1
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
Add the aggregate-stats type system and foundation helpers required by the aggregate-stats runtime feature (PR2/PR3). No execution path is wired yet.
store-api
New aggregate-stats data types in region_engine.rs:
- SupportedStatAggr: optimizer-visible requirement enum (CountRows, CountNonNull, MinValue, MaxValue)
- StatsAwareFallbackReason: why a file stays on the row-scan path
- StatsAwareFileDecision + StatsAwareFileDecisionSnapshot: per-file decisions and display summary
- FileStatsItem / RowGroupStatsItem: parquet-metadata-backed file and row-group structures
- SendableFileStatsStream: type alias for async stats transport

common-query
New aggr_stats module that interprets parquet row-group metadata into typed column statistics:
- FileColumnStats / StatsCandidateFile: collects null counts, min/max per column from row groups
- DataType-driven conversion from parquet physical stats to logical scalar values (handles Timestamp, Date32/64, etc.)

table
- RowGroupPruningStatistics: PruningStatistics implementation backed by parquet row groups
- Promoted from #[cfg(test)] to pub(crate), with matching logical-type-aware conversion

What's NOT in this PR (deferred)
- StatsAwareFileDecisionState (runtime mutex cache)
- Properties/PrepareRequest stats fields
- RegionScanner::scan_stats trait method and implementations
- AggrStats rewrite rule

PR Checklist