Skip to content

Commit 7fce5bb

Browse files
authored
Merge branch 'main' into limit_pushdown
2 parents 025dc61 + 0c408a7 commit 7fce5bb

8 files changed

Lines changed: 302 additions & 43 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ keywords = ["arrow", "arrow-rs", "datafusion"]
2828
rust-version = "1.85.1"
2929

3030
[dependencies]
31+
aquamarine = "0.6.0"
3132
arrow = "56.0.0"
3233
arrow-schema = "56.0.0"
3334
async-trait = "0.1.89"

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
pub mod materialized;
4343

4444
/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
45+
///
46+
/// The implementation is based heavily on [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf),
47+
/// *Optimizing Queries Using Materialized Views: A Practical, Scalable Solution*.
4548
pub mod rewrite;
4649

4750
/// Configuration options for materialized view related features.

src/materialized.rs

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
/// Track dependencies of materialized data in object storage
1918
pub mod dependencies;
2019

2120
/// Pluggable metadata sources for incremental view maintenance
@@ -41,6 +40,7 @@ use datafusion::{
4140
catalog::TableProvider,
4241
datasource::listing::{ListingTable, ListingTableUrl},
4342
};
43+
use datafusion_common::DataFusionError;
4444
use datafusion_expr::LogicalPlan;
4545
use itertools::Itertools;
4646

@@ -110,6 +110,14 @@ pub trait Materialized: ListingTableLike {
110110
fn config(&self) -> MaterializedConfig {
111111
MaterializedConfig::default()
112112
}
113+
114+
/// Which partition columns are 'static'.
115+
/// Static partition columns are those that are used in incremental view maintenance.
116+
/// These should be a prefix of the full set of partition columns returned by [`ListingTableLike::partition_columns`].
117+
/// The rest of the partition columns are 'dynamic' and their values will be generated at runtime during incremental refresh.
118+
fn static_partition_columns(&self) -> Vec<String> {
119+
<Self as ListingTableLike>::partition_columns(self)
120+
}
113121
}
114122

115123
/// Register a [`Materialized`] implementation in this registry.
@@ -122,13 +130,38 @@ pub fn register_materialized<T: Materialized>() {
122130
}
123131

124132
/// Attempt to cast the given TableProvider into a [`Materialized`].
125-
/// If the table's type has not been registered using [`register_materialized`], will return `None`.
126-
pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
127-
TABLE_TYPE_REGISTRY.cast_to_materialized(table).or_else(|| {
128-
TABLE_TYPE_REGISTRY
129-
.cast_to_decorator(table)
130-
.and_then(|decorator| cast_to_materialized(decorator.base()))
131-
})
133+
/// If the table's type has not been registered using [`register_materialized`], will return `Ok(None)`.
134+
///
135+
/// Does a runtime check on some invariants of `Materialized` and returns an error if they are violated.
136+
/// In particular, checks that the static partition columns are a prefix of all partition columns.
137+
pub fn cast_to_materialized(
138+
table: &dyn TableProvider,
139+
) -> Result<Option<&dyn Materialized>, DataFusionError> {
140+
let materialized = match TABLE_TYPE_REGISTRY
141+
.cast_to_materialized(table)
142+
.map(Ok)
143+
.or_else(|| {
144+
TABLE_TYPE_REGISTRY
145+
.cast_to_decorator(table)
146+
.and_then(|decorator| cast_to_materialized(decorator.base()).transpose())
147+
})
148+
.transpose()?
149+
{
150+
None => return Ok(None),
151+
Some(m) => m,
152+
};
153+
154+
let static_partition_cols = materialized.static_partition_columns();
155+
let all_partition_cols = materialized.partition_columns();
156+
157+
if materialized.partition_columns()[..static_partition_cols.len()] != static_partition_cols[..]
158+
{
159+
return Err(DataFusionError::Plan(format!(
160+
"Materialized view's static partition columns ({static_partition_cols:?}) must be a prefix of all partition columns ({all_partition_cols:?})"
161+
)));
162+
}
163+
164+
Ok(Some(materialized))
132165
}
133166

