
Commit 6632011

Add alternate analysis for MVs with no partition columns (#39)
* port alternate analysis for no partitions
* fix
1 parent 600a4d8 commit 6632011

File tree

1 file changed (+90, -10)


src/materialized/dependencies.rs (+90, -10)
@@ -112,7 +112,11 @@ impl TableFunctionImpl for FileDependenciesUdtf {
         ))?;
 
         Ok(Arc::new(ViewTable::try_new(
-            mv_dependencies_plan(mv, self.row_metadata_registry.clone(), &self.config_options)?,
+            mv_dependencies_plan(
+                mv,
+                self.row_metadata_registry.as_ref(),
+                &self.config_options,
+            )?,
             None,
         )?))
     }
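
This and the following hunks change `mv_dependencies_plan` and its helpers to borrow the registry (`&RowMetadataRegistry`) instead of taking an owned `Arc<RowMetadataRegistry>`, so callers such as `FileDependenciesUdtf` pass `.as_ref()` rather than cloning the Arc on every call. A minimal sketch of that calling pattern, using a hypothetical `Registry` stand-in rather than the crate's real types:

use std::sync::Arc;

// Hypothetical stand-in for RowMetadataRegistry, only to illustrate the signature change.
struct Registry;

// After this commit, the plan-building helpers accept a borrow ...
fn build_plan(_registry: &Registry) {}

fn main() {
    // ... so a caller that keeps the registry behind an Arc (as the UDTF does)
    // passes a reference instead of cloning the Arc for each invocation.
    let shared = Arc::new(Registry);
    build_plan(shared.as_ref());
    build_plan(&shared); // deref coercion also works; the Arc remains usable afterwards
}
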
@@ -230,7 +234,7 @@ fn get_table_name(args: &[Expr]) -> Result<&String> {
 /// for this materialized view, together with the dependencies for each target.
 pub fn mv_dependencies_plan(
     materialized_view: &dyn Materialized,
-    row_metadata_registry: Arc<RowMetadataRegistry>,
+    row_metadata_registry: &RowMetadataRegistry,
     config_options: &ConfigOptions,
 ) -> Result<LogicalPlan> {
     use datafusion_expr::logical_plan::*;
@@ -249,12 +253,19 @@ pub fn mv_dependencies_plan(
     // First expand all wildcards
     let plan = ExpandWildcardRule {}.analyze(plan, config_options)?;
 
-    // Prune non-partition columns from all table scans
-    let pruned_plan = pushdown_projection_inexact(plan, &partition_col_indices)?;
+    let pruned_plan_with_source_files = if partition_cols.is_empty() {
+        get_source_files_all_partitions(
+            materialized_view,
+            &config_options.catalog,
+            row_metadata_registry,
+        )
+    } else {
+        // Prune non-partition columns from all table scans
+        let pruned_plan = pushdown_projection_inexact(plan, &partition_col_indices)?;
 
-    // Now bubble up file metadata to the top of the plan
-    let pruned_plan_with_source_files =
-        push_up_file_metadata(pruned_plan, &config_options.catalog, row_metadata_registry)?;
+        // Now bubble up file metadata to the top of the plan
+        push_up_file_metadata(pruned_plan, &config_options.catalog, row_metadata_registry)
+    }?;
 
     // We now have data in the following form:
     // (partition_col0, partition_col1, ..., __meta)
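
In the new branch above, both arms of `if partition_cols.is_empty()` evaluate to a `Result`, and the trailing `}?;` applies the `?` operator to whichever arm ran. A small, self-contained sketch of that idiom (the function names here are illustrative, not from this crate):

fn cheap_path() -> Result<i64, String> {
    Ok(1)
}

fn precise_path() -> Result<i64, String> {
    Ok(2)
}

fn pick(use_cheap: bool) -> Result<i64, String> {
    // Both arms yield a Result; `?` is applied once to the whole if/else expression,
    // mirroring the `}?;` in mv_dependencies_plan.
    let value = if use_cheap { cheap_path() } else { precise_path() }?;
    Ok(value * 10)
}

fn main() {
    assert_eq!(pick(true), Ok(10));
    assert_eq!(pick(false), Ok(20));
}
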
@@ -722,13 +733,13 @@ fn project_dfschema(schema: &DFSchema, indices: &HashSet<usize>) -> Result<DFSch
 fn push_up_file_metadata(
     plan: LogicalPlan,
     catalog_options: &CatalogOptions,
-    row_metadata_registry: Arc<RowMetadataRegistry>,
+    row_metadata_registry: &RowMetadataRegistry,
 ) -> Result<LogicalPlan> {
     let alias_generator = AliasGenerator::new();
     plan.transform_up(|plan| {
         match plan {
             LogicalPlan::TableScan(scan) => {
-                scan_columns_from_row_metadata(scan, catalog_options, row_metadata_registry.clone())
+                scan_columns_from_row_metadata(scan, catalog_options, row_metadata_registry)
             }
             plan => project_row_metadata_from_input(plan, &alias_generator),
         }
@@ -800,7 +811,7 @@ fn project_row_metadata_from_input(
 fn scan_columns_from_row_metadata(
     scan: TableScan,
     catalog_options: &CatalogOptions,
-    row_metadata_registry: Arc<RowMetadataRegistry>,
+    row_metadata_registry: &RowMetadataRegistry,
 ) -> Result<LogicalPlan> {
     let table_ref = scan.table_name.clone().resolve(
         &catalog_options.default_catalog,
@@ -832,6 +843,75 @@ fn scan_columns_from_row_metadata(
         .build()
 }
 
+/// Assemble sources irrespective of partitions
+/// This is more efficient when the materialized view has no partitions,
+/// but less intelligent -- it may return additional dependencies not present in the
+/// usual algorithm.
+//
+// TODO: see if we can optimize the normal logic for no partitions.
+// It seems that joins get transformed into cross joins, which can become extremely inefficient.
+// Hence we had to implement this alternate, simpler but less precise algorithm.
+// Notably, it may include more false positives.
+fn get_source_files_all_partitions(
+    materialized_view: &dyn Materialized,
+    catalog_options: &CatalogOptions,
+    row_metadata_registry: &RowMetadataRegistry,
+) -> Result<LogicalPlan> {
+    use datafusion_common::tree_node::TreeNodeRecursion;
+
+    let mut tables = std::collections::HashMap::<TableReference, _>::new();
+
+    materialized_view
+        .query()
+        .apply(|plan| {
+            if let LogicalPlan::TableScan(scan) = plan {
+                tables.insert(scan.table_name.clone(), Arc::clone(&scan.source));
+            }
+
+            Ok(TreeNodeRecursion::Continue)
+        })
+        .unwrap();
+
+    tables
+        .into_iter()
+        .try_fold(
+            None::<LogicalPlanBuilder>,
+            |maybe_plan, (table_ref, source)| {
+                let resolved_ref = table_ref.clone().resolve(
+                    &catalog_options.default_catalog,
+                    &catalog_options.default_schema,
+                );
+
+                let row_metadata = row_metadata_registry.get_source(&resolved_ref)?;
+                let row_metadata_scan = row_metadata
+                    .row_metadata(
+                        resolved_ref,
+                        &TableScan {
+                            table_name: table_ref.clone(),
+                            source,
+                            projection: Some(vec![]), // no columns relevant
+                            projected_schema: Arc::new(DFSchema::empty()),
+                            filters: vec![],
+                            fetch: None,
+                        },
+                    )?
+                    .build()?;
+
+                if let Some(previous) = maybe_plan {
+                    previous.union(row_metadata_scan)
+                } else {
+                    Ok(LogicalPlanBuilder::from(row_metadata_scan))
+                }
+                .map(Some)
+            },
+        )?
+        .ok_or_else(|| DataFusionError::Plan("materialized view has no source tables".into()))?
+        // [`RowMetadataSource`] returns a Struct,
+        // but the MV algorithm expects a list of structs at each node in the plan.
+        .project(vec![make_array(vec![col(META_COLUMN)]).alias(META_COLUMN)])?
+        .build()
+}
+
 #[cfg(test)]
 mod test {
     use std::{any::Any, collections::HashSet, sync::Arc};
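
The new `get_source_files_all_partitions` gathers every table the view scans, builds one row-metadata scan per table, folds them into a single UNION, and errors if the view reads no tables at all. A reduced sketch of that accumulate-or-error shape using only std types (the names are illustrative, not the crate's API):

// Toy stand-in: each "plan" is just a label, and each step could fail,
// like building a row-metadata scan can.
fn union_all(sources: &[&str]) -> Result<String, String> {
    sources
        .iter()
        .try_fold(None::<String>, |acc, table| -> Result<Option<String>, String> {
            let scan = format!("scan({table})");
            Ok(Some(match acc {
                // The first table starts the plan ...
                None => scan,
                // ... and later tables are UNIONed onto the accumulated plan.
                Some(prev) => format!("{prev} UNION {scan}"),
            }))
        })?
        // No tables at all is an error, mirroring the "no source tables" case above.
        .ok_or_else(|| "materialized view has no source tables".to_string())
}

fn main() {
    assert_eq!(union_all(&["t1", "t2"]).unwrap(), "scan(t1) UNION scan(t2)");
    assert!(union_all(&[]).is_err());
}
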
