Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 208 additions & 72 deletions src/db/scan.rs

Large diffs are not rendered by default.

555 changes: 545 additions & 10 deletions src/db/tests/core/scan.rs

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions src/inmem/immutable/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ impl ImmutableMemTable {
ImmutableRowIter::new(self)
}

/// Returns the number of entries held in this segment's index (`self.index.len()`).
pub(crate) fn entry_count(&self) -> usize {
self.index.len()
}

/// Returns an owned copy of the key of the first entry yielded by `self.index.keys()`,
/// or `None` when the index yields nothing.
///
/// NOTE(review): presumably the index iterates in ascending key order, which is what
/// makes the first entry the minimum — confirm against the index type.
pub(crate) fn min_key(&self) -> Option<KeyOwned> {
self.index.keys().next().map(|view| view.key().to_owned())
}
Expand All @@ -143,6 +147,22 @@ impl ImmutableMemTable {
&self.mvcc
}

/// Computes the smallest and largest commit timestamps stored in `self.mvcc.commit_ts`.
///
/// Returns `None` when no timestamps are stored, otherwise `Some((min, max))`.
pub(crate) fn commit_ts_bounds(&self) -> Option<(Timestamp, Timestamp)> {
    let mut stamps = self.mvcc.commit_ts.iter();
    // The first timestamp seeds both ends of the range; bail out if there is none.
    let seed = *stamps.next()?;
    let bounds = stamps.fold((seed, seed), |(oldest, newest), ts| {
        let ts = *ts;
        (
            if ts < oldest { ts } else { oldest },
            if ts > newest { ts } else { newest },
        )
    });
    Some(bounds)
}

