Skip to content

Commit 029593c

Browse files
authored
feat: add rowset planning and pruning (#574)
* feat: add rowset planning and pruning * fix: skip fully pruned sst scans * feat: wire parquet pushdown execution (#575) * feat: wire parquet pushdown execution * fix: keep residual predicate for schema-mismatched ssts * refactor: avoid predicate columns in sst-only scans * fix: keep delete sidecars when data pruned
1 parent 4c4384f commit 029593c

File tree

8 files changed

+1534
-145
lines changed

8 files changed

+1534
-145
lines changed

src/db/scan.rs

Lines changed: 208 additions & 72 deletions
Large diffs are not rendered by default.

src/db/tests/core/scan.rs

Lines changed: 545 additions & 10 deletions
Large diffs are not rendered by default.

src/inmem/immutable/memtable.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ impl ImmutableMemTable {
128128
ImmutableRowIter::new(self)
129129
}
130130

131+
pub(crate) fn entry_count(&self) -> usize {
132+
self.index.len()
133+
}
134+
131135
pub(crate) fn min_key(&self) -> Option<KeyOwned> {
132136
self.index.keys().next().map(|view| view.key().to_owned())
133137
}
@@ -143,6 +147,22 @@ impl ImmutableMemTable {
143147
&self.mvcc
144148
}
145149

150+
pub(crate) fn commit_ts_bounds(&self) -> Option<(Timestamp, Timestamp)> {
151+
let mut iter = self.mvcc.commit_ts.iter();
152+
let first = iter.next()?;
153+
let mut min_ts = *first;
154+
let mut max_ts = *first;
155+
for ts in iter {
156+
if *ts < min_ts {
157+
min_ts = *ts;
158+
}
159+
if *ts > max_ts {
160+
max_ts = *ts;
161+
}
162+
}
163+
Some((min_ts, max_ts))
164+
}
165+
146166
fn mvcc_row(&self, row: u32) -> (Timestamp, bool) {
147167
let idx = row as usize;
148168
(self.mvcc.commit_ts[idx], self.mvcc.tombstone[idx])

src/inmem/immutable/mod.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,32 @@ pub(crate) mod memtable;
88
pub(crate) type ImmutableSegment = memtable::ImmutableMemTable;
99

1010
/// Lightweight pruning helper; currently returns all segment indexes.
11-
pub(crate) fn prune_segments(segments: &[&ImmutableSegment]) -> Vec<usize> {
12-
(0..segments.len()).collect()
11+
pub(crate) fn prune_segments(
12+
segments: &[&ImmutableSegment],
13+
key_bounds: Option<&crate::query::scan::KeyBounds>,
14+
read_ts: crate::mvcc::Timestamp,
15+
) -> Vec<usize> {
16+
segments
17+
.iter()
18+
.enumerate()
19+
.filter_map(|(idx, segment)| {
20+
if segment.entry_count() == 0 {
21+
return None;
22+
}
23+
if let Some(bounds) = key_bounds {
24+
let (Some(min_key), Some(max_key)) = (segment.min_key(), segment.max_key()) else {
25+
return Some(idx);
26+
};
27+
if !bounds.overlaps(&min_key, &max_key) {
28+
return None;
29+
}
30+
}
31+
if let Some((min_commit_ts, _)) = segment.commit_ts_bounds()
32+
&& min_commit_ts > read_ts
33+
{
34+
return None;
35+
}
36+
Some(idx)
37+
})
38+
.collect()
1339
}

src/inmem/mutable/memtable.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,41 @@ impl DynMemInner {
493493
}
494494
}
495495

496+
pub(crate) fn key_bounds(&self) -> (Option<KeyOwned>, Option<KeyOwned>) {
497+
let min = self
498+
.index
499+
.iter()
500+
.next()
501+
.map(|entry| entry.key().key().clone());
502+
let max = self
503+
.index
504+
.iter()
505+
.next_back()
506+
.map(|entry| entry.key().key().clone());
507+
(min, max)
508+
}
509+
510+
pub(crate) fn commit_ts_bounds(&self) -> Option<(Timestamp, Timestamp)> {
511+
let mut iter = self.index.iter();
512+
let first = iter.next()?;
513+
let mut min_ts = first.key().timestamp();
514+
let mut max_ts = min_ts;
515+
for entry in iter {
516+
let ts = entry.key().timestamp();
517+
if ts < min_ts {
518+
min_ts = ts;
519+
}
520+
if ts > max_ts {
521+
max_ts = ts;
522+
}
523+
}
524+
Some((min_ts, max_ts))
525+
}
526+
527+
pub(crate) fn row_count(&self) -> usize {
528+
self.index.len()
529+
}
530+
496531
#[cfg(all(test, feature = "tokio"))]
497532
pub(crate) fn inspect_versions(&self, key: &KeyOwned) -> Option<Vec<(Timestamp, bool)>> {
498533
let key_owned = key.clone();
@@ -689,6 +724,18 @@ impl DynMem {
689724
self.inner.read().has_conflict(key, snapshot_ts)
690725
}
691726

727+
pub(crate) fn key_bounds(&self) -> (Option<KeyOwned>, Option<KeyOwned>) {
728+
self.inner.read().key_bounds()
729+
}
730+
731+
pub(crate) fn commit_ts_bounds(&self) -> Option<(Timestamp, Timestamp)> {
732+
self.inner.read().commit_ts_bounds()
733+
}
734+
735+
pub(crate) fn row_count(&self) -> usize {
736+
self.inner.read().row_count()
737+
}
738+
692739
/// Access the extractor.
693740
pub(crate) fn extractor(&self) -> &Arc<dyn KeyProjection> {
694741
&self.extractor

src/ondisk/sstable.rs

Lines changed: 149 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,12 @@ pub enum SsTableError {
382382
/// Details about why the row filter predicate was rejected.
383383
reason: String,
384384
},
385+
/// Row selection length mismatch.
386+
#[error("row selection length mismatch: {reason}")]
387+
RowSelection {
388+
/// Details about why the row selection was invalid.
389+
reason: String,
390+
},
385391
/// Invalid path component produced while building an SSTable destination.
386392
#[error("invalid sstable path component: {0}")]
387393
InvalidPath(String),
@@ -1079,15 +1085,36 @@ impl SsTableReader {
10791085
}
10801086
}
10811087

1082-
fn row_filter_expr(predicate: &Expr, schema: &SchemaRef) -> Result<Option<Expr>, SsTableError> {
1088+
#[derive(Debug, Clone)]
1089+
pub(crate) struct PredicateSplit {
1090+
pub(crate) pushdown: Option<Expr>,
1091+
pub(crate) residual: Option<Expr>,
1092+
}
1093+
1094+
pub(crate) fn split_predicate_for_row_filter(
1095+
predicate: &Expr,
1096+
schema: &SchemaRef,
1097+
) -> PredicateSplit {
10831098
match predicate {
1084-
Expr::True => Ok(None),
1085-
Expr::False => Ok(Some(Expr::False)),
1099+
Expr::True => PredicateSplit {
1100+
pushdown: None,
1101+
residual: None,
1102+
},
1103+
Expr::False => PredicateSplit {
1104+
pushdown: Some(Expr::False),
1105+
residual: None,
1106+
},
10861107
Expr::Cmp { column, value, .. } => {
10871108
if scalar_matches_column(schema, column, value) {
1088-
Ok(Some(predicate.clone()))
1109+
PredicateSplit {
1110+
pushdown: Some(predicate.clone()),
1111+
residual: None,
1112+
}
10891113
} else {
1090-
Ok(None)
1114+
PredicateSplit {
1115+
pushdown: None,
1116+
residual: Some(predicate.clone()),
1117+
}
10911118
}
10921119
}
10931120
Expr::Between {
@@ -1096,79 +1123,156 @@ fn row_filter_expr(predicate: &Expr, schema: &SchemaRef) -> Result<Option<Expr>,
10961123
if scalar_matches_column(schema, column, low)
10971124
&& scalar_matches_column(schema, column, high)
10981125
{
1099-
Ok(Some(predicate.clone()))
1126+
PredicateSplit {
1127+
pushdown: Some(predicate.clone()),
1128+
residual: None,
1129+
}
11001130
} else {
1101-
Ok(None)
1131+
PredicateSplit {
1132+
pushdown: None,
1133+
residual: Some(predicate.clone()),
1134+
}
11021135
}
11031136
}
11041137
Expr::InList { column, values } => {
11051138
if scalars_match_column(schema, column, values) {
1106-
Ok(Some(predicate.clone()))
1139+
PredicateSplit {
1140+
pushdown: Some(predicate.clone()),
1141+
residual: None,
1142+
}
11071143
} else {
1108-
Ok(None)
1144+
PredicateSplit {
1145+
pushdown: None,
1146+
residual: Some(predicate.clone()),
1147+
}
11091148
}
11101149
}
11111150
Expr::StartsWith { column, .. } => {
11121151
if is_string_column(schema, column) {
1113-
Ok(Some(predicate.clone()))
1152+
PredicateSplit {
1153+
pushdown: Some(predicate.clone()),
1154+
residual: None,
1155+
}
11141156
} else {
1115-
Ok(None)
1157+
PredicateSplit {
1158+
pushdown: None,
1159+
residual: Some(predicate.clone()),
1160+
}
11161161
}
11171162
}
11181163
Expr::IsNull { column, .. } => {
11191164
if column_type(schema, column).is_some() {
1120-
Ok(Some(predicate.clone()))
1165+
PredicateSplit {
1166+
pushdown: Some(predicate.clone()),
1167+
residual: None,
1168+
}
11211169
} else {
1122-
Ok(None)
1170+
PredicateSplit {
1171+
pushdown: None,
1172+
residual: Some(predicate.clone()),
1173+
}
11231174
}
11241175
}
1125-
Expr::BloomFilterEq { column, .. } => Err(SsTableError::RowFilterPredicate {
1126-
reason: format!(
1127-
"BloomFilterEq predicate on column '{column}' is not supported for row filtering"
1128-
),
1129-
}),
1130-
Expr::BloomFilterInList { column, .. } => Err(SsTableError::RowFilterPredicate {
1131-
reason: format!(
1132-
"BloomFilterInList predicate on column '{column}' is not supported for row \
1133-
filtering"
1134-
),
1135-
}),
1176+
Expr::BloomFilterEq { .. } | Expr::BloomFilterInList { .. } => PredicateSplit {
1177+
pushdown: None,
1178+
residual: Some(predicate.clone()),
1179+
},
11361180
Expr::And(children) => {
1137-
let mut supported = Vec::new();
1181+
let mut pushdown = Vec::new();
1182+
let mut residual = Vec::new();
11381183
for child in children {
1139-
if let Some(expr) = row_filter_expr(child, schema)? {
1140-
supported.push(expr);
1184+
let split = split_predicate_for_row_filter(child, schema);
1185+
if let Some(expr) = split.pushdown {
1186+
pushdown.push(expr);
1187+
}
1188+
if let Some(expr) = split.residual {
1189+
residual.push(expr);
11411190
}
11421191
}
1143-
Ok(match supported.len() {
1144-
0 => None,
1145-
1 => Some(supported.remove(0)),
1146-
_ => Some(Expr::And(supported)),
1147-
})
1192+
PredicateSplit {
1193+
pushdown: combine_and(pushdown),
1194+
residual: combine_and(residual),
1195+
}
11481196
}
11491197
Expr::Or(children) => {
1150-
let mut supported = Vec::new();
1198+
if children.is_empty() {
1199+
return PredicateSplit {
1200+
pushdown: Some(Expr::False),
1201+
residual: None,
1202+
};
1203+
}
1204+
let mut pushdown = Vec::new();
11511205
for child in children {
1152-
match row_filter_expr(child, schema)? {
1153-
Some(expr) => supported.push(expr),
1154-
None => return Ok(None),
1206+
let split = split_predicate_for_row_filter(child, schema);
1207+
if split.residual.is_some() {
1208+
return PredicateSplit {
1209+
pushdown: None,
1210+
residual: Some(predicate.clone()),
1211+
};
1212+
}
1213+
match split.pushdown {
1214+
Some(expr) => pushdown.push(expr),
1215+
None => {
1216+
// Child is fully supported but represents no filter (True).
1217+
return PredicateSplit {
1218+
pushdown: None,
1219+
residual: None,
1220+
};
1221+
}
11551222
}
11561223
}
1157-
Ok(match supported.len() {
1158-
0 => None,
1159-
1 => Some(supported.remove(0)),
1160-
_ => Some(Expr::Or(supported)),
1161-
})
1224+
PredicateSplit {
1225+
pushdown: combine_or(pushdown),
1226+
residual: None,
1227+
}
11621228
}
11631229
Expr::Not(child) => {
1164-
Ok(row_filter_expr(child, schema)?.map(|expr| Expr::Not(Box::new(expr))))
1230+
let split = split_predicate_for_row_filter(child, schema);
1231+
if split.residual.is_some() {
1232+
return PredicateSplit {
1233+
pushdown: None,
1234+
residual: Some(predicate.clone()),
1235+
};
1236+
}
1237+
if split.pushdown.is_none() {
1238+
return PredicateSplit {
1239+
pushdown: Some(Expr::False),
1240+
residual: None,
1241+
};
1242+
}
1243+
let pushdown = split.pushdown.map(|expr| Expr::Not(Box::new(expr)));
1244+
PredicateSplit {
1245+
pushdown,
1246+
residual: None,
1247+
}
11651248
}
1166-
other => Err(SsTableError::RowFilterPredicate {
1167-
reason: format!("unsupported predicate variant: {other:?}"),
1168-
}),
1249+
_ => PredicateSplit {
1250+
pushdown: None,
1251+
residual: Some(predicate.clone()),
1252+
},
1253+
}
1254+
}
1255+
1256+
fn combine_and(mut parts: Vec<Expr>) -> Option<Expr> {
1257+
match parts.len() {
1258+
0 => None,
1259+
1 => Some(parts.remove(0)),
1260+
_ => Some(Expr::And(parts)),
11691261
}
11701262
}
11711263

1264+
fn combine_or(mut parts: Vec<Expr>) -> Option<Expr> {
1265+
match parts.len() {
1266+
0 => None,
1267+
1 => Some(parts.remove(0)),
1268+
_ => Some(Expr::Or(parts)),
1269+
}
1270+
}
1271+
1272+
fn row_filter_expr(predicate: &Expr, schema: &SchemaRef) -> Result<Option<Expr>, SsTableError> {
1273+
Ok(split_predicate_for_row_filter(predicate, schema).pushdown)
1274+
}
1275+
11721276
async fn build_projection_mask_for_names<E>(
11731277
fs: Arc<dyn DynFs>,
11741278
path: &Path,

0 commit comments

Comments
 (0)