134167
/// A `TableProvider` that decorates other `TableProvider`s.

src/materialized/dependencies.rs

Lines changed: 226 additions & 30 deletions
Large diffs are not rendered by default.

src/materialized/file_metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,7 @@ impl FileMetadataBuilder {
722722
}
723723
}
724724

725-
/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider.
725+
/// Provides [`ObjectMeta`] data to the [`FileMetadata`] table provider.
726726
#[async_trait]
727727
pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync {
728728
/// List all files in the store for the given `url` prefix.

src/rewrite.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
use datafusion::{common::extensions_options, config::ConfigExtension};
1919

20-
/// Implements a query rewriting optimizer, also known as "view exploitation"
21-
/// in some academic sources.
2220
pub mod exploitation;
2321

2422
pub mod normal_form;

src/rewrite/exploitation.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,34 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
/*!
19+
20+
This module implements a query rewriting optimizer, also known as "view exploitation"
21+
in some academic sources. The "view matching" subproblem is implemented in the [`SpjNormalForm`] code,
22+
which is used by the [`ViewMatcher`] logical optimizer to compare queries with materialized views.
23+
24+
The query rewriting process spans both the logical and physical planning phases and can be described as follows:
25+
26+
1. During logical optimization, the [`ViewMatcher`] rule scans all available materialized views
27+
and attempts to match them against each sub-expression of the query plan by comparing their SPJ normal forms.
28+
If a match is found, the sub-expression is replaced with a [`OneOf`] node, which contains the original sub-expression
29+
and one or more candidate rewrites using materialized views.
30+
2. During physical planning, the [`ViewExploitationPlanner`] identifies [`OneOf`] nodes and generates a [`OneOfExec`]
31+
physical plan node, which contains all candidate physical plans corresponding to the logical plans in the original [`OneOf`] node.
32+
3. DataFusion is allowed to run its usual physical optimization rules, which may add additional operators such as sorts or repartitions
33+
to the candidate plans. Filter, sort, and projection pushdown into the `OneOfExec` nodes are important as these can affect cost
34+
estimations in the next phase.
35+
4. Finally, a user-defined cost function is used to choose the "best" candidate within each `OneOfExec` node.
36+
The [`PruneCandidates`] physical optimizer rule is used to finalize the choice by replacing each `OneOfExec` node
37+
with its selected best candidate plan.
38+
39+
In the [reference paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) for this implementation, the authors mention
40+
that the database's built-in cost-based optimizer takes care of selecting the best rewrite. However, DataFusion lacks cost-based optimization.
41+
While we do use a user-defined cost function to select the best candidate at each `OneOfExec`, this requires cooperation from the planner
42+
to push down relevant information such as projections, sorts, and filters into the `OneOfExec` nodes.
43+
44+
*/
45+
1846
use std::collections::HashMap;
1947
use std::{collections::HashSet, sync::Arc};
2048

@@ -59,7 +87,7 @@ impl ViewMatcher {
5987
for (resolved_table_ref, table) in
6088
super::util::list_tables(session_state.catalog_list().as_ref()).await?
6189
{
62-
let Some(mv) = cast_to_materialized(table.as_ref()) else {
90+
let Some(mv) = cast_to_materialized(table.as_ref())? else {
6391
continue;
6492
};
6593

src/rewrite/normal_form.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
/*!
1919
20-
This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://courses.cs.washington.edu/courses/cse591d/01sp/opt_views.pdf),
20+
This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf),
2121
which provides a method for determining when one Select-Project-Join query can be rewritten in terms of another Select-Project-Join query.
2222
2323
The implementation is contained in [`SpjNormalForm::rewrite_from`]. The method can be summarized as follows:

0 commit comments

Comments
 (0)