Skip to content

Commit 80119fd

Browse files
ion-elgrecoLiam Brannigan
authored and
Liam Brannigan
committed
feat: cdf tableprovider
Signed-off-by: Ion Koutsouris <[email protected]>
1 parent 767fdc2 commit 80119fd

File tree

13 files changed

+323
-240
lines changed

13 files changed

+323
-240
lines changed

crates/core/src/delta_datafusion/cdf/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@ use std::sync::LazyLock;
44

55
use arrow_schema::{DataType, Field, TimeUnit};
66

7-
pub(crate) use self::scan::*;
87
pub(crate) use self::scan_utils::*;
98
use crate::kernel::{Add, AddCDCFile, Remove};
109
use crate::DeltaResult;
1110

12-
mod scan;
11+
pub mod scan;
1312
mod scan_utils;
1413

1514
/// Change type column name
+93-37
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,119 @@
11
use std::any::Any;
2-
use std::fmt::Formatter;
32
use std::sync::Arc;
43

5-
use arrow_schema::SchemaRef;
6-
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
7-
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
4+
use arrow_schema::{Schema, SchemaRef};
5+
use async_trait::async_trait;
6+
use datafusion::catalog::Session;
7+
use datafusion::catalog::TableProvider;
8+
use datafusion::execution::SessionState;
9+
use datafusion_common::{exec_datafusion_err, Column, DFSchema, Result as DataFusionResult};
10+
use datafusion_expr::utils::conjunction;
11+
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
12+
use datafusion_physical_expr::PhysicalExpr;
13+
use datafusion_physical_plan::filter::FilterExec;
14+
use datafusion_physical_plan::limit::GlobalLimitExec;
15+
use datafusion_physical_plan::projection::ProjectionExec;
16+
use datafusion_physical_plan::ExecutionPlan;
817

9-
/// Physical execution of a scan
10-
#[derive(Debug, Clone)]
11-
pub struct DeltaCdfScan {
12-
plan: Arc<dyn ExecutionPlan>,
13-
}
18+
use crate::DeltaTableError;
19+
use crate::{
20+
delta_datafusion::DataFusionMixins, operations::load_cdf::CdfLoadBuilder, DeltaResult,
21+
};
1422

15-
impl DeltaCdfScan {
16-
/// Creates a new scan
17-
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
18-
Self { plan }
19-
}
23+
use super::ADD_PARTITION_SCHEMA;
24+
25+
fn session_state_from_session(session: &dyn Session) -> DataFusionResult<&SessionState> {
26+
session
27+
.as_any()
28+
.downcast_ref::<SessionState>()
29+
.ok_or_else(|| exec_datafusion_err!("Failed to downcast Session to SessionState"))
2030
}
2131

22-
impl DisplayAs for DeltaCdfScan {
23-
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
24-
write!(f, "{:?}", self)
25-
}
32+
#[derive(Debug)]
33+
pub struct DeltaCdfTableProvider {
34+
cdf_builder: CdfLoadBuilder,
35+
schema: SchemaRef,
2636
}
2737

28-
impl ExecutionPlan for DeltaCdfScan {
29-
fn name(&self) -> &str {
30-
Self::static_name()
38+
impl DeltaCdfTableProvider {
39+
/// Build a DeltaCDFTableProvider
40+
pub fn try_new(cdf_builder: CdfLoadBuilder) -> DeltaResult<Self> {
41+
let mut fields = cdf_builder.snapshot.input_schema()?.fields().to_vec();
42+
for f in ADD_PARTITION_SCHEMA.clone() {
43+
fields.push(f.into());
44+
}
45+
Ok(DeltaCdfTableProvider {
46+
cdf_builder,
47+
schema: Schema::new(fields).into(),
48+
})
3149
}
50+
}
3251

52+
#[async_trait]
53+
impl TableProvider for DeltaCdfTableProvider {
3354
fn as_any(&self) -> &dyn Any {
3455
self
3556
}
3657

3758
fn schema(&self) -> SchemaRef {
38-
self.plan.schema().clone()
59+
self.schema.clone()
3960
}
4061

41-
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
42-
self.plan.properties()
62+
fn table_type(&self) -> TableType {
63+
TableType::Base
4364
}
4465

45-
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
46-
vec![]
47-
}
66+
async fn scan(
67+
&self,
68+
session: &dyn Session,
69+
projection: Option<&Vec<usize>>,
70+
filters: &[Expr],
71+
limit: Option<usize>,
72+
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
73+
let session_state = session_state_from_session(session)?;
74+
let mut plan = self.cdf_builder.build(session_state).await?;
75+
76+
let df_schema: DFSchema = plan.schema().try_into()?;
77+
78+
if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
79+
let physical_expr = session.create_physical_expr(filter_expr, &df_schema)?;
80+
plan = Arc::new(FilterExec::try_new(physical_expr, plan)?);
81+
}
82+
83+
if let Some(projection) = projection {
84+
let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
85+
if projection != &current_projection {
86+
let fields: DeltaResult<Vec<(Arc<dyn PhysicalExpr>, String)>> = projection
87+
.iter()
88+
.map(|i| {
89+
let (table_ref, field) = df_schema.qualified_field(*i);
90+
session
91+
.create_physical_expr(
92+
Expr::Column(Column::from((table_ref, field))),
93+
&df_schema,
94+
)
95+
.map(|expr| (expr, field.name().clone()))
96+
.map_err(DeltaTableError::from)
97+
})
98+
.collect();
99+
let fields = fields?;
100+
plan = Arc::new(ProjectionExec::try_new(fields, plan)?);
101+
}
102+
}
48103

49-
fn with_new_children(
50-
self: Arc<Self>,
51-
_children: Vec<Arc<dyn ExecutionPlan>>,
52-
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
53-
self.plan.clone().with_new_children(_children)
104+
if let Some(limit) = limit {
105+
plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(limit)))
106+
};
107+
Ok(plan)
54108
}
55109