fn mvcc_row(&self, row: u32) -> (Timestamp, bool) {
let idx = row as usize;
(self.mvcc.commit_ts[idx], self.mvcc.tombstone[idx])
Expand Down
30 changes: 28 additions & 2 deletions src/inmem/immutable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,32 @@ pub(crate) mod memtable;
pub(crate) type ImmutableSegment = memtable::ImmutableMemTable;

/// Lightweight pruning helper; returns the indexes of the segments that may hold rows
/// visible to a scan with the given key bounds and read timestamp.
pub(crate) fn prune_segments(segments: &[&ImmutableSegment]) -> Vec<usize> {
(0..segments.len()).collect()
pub(crate) fn prune_segments(
    segments: &[&ImmutableSegment],
    key_bounds: Option<&crate::query::scan::KeyBounds>,
    read_ts: crate::mvcc::Timestamp,
) -> Vec<usize> {
    let mut kept = Vec::new();
    for (position, candidate) in segments.iter().enumerate() {
        // A segment without entries has nothing to contribute.
        if candidate.entry_count() == 0 {
            continue;
        }
        if let Some(range) = key_bounds {
            match (candidate.min_key(), candidate.max_key()) {
                (Some(lowest), Some(highest)) => {
                    // Key extent lies entirely outside the requested bounds.
                    if !range.overlaps(&lowest, &highest) {
                        continue;
                    }
                }
                // Key extent unavailable: keep the segment without further checks.
                _ => {
                    kept.push(position);
                    continue;
                }
            }
        }
        // Drop the segment when even its smallest commit timestamp is after `read_ts`.
        let only_newer_commits = candidate
            .commit_ts_bounds()
            .is_some_and(|(oldest, _)| oldest > read_ts);
        if !only_newer_commits {
            kept.push(position);
        }
    }
    kept
}
47 changes: 47 additions & 0 deletions src/inmem/mutable/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,41 @@ impl DynMemInner {
}
}

/// Returns owned copies of the keys of the first and last index entries as `(min, max)`.
///
/// Both components are `None` when the index has no entries.
pub(crate) fn key_bounds(&self) -> (Option<KeyOwned>, Option<KeyOwned>) {
    let lowest = self.index.iter().next();
    let highest = self.index.iter().next_back();
    (
        lowest.map(|entry| entry.key().key().clone()),
        highest.map(|entry| entry.key().key().clone()),
    )
}

/// Walks every index entry and returns the smallest and largest entry timestamps.
///
/// Returns `None` when the index has no entries, otherwise `Some((min, max))`.
pub(crate) fn commit_ts_bounds(&self) -> Option<(Timestamp, Timestamp)> {
    let mut entries = self.index.iter();
    // The first entry seeds both ends of the range; bail out if there is none.
    let first = entries.next()?;
    let seed = first.key().timestamp();
    let bounds = entries.fold((seed, seed), |(oldest, newest), entry| {
        let ts = entry.key().timestamp();
        (
            if ts < oldest { ts } else { oldest },
            if ts > newest { ts } else { newest },
        )
    });
    Some(bounds)
}

/// Returns the number of entries in the index (`self.index.len()`).
///
/// NOTE(review): index keys expose both `key()` and `timestamp()`, so this presumably
/// counts stored versions rather than distinct keys — confirm.
pub(crate) fn row_count(&self) -> usize {
self.index.len()
}

#[cfg(all(test, feature = "tokio"))]
pub(crate) fn inspect_versions(&self, key: &KeyOwned) -> Option<Vec<(Timestamp, bool)>> {
let key_owned = key.clone();
Expand Down Expand Up @@ -689,6 +724,18 @@ impl DynMem {
self.inner.read().has_conflict(key, snapshot_ts)
}

/// Returns the `(min, max)` key pair reported by the inner state's `key_bounds`.
///
/// NOTE(review): `self.inner.read()` presumably takes a shared lock for the duration of
/// the call — confirm against the type of `inner`.
pub(crate) fn key_bounds(&self) -> (Option<KeyOwned>, Option<KeyOwned>) {
self.inner.read().key_bounds()
}

/// Returns the `(min, max)` timestamp pair reported by the inner state's
/// `commit_ts_bounds`, or `None` when it reports none.
///
/// NOTE(review): `self.inner.read()` presumably takes a shared lock for the duration of
/// the call — confirm against the type of `inner`.
pub(crate) fn commit_ts_bounds(&self) -> Option<(Timestamp, Timestamp)> {
self.inner.read().commit_ts_bounds()
}

/// Returns the entry count reported by the inner state's `row_count`.
///
/// NOTE(review): `self.inner.read()` presumably takes a shared lock for the duration of
/// the call — confirm against the type of `inner`.
pub(crate) fn row_count(&self) -> usize {
self.inner.read().row_count()
}

/// Access the extractor.
pub(crate) fn extractor(&self) -> &Arc<dyn KeyProjection> {
&self.extractor
Expand Down
194 changes: 149 additions & 45 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,12 @@ pub enum SsTableError {
/// Details about why the row filter predicate was rejected.
reason: String,
},
/// Row selection length mismatch.
#[error("row selection length mismatch: {reason}")]
RowSelection {
/// Details about why the row selection was invalid.
reason: String,
},
/// Invalid path component produced while building an SSTable destination.
#[error("invalid sstable path component: {0}")]
InvalidPath(String),
Expand Down Expand Up @@ -1079,15 +1085,36 @@ impl SsTableReader {
}
}

fn row_filter_expr(predicate: &Expr, schema: &SchemaRef) -> Result<Option<Expr>, SsTableError> {
/// Outcome of partitioning a predicate with `split_predicate_for_row_filter`.
///
/// Both fields being `None` means the predicate imposes no filter at all.
#[derive(Debug, Clone)]
pub(crate) struct PredicateSplit {
/// Part of the predicate accepted for the row filter; `None` when nothing is pushed down.
pub(crate) pushdown: Option<Expr>,
/// Part of the predicate that was not accepted for the row filter; `None` when nothing
/// is left over.
/// NOTE(review): presumably the caller must evaluate this part itself — confirm at the
/// call sites.
pub(crate) residual: Option<Expr>,
}

pub(crate) fn split_predicate_for_row_filter(
predicate: &Expr,
schema: &SchemaRef,
) -> PredicateSplit {
match predicate {
Expr::True => Ok(None),
Expr::False => Ok(Some(Expr::False)),
Expr::True => PredicateSplit {
pushdown: None,
residual: None,
},
Expr::False => PredicateSplit {
pushdown: Some(Expr::False),
residual: None,
},
Expr::Cmp { column, value, .. } => {
if scalar_matches_column(schema, column, value) {
Ok(Some(predicate.clone()))
PredicateSplit {
pushdown: Some(predicate.clone()),
residual: None,
}
} else {
Ok(None)
PredicateSplit {
pushdown: None,
residual: Some(predicate.clone()),
}
}
}
Expr::Between {
Expand All @@ -1096,79 +1123,156 @@ fn row_filter_expr(predicate: &Expr, schema: &SchemaRef) -> Result<Option<Expr>,
if scalar_matches_column(schema, column, low)
&& scalar_matches_column(schema, column, high)
{
Ok(Some(predicate.clone()))
PredicateSplit {
pushdown: Some(predicate.clone()),
residual: None,
}
} else {
Ok(None)
PredicateSplit {
pushdown: None,
residual: Some(predicate.clone()),
}
}
}
Expr::InList { column, values } => {
if scalars_match_column(schema, column, values) {
Ok(Some(predicate.clone()))
PredicateSplit {
pushdown: Some(predicate.clone()),
residual: None,
}
} else {
Ok(None)
PredicateSplit {
pushdown: None,
residual: Some(predicate.clone()),
}
}
}
Expr::StartsWith { column, .. } => {
if is_string_column(schema, column) {
Ok(Some(predicate.clone()))
PredicateSplit {
pushdown: Some(predicate.clone()),
residual: None,
}
} else {
Ok(None)
PredicateSplit {
pushdown: None,
residual: Some(predicate.clone()),
}
}
}
Expr::IsNull { column, .. } => {
if column_type(schema, column).is_some() {
Ok(Some(predicate.clone()))
PredicateSplit {
pushdown: Some(predicate.clone()),
residual: None,
}
} else {
Ok(None)
PredicateSplit {
pushdown: None,
residual: Some(predicate.clone()),
}
}
}
Expr::BloomFilterEq { column, .. } => Err(SsTableError::RowFilterPredicate {
reason: format!(
"BloomFilterEq predicate on column '{column}' is not supported for row filtering"
),
}),
Expr::BloomFilterInList { column, .. } => Err(SsTableError::RowFilterPredicate {
reason: format!(
"BloomFilterInList predicate on column '{column}' is not supported for row \
filtering"
),
}),
Expr::BloomFilterEq { .. } | Expr::BloomFilterInList { .. } => PredicateSplit {
pushdown: None,
residual: Some(predicate.clone()),
},
Expr::And(children) => {
let mut supported = Vec::new();
let mut pushdown = Vec::new();
let mut residual = Vec::new();
for child in children {
if let Some(expr) = row_filter_expr(child, schema)? {
supported.push(expr);
let split = split_predicate_for_row_filter(child, schema);
if let Some(expr) = split.pushdown {
pushdown.push(expr);
}
if let Some(expr) = split.residual {
residual.push(expr);
}
}
Ok(match supported.len() {
0 => None,
1 => Some(supported.remove(0)),
_ => Some(Expr::And(supported)),
})
PredicateSplit {
pushdown: combine_and(pushdown),
residual: combine_and(residual),
}
}
Expr::Or(children) => {
let mut supported = Vec::new();
if children.is_empty() {
return PredicateSplit {
pushdown: Some(Expr::False),
residual: None,
};
}
let mut pushdown = Vec::new();
for child in children {
match row_filter_expr(child, schema)? {
Some(expr) => supported.push(expr),
None => return Ok(None),
let split = split_predicate_for_row_filter(child, schema);
if split.residual.is_some() {
return PredicateSplit {
pushdown: None,
residual: Some(predicate.clone()),
};
}
match split.pushdown {
Some(expr) => pushdown.push(expr),
None => {
// Child is fully supported but represents no filter (True).
return PredicateSplit {
pushdown: None,
residual: None,
};
}
}
}
Ok(match supported.len() {
0 => None,
1 => Some(supported.remove(0)),
_ => Some(Expr::Or(supported)),
})
PredicateSplit {
pushdown: combine_or(pushdown),
residual: None,
}
}
Expr::Not(child) => {
Ok(row_filter_expr(child, schema)?.map(|expr| Expr::Not(Box::new(expr))))
let split = split_predicate_for_row_filter(child, schema);
if split.residual.is_some() {
return PredicateSplit {
pushdown: None,
residual: Some(predicate.clone()),
};
}
if split.pushdown.is_none() {
return PredicateSplit {
pushdown: Some(Expr::False),
residual: None,
};
}
let pushdown = split.pushdown.map(|expr| Expr::Not(Box::new(expr)));
PredicateSplit {
pushdown,
residual: None,
}
}
other => Err(SsTableError::RowFilterPredicate {
reason: format!("unsupported predicate variant: {other:?}"),
}),
_ => PredicateSplit {
pushdown: None,
residual: Some(predicate.clone()),
},
}
}

/// Collapses a list of conjuncts into a single optional expression.
///
/// An empty list yields `None`, a single conjunct is returned unwrapped, and two or more
/// are wrapped in `Expr::And` in their original order.
fn combine_and(mut parts: Vec<Expr>) -> Option<Expr> {
    if parts.len() > 1 {
        return Some(Expr::And(parts));
    }
    // Zero or one element left: `pop` yields `None` or that sole element.
    parts.pop()
}

/// Collapses a list of disjuncts into a single optional expression.
///
/// An empty list yields `None`, a single disjunct is returned unwrapped, and two or more
/// are wrapped in `Expr::Or` in their original order.
fn combine_or(mut parts: Vec<Expr>) -> Option<Expr> {
    if parts.len() > 1 {
        return Some(Expr::Or(parts));
    }
    // Zero or one element left: `pop` yields `None` or that sole element.
    parts.pop()
}

fn row_filter_expr(predicate: &Expr, schema: &SchemaRef) -> Result<Option<Expr>, SsTableError> {
Ok(split_predicate_for_row_filter(predicate, schema).pushdown)
}

async fn build_projection_mask_for_names<E>(
fs: Arc<dyn DynFs>,
path: &Path,
Expand Down
Loading
Loading