feat: aggr stat query at runtime#8046
Conversation
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a significant optimization for aggregate queries by leveraging file statistics (min/max, null counts, row counts) to compute partial aggregates without scanning actual row data. This is achieved through a new physical optimizer rule, AggrStatsPhysicalRule, which rewrites aggregate query plans into a StatsScanExec for stats-based aggregation and a fallback RegionScanExec for files not covered by statistics. The RegionScanner trait and its implementations are extended to provide file statistics and manage the stats-aware skip logic. Review comments suggest improvements for correctness and efficiency in handling Parquet byte array statistics, better compatibility with older Rust toolchains by replacing is_none_or with map_or, and refining partition expression matching to support unpartitioned regions. Additionally, code hygiene improvements were suggested by removing unnecessary #[allow(dead_code)] and #[allow(unused)] attributes.
| ParquetStats::ByteArray(stats) => { | ||
| let bytes = if use_min { | ||
| stats.min_bytes_opt()? | ||
| } else { | ||
| stats.max_bytes_opt()? | ||
| }; | ||
| Some(ScalarValue::Utf8(String::from_utf8(bytes.to_owned()).ok())) | ||
| } |
There was a problem hiding this comment.
There are two issues here:
- Correctness: If
String::from_utf8fails (e.g., for binary data), it returnsNone, resulting inScalarValue::Utf8(None). This represents a SQLNULL. Since the file actually contains data, returningNULLis incorrect. It should returnNonefromparquet_bound_scalarto trigger a fallback to row scanning. - Efficiency:
bytes.to_owned()performs an unnecessary allocation. Usingstd::str::from_utf8is more efficient as it validates the bytes in place.
| ParquetStats::ByteArray(stats) => { | |
| let bytes = if use_min { | |
| stats.min_bytes_opt()? | |
| } else { | |
| stats.max_bytes_opt()? | |
| }; | |
| Some(ScalarValue::Utf8(String::from_utf8(bytes.to_owned()).ok())) | |
| } | |
| ParquetStats::ByteArray(stats) => { | |
| let bytes = if use_min { | |
| stats.min_bytes_opt()? | |
| } else { | |
| stats.max_bytes_opt()? | |
| }; | |
| let s = std::str::from_utf8(bytes).ok()?; | |
| Some(ScalarValue::Utf8(Some(s.to_string()))) | |
| } |
| fn matches_partition_expr( | ||
| file_partition_expr: Option<&str>, | ||
| region_partition_expr: Option<&str>, | ||
| ) -> bool { | ||
| matches!( | ||
| (file_partition_expr, region_partition_expr), | ||
| (Some(file_expr), Some(region_expr)) if file_expr == region_expr | ||
| ) | ||
| } |
There was a problem hiding this comment.
The current implementation of matches_partition_expr only returns true if both expressions are Some. This prevents stats-aware skip from working on regions that are not partitioned (where region_partition_expr is None). Comparing the Options directly allows the optimization to work when both are None.
fn matches_partition_expr(
file_partition_expr: Option<&str>,
region_partition_expr: Option<&str>,
) -> bool {
file_partition_expr == region_partition_expr
}| let should_replace = best.as_ref().is_none_or(|current| { | ||
| value | ||
| .partial_cmp(current) | ||
| .is_some_and(|ordering| ordering == target) | ||
| }); |
There was a problem hiding this comment.
is_none_or was stabilized in Rust 1.82. To maintain compatibility with older toolchains (e.g., 1.80 or 1.81), consider using map_or(true, ...) or a match statement.
| let should_replace = best.as_ref().is_none_or(|current| { | |
| value | |
| .partial_cmp(current) | |
| .is_some_and(|ordering| ordering == target) | |
| }); | |
| let should_replace = best.as_ref().map_or(true, |current| { | |
| value | |
| .partial_cmp(current) | |
| .is_some_and(|ordering| ordering == target) | |
| }); |
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| #![allow(dead_code)] |
| } | ||
| } | ||
|
|
||
| if columns.first().is_none_or(|column| column.is_empty()) { |
There was a problem hiding this comment.
| } | ||
|
|
||
| /// TODO(discord9): support more kind of aggr | ||
| #[allow(unused)] |
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
PR Checklist
Please convert it to a draft if some of the following conditions are not met.