Skip to content

Commit 40011c5

Browse files
committed
feat: add aggregate-stats foundation types and helpers
- Add SupportedStatAggr, StatsAwareFallbackReason, StatsAwareFileDecision, StatsAwareFileDecisionSnapshot, FileStatsItem, RowGroupStatsItem, and SendableFileStatsStream to store-api as pure data types
- Add aggr_stats foundation module to common-query with requirement matching, partition filtering, and column-stats collection from parquet metadata
- Add RowGroupPruningStatistics to table::predicate for parquet row-group pruning
- Promote table predicate stats from cfg(test) to pub(crate)
- Drive parquet physical-to-logical scalar conversion with Arrow DataType
- Add bounds checks for schema-evolved parquet columns

This is PR1 of a multi-PR split: data types and foundation only. No scanner runtime, optimizer rewrites, or sqlness.

Signed-off-by: discord9 <discord9@163.com>
1 parent d709fd2 commit 40011c5

7 files changed

Lines changed: 570 additions & 11 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/query/src/aggr_stats.rs

Lines changed: 382 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,382 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::cmp::Ordering;
16+
use std::collections::{HashMap, HashSet};
17+
18+
use datafusion::parquet::file::statistics::Statistics as ParquetStats;
19+
use datafusion::scalar::ScalarValue;
20+
use datafusion_common::{DataFusionError, Result};
21+
use datatypes::arrow::datatypes::{DataType, TimeUnit};
22+
use datatypes::schema::SchemaRef as RegionSchemaRef;
23+
use datatypes::value::Value;
24+
use store_api::region_engine::{FileStatsItem, SupportedStatAggr};
25+
26+
#[derive(Debug, Clone, Default, PartialEq)]
27+
pub struct FileColumnStats {
28+
pub null_count: Option<u64>,
29+
pub min_value: Option<Value>,
30+
pub max_value: Option<Value>,
31+
}
32+
33+
#[derive(Debug, Clone, Default, PartialEq)]
34+
pub struct StatsCandidateFile {
35+
pub num_rows: Option<u64>,
36+
pub column_stats: HashMap<String, FileColumnStats>,
37+
}
38+
39+
impl StatsCandidateFile {
40+
pub fn from_file_stats(
41+
file_stats: &FileStatsItem,
42+
region_partition_expr: Option<&str>,
43+
requirements: &[SupportedStatAggr],
44+
region_schema: &RegionSchemaRef,
45+
) -> Result<Option<Self>> {
46+
let column_names = required_columns(requirements);
47+
if !matches_partition_expr(
48+
file_stats.file_partition_expr.as_deref(),
49+
region_partition_expr,
50+
) {
51+
return Ok(None);
52+
}
53+
let column_stats = collect_column_stats(file_stats, region_schema, &column_names)?;
54+
55+
let candidate = Self {
56+
num_rows: file_stats.num_rows,
57+
column_stats,
58+
};
59+
for requirement in requirements {
60+
if candidate.stat_value(requirement)?.is_none() {
61+
return Ok(None);
62+
}
63+
}
64+
Ok(Some(candidate))
65+
}
66+
67+
pub fn stat_value(&self, requirement: &SupportedStatAggr) -> Result<Option<Value>> {
68+
match requirement {
69+
SupportedStatAggr::CountRows => self.num_rows.map(count_value).transpose(),
70+
SupportedStatAggr::CountNonNull { column_name } => {
71+
let Some(column_stats) = self.column_stats.get(column_name) else {
72+
return Ok(None);
73+
};
74+
let Some(num_rows) = self.num_rows else {
75+
return Ok(None);
76+
};
77+
let Some(null_count) = column_stats.null_count else {
78+
return Ok(None);
79+
};
80+
let Some(non_null_count) = num_rows.checked_sub(null_count) else {
81+
return Err(DataFusionError::Internal(format!(
82+
"StatsScanExec found null_count > num_rows for column {}",
83+
column_name
84+
)));
85+
};
86+
count_value(non_null_count).map(Some)
87+
}
88+
SupportedStatAggr::MinValue { column_name } => Ok(self
89+
.column_stats
90+
.get(column_name)
91+
.and_then(|stats| stats.min_value.clone())),
92+
SupportedStatAggr::MaxValue { column_name } => Ok(self
93+
.column_stats
94+
.get(column_name)
95+
.and_then(|stats| stats.max_value.clone())),
96+
}
97+
}
98+
}
99+
100+
fn count_value(value: u64) -> Result<Value> {
101+
let value = i64::try_from(value).map_err(|_| {
102+
DataFusionError::Internal(format!(
103+
"StatsScanExec count state exceeds Int64 range: {}",
104+
value
105+
))
106+
})?;
107+
Ok(Value::Int64(value))
108+
}
109+
110+
/// Reports whether a file's partition expression agrees with the region's.
///
/// The two agree when both are absent, or when both are present and their
/// text is identical. A file with an expression never matches a region
/// without one, and vice versa.
fn matches_partition_expr(
    file_partition_expr: Option<&str>,
    region_partition_expr: Option<&str>,
) -> bool {
    // `Option` equality is exactly the rule above: `None == None`,
    // `Some(a) == Some(b)` iff `a == b`, and mixed cases are unequal.
    file_partition_expr == region_partition_expr
}
120+
121+
fn required_columns(requirements: &[SupportedStatAggr]) -> HashSet<String> {
122+
requirements
123+
.iter()
124+
.filter_map(|requirement| match requirement {
125+
SupportedStatAggr::CountRows => None,
126+
SupportedStatAggr::CountNonNull { column_name }
127+
| SupportedStatAggr::MinValue { column_name }
128+
| SupportedStatAggr::MaxValue { column_name } => Some(column_name.clone()),
129+
})
130+
.collect()
131+
}
132+
133+
fn collect_column_stats(
134+
file_stats: &FileStatsItem,
135+
region_schema: &RegionSchemaRef,
136+
column_names: &HashSet<String>,
137+
) -> Result<HashMap<String, FileColumnStats>> {
138+
column_names
139+
.iter()
140+
.map(|column_name| {
141+
Ok((
142+
column_name.clone(),
143+
collect_one_column_stats(file_stats, region_schema, column_name)?,
144+
))
145+
})
146+
.collect()
147+
}
148+
149+
fn collect_one_column_stats(
150+
file_stats: &FileStatsItem,
151+
region_schema: &RegionSchemaRef,
152+
column_name: &str,
153+
) -> Result<FileColumnStats> {
154+
let Some(column_index) = region_schema.column_index_by_name(column_name) else {
155+
return Ok(FileColumnStats::default());
156+
};
157+
158+
let arrow_type = region_schema.arrow_schema().field(column_index).data_type();
159+
160+
Ok(FileColumnStats {
161+
null_count: sum_null_counts(file_stats, column_index)?,
162+
min_value: best_row_group_value(file_stats, column_index, Ordering::Less, arrow_type)?,
163+
max_value: best_row_group_value(file_stats, column_index, Ordering::Greater, arrow_type)?,
164+
})
165+
}
166+
167+
fn sum_null_counts(file_stats: &FileStatsItem, column_index: usize) -> Result<Option<u64>> {
168+
if file_stats.row_groups.is_empty() {
169+
return Ok(None);
170+
}
171+
172+
let mut total = 0_u64;
173+
for row_group in &file_stats.row_groups {
174+
if column_index >= row_group.metadata.num_columns() {
175+
return Ok(None);
176+
}
177+
let Some(stats) = row_group.metadata.column(column_index).statistics() else {
178+
return Ok(None);
179+
};
180+
let Some(value) = stats.null_count_opt() else {
181+
return Ok(None);
182+
};
183+
total = total.checked_add(value).ok_or_else(|| {
184+
DataFusionError::Internal("StatsScanExec null-count overflow".to_string())
185+
})?;
186+
}
187+
Ok(Some(total))
188+
}
189+
190+
fn best_row_group_value(
191+
file_stats: &FileStatsItem,
192+
column_index: usize,
193+
target: Ordering,
194+
arrow_type: &DataType,
195+
) -> Result<Option<Value>> {
196+
let mut best = None;
197+
198+
for row_group in &file_stats.row_groups {
199+
if column_index >= row_group.metadata.num_columns() {
200+
return Ok(None);
201+
}
202+
let Some(stats) = row_group.metadata.column(column_index).statistics() else {
203+
return Ok(None);
204+
};
205+
let Some(scalar) = parquet_bound_scalar(stats, target, arrow_type) else {
206+
return Ok(None);
207+
};
208+
let value = Value::try_from(scalar).map_err(|error| {
209+
DataFusionError::Internal(format!(
210+
"StatsScanExec failed to convert row-group scalar: {}",
211+
error
212+
))
213+
})?;
214+
let should_replace = best.as_ref().is_none_or(|current| {
215+
value
216+
.partial_cmp(current)
217+
.is_some_and(|ordering| ordering == target)
218+
});
219+
if should_replace {
220+
best = Some(value);
221+
}
222+
}
223+
224+
Ok(best)
225+
}
226+
227+
fn parquet_bound_scalar(
228+
stats: &ParquetStats,
229+
target: Ordering,
230+
arrow_type: &DataType,
231+
) -> Option<ScalarValue> {
232+
let use_min = target == Ordering::Less;
233+
234+
match stats {
235+
ParquetStats::Boolean(stats) => {
236+
if !matches!(arrow_type, DataType::Boolean) {
237+
return None;
238+
}
239+
Some(ScalarValue::Boolean(Some(if use_min {
240+
*stats.min_opt()?
241+
} else {
242+
*stats.max_opt()?
243+
})))
244+
}
245+
ParquetStats::Int32(stats) => {
246+
let raw = if use_min {
247+
*stats.min_opt()?
248+
} else {
249+
*stats.max_opt()?
250+
};
251+
match arrow_type {
252+
DataType::Int32
253+
| DataType::UInt32
254+
| DataType::Int16
255+
| DataType::UInt16
256+
| DataType::Int8
257+
| DataType::UInt8
258+
| DataType::Time32(_) => Some(ScalarValue::Int32(Some(raw))),
259+
DataType::Date32 => Some(ScalarValue::Date32(Some(raw))),
260+
_ => None,
261+
}
262+
}
263+
ParquetStats::Int64(stats) => {
264+
let raw = if use_min {
265+
*stats.min_opt()?
266+
} else {
267+
*stats.max_opt()?
268+
};
269+
match arrow_type {
270+
DataType::Int64 | DataType::UInt64 => Some(ScalarValue::Int64(Some(raw))),
271+
DataType::Timestamp(TimeUnit::Second, _) => {
272+
Some(ScalarValue::TimestampSecond(Some(raw), None))
273+
}
274+
DataType::Timestamp(TimeUnit::Millisecond, _) => {
275+
Some(ScalarValue::TimestampMillisecond(Some(raw), None))
276+
}
277+
DataType::Timestamp(TimeUnit::Microsecond, _) => {
278+
Some(ScalarValue::TimestampMicrosecond(Some(raw), None))
279+
}
280+
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
281+
Some(ScalarValue::TimestampNanosecond(Some(raw), None))
282+
}
283+
DataType::Date64 => Some(ScalarValue::Date64(Some(raw))),
284+
DataType::Duration(_) => Some(ScalarValue::Int64(Some(raw))),
285+
_ => None,
286+
}
287+
}
288+
ParquetStats::Int96(_) => None,
289+
ParquetStats::Float(stats) => {
290+
if !matches!(arrow_type, DataType::Float32) {
291+
return None;
292+
}
293+
Some(ScalarValue::Float32(Some(if use_min {
294+
*stats.min_opt()?
295+
} else {
296+
*stats.max_opt()?
297+
})))
298+
}
299+
ParquetStats::Double(stats) => {
300+
if !matches!(arrow_type, DataType::Float64) {
301+
return None;
302+
}
303+
Some(ScalarValue::Float64(Some(if use_min {
304+
*stats.min_opt()?
305+
} else {
306+
*stats.max_opt()?
307+
})))
308+
}
309+
ParquetStats::ByteArray(stats) => {
310+
let bytes = if use_min {
311+
stats.min_bytes_opt()?
312+
} else {
313+
stats.max_bytes_opt()?
314+
};
315+
match arrow_type {
316+
DataType::Utf8 | DataType::LargeUtf8 => String::from_utf8(bytes.to_owned())
317+
.ok()
318+
.map(|s| ScalarValue::Utf8(Some(s))),
319+
_ => None,
320+
}
321+
}
322+
ParquetStats::FixedLenByteArray(_) => None,
323+
}
324+
}
325+
326+
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::schema::Schema;

    use super::*;

    /// Builds a 42-row file description without any row groups.
    fn file_without_row_groups(file_id: &str, partition_expr: Option<&str>) -> FileStatsItem {
        FileStatsItem {
            file_id: file_id.to_string(),
            num_rows: Some(42),
            file_partition_expr: partition_expr.map(String::from),
            row_groups: vec![],
        }
    }

    /// Builds a region schema with no columns.
    fn empty_region_schema() -> RegionSchemaRef {
        Arc::new(Schema::new(vec![]))
    }

    #[test]
    fn stats_candidate_accepts_unpartitioned_file_for_count_rows() {
        let file_stats = file_without_row_groups("file-1", None);
        let region_schema = empty_region_schema();
        let requirements = [SupportedStatAggr::CountRows];

        let candidate =
            StatsCandidateFile::from_file_stats(&file_stats, None, &requirements, &region_schema)
                .expect("count rows stats should be readable");

        assert!(candidate.is_some());
        let row_count = candidate
            .unwrap()
            .stat_value(&SupportedStatAggr::CountRows)
            .expect("count rows should be convertible");
        assert_eq!(row_count, Some(Value::Int64(42)));
    }

    #[test]
    fn stats_candidate_rejects_partition_mismatch() {
        let file_stats = file_without_row_groups("file-2", Some("host = 'a'"));
        let region_schema = empty_region_schema();
        let requirements = [SupportedStatAggr::CountRows];

        let candidate = StatsCandidateFile::from_file_stats(
            &file_stats,
            Some("host = 'b'"),
            &requirements,
            &region_schema,
        )
        .expect("count rows stats should be readable");

        assert!(candidate.is_none());
    }
}

src/common/query/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
pub mod aggr_stats;
1516
pub mod columnar_value;
1617
pub mod error;
1718
pub mod logical_plan;

0 commit comments

Comments
 (0)