Skip to content

Commit 0691426

Browse files
committed
Properly handle DML
1 parent 42245bd commit 0691426

2 files changed

Lines changed: 281 additions & 0 deletions

File tree

datafusion-federation/src/analyzer/mod.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,15 @@ impl AnalyzerRule for FederationAnalyzerRule {
3535
// TableScans from the same FederationProvider.
3636
// There 'largest sub-trees' are passed to their respective FederationProvider.optimizer.
3737
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
38+
// DML plans must not be federated: the SQL unparser's dml_to_sql is
39+
// unimplemented, and wrapping DML in a FederatedPlanNode hides the Dml
40+
// node from write-permission validators (security bypass). Leave DML
41+
// plans unwrapped so validators see them and DataFusion's physical
42+
// planner can dispatch delete_from/update to the table provider.
43+
if matches!(plan, LogicalPlan::Dml(_)) {
44+
return Ok(plan);
45+
}
46+
3847
if !contains_federated_table(&plan)? {
3948
return Ok(plan);
4049
}
@@ -511,6 +520,66 @@ fn get_leaf_provider(
511520
}
512521
}
513522

523+
#[cfg(test)]
524+
mod tests {
525+
use super::*;
526+
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
527+
use datafusion::config::ConfigOptions;
528+
use datafusion::logical_expr::{DmlStatement, EmptyRelation, WriteOp};
529+
use datafusion::optimizer::analyzer::AnalyzerRule;
530+
use datafusion::sql::TableReference;
531+
use std::sync::Arc;
532+
533+
// Minimal TableSource needed to construct a DmlStatement.
534+
#[derive(Debug)]
535+
struct MockTableSource {
536+
schema: SchemaRef,
537+
}
538+
539+
impl MockTableSource {
540+
fn new() -> Self {
541+
Self {
542+
schema: Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
543+
}
544+
}
545+
}
546+
547+
impl datafusion::logical_expr::TableSource for MockTableSource {
548+
fn as_any(&self) -> &dyn std::any::Any {
549+
self
550+
}
551+
fn schema(&self) -> SchemaRef {
552+
Arc::clone(&self.schema)
553+
}
554+
}
555+
556+
#[test]
557+
fn dml_plan_is_returned_unchanged() {
558+
let rule = FederationAnalyzerRule::new();
559+
let config = ConfigOptions::default();
560+
561+
let empty = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
562+
produce_one_row: false,
563+
schema: Arc::new(datafusion::common::DFSchema::empty()),
564+
}));
565+
let source = Arc::new(MockTableSource::new());
566+
let dml = LogicalPlan::Dml(DmlStatement::new(
567+
TableReference::bare("t"),
568+
source,
569+
WriteOp::Delete,
570+
empty,
571+
));
572+
573+
let result = rule.analyze(dml.clone(), &config).unwrap();
574+
575+
// The plan must come back as-is — Dml, not wrapped in a FederatedPlanNode.
576+
assert!(
577+
matches!(result, LogicalPlan::Dml(_)),
578+
"expected Dml plan, got {result:?}"
579+
);
580+
}
581+
}
582+
514583
#[allow(clippy::missing_errors_doc)]
515584
pub fn get_table_source(
516585
source: &Arc<dyn TableSource>,

datafusion-federation/src/table_provider.rs

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,218 @@ impl TableProvider for FederatedTableProviderAdaptor {
137137
"FederatedTableProviderAdaptor cannot insert_into".to_string(),
138138
))
139139
}
140+
141+
async fn delete_from(
142+
&self,
143+
state: &dyn Session,
144+
filters: Vec<Expr>,
145+
) -> Result<Arc<dyn ExecutionPlan>> {
146+
if let Some(table_provider) = &self.table_provider {
147+
return table_provider.delete_from(state, filters).await;
148+
}
149+
150+
Err(DataFusionError::NotImplemented(
151+
"FederatedTableProviderAdaptor cannot delete_from".to_string(),
152+
))
153+
}
154+
155+
async fn update(
156+
&self,
157+
state: &dyn Session,
158+
assignments: Vec<(String, Expr)>,
159+
filters: Vec<Expr>,
160+
) -> Result<Arc<dyn ExecutionPlan>> {
161+
if let Some(table_provider) = &self.table_provider {
162+
return table_provider.update(state, assignments, filters).await;
163+
}
164+
165+
Err(DataFusionError::NotImplemented(
166+
"FederatedTableProviderAdaptor cannot update".to_string(),
167+
))
168+
}
169+
}
170+
171+
#[cfg(test)]
172+
mod tests {
173+
use super::*;
174+
use async_trait::async_trait;
175+
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
176+
use datafusion::catalog::Session;
177+
use datafusion::error::DataFusionError;
178+
use datafusion::logical_expr::{dml::InsertOp, Expr, TableType};
179+
use datafusion::physical_plan::ExecutionPlan;
180+
use std::any::Any;
181+
182+
// Minimal FederatedTableSource implementation for tests that don't need DML.
183+
#[derive(Debug)]
184+
struct NoOpSource {
185+
schema: SchemaRef,
186+
}
187+
188+
impl NoOpSource {
189+
fn new() -> Self {
190+
Self {
191+
schema: Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
192+
}
193+
}
194+
}
195+
196+
impl datafusion::logical_expr::TableSource for NoOpSource {
197+
fn as_any(&self) -> &dyn Any {
198+
self
199+
}
200+
fn schema(&self) -> SchemaRef {
201+
Arc::clone(&self.schema)
202+
}
203+
}
204+
205+
impl crate::FederatedTableSource for NoOpSource {
206+
fn federation_provider(&self) -> Arc<dyn crate::FederationProvider> {
207+
Arc::new(crate::analyzer::NopFederationProvider {})
208+
}
209+
}
210+
211+
// A TableProvider that records which DML methods were called.
212+
#[derive(Debug)]
213+
struct RecordingProvider {
214+
schema: SchemaRef,
215+
}
216+
217+
impl RecordingProvider {
218+
fn new() -> Self {
219+
Self {
220+
schema: Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
221+
}
222+
}
223+
}
224+
225+
#[async_trait]
226+
impl TableProvider for RecordingProvider {
227+
fn as_any(&self) -> &dyn Any {
228+
self
229+
}
230+
fn schema(&self) -> SchemaRef {
231+
Arc::clone(&self.schema)
232+
}
233+
fn table_type(&self) -> TableType {
234+
TableType::Base
235+
}
236+
async fn scan(
237+
&self,
238+
_state: &dyn Session,
239+
_projection: Option<&Vec<usize>>,
240+
_filters: &[Expr],
241+
_limit: Option<usize>,
242+
) -> Result<Arc<dyn ExecutionPlan>> {
243+
Err(DataFusionError::NotImplemented("scan".to_string()))
244+
}
245+
async fn delete_from(
246+
&self,
247+
_state: &dyn Session,
248+
_filters: Vec<Expr>,
249+
) -> Result<Arc<dyn ExecutionPlan>> {
250+
Err(DataFusionError::NotImplemented(
251+
"recording_delete_from".to_string(),
252+
))
253+
}
254+
async fn update(
255+
&self,
256+
_state: &dyn Session,
257+
_assignments: Vec<(String, Expr)>,
258+
_filters: Vec<Expr>,
259+
) -> Result<Arc<dyn ExecutionPlan>> {
260+
Err(DataFusionError::NotImplemented(
261+
"recording_update".to_string(),
262+
))
263+
}
264+
async fn insert_into(
265+
&self,
266+
_state: &dyn Session,
267+
_input: Arc<dyn ExecutionPlan>,
268+
_insert_op: InsertOp,
269+
) -> Result<Arc<dyn ExecutionPlan>> {
270+
Err(DataFusionError::NotImplemented("insert_into".to_string()))
271+
}
272+
}
273+
274+
// Helper: build a session state suitable for calling TableProvider methods.
275+
fn make_session() -> datafusion::execution::session_state::SessionState {
276+
crate::default_session_state()
277+
}
278+
279+
#[tokio::test]
280+
async fn delete_from_delegates_to_inner_provider() {
281+
let source = Arc::new(NoOpSource::new());
282+
let provider = Arc::new(RecordingProvider::new());
283+
let adaptor = FederatedTableProviderAdaptor::new_with_provider(source, provider);
284+
let state = make_session();
285+
286+
let err = adaptor
287+
.delete_from(&state, vec![])
288+
.await
289+
.unwrap_err()
290+
.to_string();
291+
292+
assert!(
293+
err.contains("recording_delete_from"),
294+
"expected inner provider error, got: {err}"
295+
);
296+
}
297+
298+
#[tokio::test]
299+
async fn delete_from_errors_without_inner_provider() {
300+
let source = Arc::new(NoOpSource::new());
301+
let adaptor = FederatedTableProviderAdaptor::new(source);
302+
let state = make_session();
303+
304+
let err = adaptor
305+
.delete_from(&state, vec![])
306+
.await
307+
.unwrap_err()
308+
.to_string();
309+
310+
assert!(
311+
err.contains("FederatedTableProviderAdaptor cannot delete_from"),
312+
"unexpected error: {err}"
313+
);
314+
}
315+
316+
#[tokio::test]
317+
async fn update_delegates_to_inner_provider() {
318+
let source = Arc::new(NoOpSource::new());
319+
let provider = Arc::new(RecordingProvider::new());
320+
let adaptor = FederatedTableProviderAdaptor::new_with_provider(source, provider);
321+
let state = make_session();
322+
323+
let err = adaptor
324+
.update(&state, vec![], vec![])
325+
.await
326+
.unwrap_err()
327+
.to_string();
328+
329+
assert!(
330+
err.contains("recording_update"),
331+
"expected inner provider error, got: {err}"
332+
);
333+
}
334+
335+
#[tokio::test]
336+
async fn update_errors_without_inner_provider() {
337+
let source = Arc::new(NoOpSource::new());
338+
let adaptor = FederatedTableProviderAdaptor::new(source);
339+
let state = make_session();
340+
341+
let err = adaptor
342+
.update(&state, vec![], vec![])
343+
.await
344+
.unwrap_err()
345+
.to_string();
346+
347+
assert!(
348+
err.contains("FederatedTableProviderAdaptor cannot update"),
349+
"unexpected error: {err}"
350+
);
351+
}
140352
}
141353

142354
// FederatedTableProvider extends DataFusion's TableProvider trait

0 commit comments

Comments
 (0)