56-
fn execute(
110+
fn supports_filters_pushdown(
57111
&self,
58-
partition: usize,
59-
context: Arc<TaskContext>,
60-
) -> datafusion_common::Result<SendableRecordBatchStream> {
61-
self.plan.execute(partition, context)
112+
filter: &[&Expr],
113+
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
114+
Ok(filter
115+
.iter()
116+
.map(|_| TableProviderFilterPushDown::Exact) // maybe exact
117+
.collect())
62118
}
63119
}

crates/core/src/delta_datafusion/expr.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ impl ContextProvider for DeltaContextProvider<'_> {
255255
}
256256

257257
/// Parse a string predicate into an `Expr`
258-
pub(crate) fn parse_predicate_expression(
258+
pub fn parse_predicate_expression(
259259
schema: &DFSchema,
260260
expr: impl AsRef<str>,
261261
df_state: &SessionState,

crates/core/src/delta_datafusion/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ pub mod logical;
9898
pub mod physical;
9999
pub mod planner;
100100

101+
pub use cdf::scan::DeltaCdfTableProvider;
102+
101103
mod schema_adapter;
102104

103105
impl From<DeltaTableError> for DataFusionError {

crates/core/src/operations/delete.rs

+12-15
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,6 @@ impl std::future::IntoFuture for DeleteBuilder {
456456

457457
#[cfg(test)]
458458
mod tests {
459-
use crate::delta_datafusion::cdf::DeltaCdfScan;
460459
use crate::kernel::DataType as DeltaDataType;
461460
use crate::operations::collect_sendable_stream;
462461
use crate::operations::DeltaOps;
@@ -975,9 +974,8 @@ mod tests {
975974
let ctx = SessionContext::new();
976975
let table = DeltaOps(table)
977976
.load_cdf()
978-
.with_session_ctx(ctx.clone())
979977
.with_starting_version(0)
980-
.build()
978+
.build(&ctx.state())
981979
.await
982980
.expect("Failed to load CDF");
983981

@@ -1060,9 +1058,8 @@ mod tests {
10601058
let ctx = SessionContext::new();
10611059
let table = DeltaOps(table)
10621060
.load_cdf()
1063-
.with_session_ctx(ctx.clone())
10641061
.with_starting_version(0)
1065-
.build()
1062+
.build(&ctx.state())
10661063
.await
10671064
.expect("Failed to load CDF");
10681065

@@ -1075,23 +1072,23 @@ mod tests {
10751072
.expect("Failed to collect batches");
10761073

10771074
// The batches will contain a current _commit_timestamp which shouldn't be check_append_only
1078-
let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(3)).collect();
1075+
let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(4)).collect();
10791076

10801077
assert_batches_sorted_eq! {[
1081-
"+-------+--------------+-----------------+------+",
1082-
"| value | _change_type | _commit_version | year |",
1083-
"+-------+--------------+-----------------+------+",
1084-
"| 1 | insert | 1 | 2020 |",
1085-
"| 2 | delete | 2 | 2020 |",
1086-
"| 2 | insert | 1 | 2020 |",
1087-
"| 3 | insert | 1 | 2024 |",
1088-
"+-------+--------------+-----------------+------+",
1078+
"+-------+------+--------------+-----------------+",
1079+
"| value | year | _change_type | _commit_version |",
1080+
"+-------+------+--------------+-----------------+",
1081+
"| 1 | 2020 | insert | 1 |",
1082+
"| 2 | 2020 | delete | 2 |",
1083+
"| 2 | 2020 | insert | 1 |",
1084+
"| 3 | 2024 | insert | 1 |",
1085+
"+-------+------+--------------+-----------------+",
10891086
], &batches }
10901087
}
10911088

10921089
async fn collect_batches(
10931090
num_partitions: usize,
1094-
stream: DeltaCdfScan,
1091+
stream: Arc<dyn ExecutionPlan>,
10951092
ctx: SessionContext,
10961093
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
10971094
let mut batches = vec![];

0 commit comments

Comments
 (0)