@@ -137,6 +137,218 @@ impl TableProvider for FederatedTableProviderAdaptor {
137137 "FederatedTableProviderAdaptor cannot insert_into" . to_string ( ) ,
138138 ) )
139139 }
140+
141+ async fn delete_from (
142+ & self ,
143+ state : & dyn Session ,
144+ filters : Vec < Expr > ,
145+ ) -> Result < Arc < dyn ExecutionPlan > > {
146+ if let Some ( table_provider) = & self . table_provider {
147+ return table_provider. delete_from ( state, filters) . await ;
148+ }
149+
150+ Err ( DataFusionError :: NotImplemented (
151+ "FederatedTableProviderAdaptor cannot delete_from" . to_string ( ) ,
152+ ) )
153+ }
154+
155+ async fn update (
156+ & self ,
157+ state : & dyn Session ,
158+ assignments : Vec < ( String , Expr ) > ,
159+ filters : Vec < Expr > ,
160+ ) -> Result < Arc < dyn ExecutionPlan > > {
161+ if let Some ( table_provider) = & self . table_provider {
162+ return table_provider. update ( state, assignments, filters) . await ;
163+ }
164+
165+ Err ( DataFusionError :: NotImplemented (
166+ "FederatedTableProviderAdaptor cannot update" . to_string ( ) ,
167+ ) )
168+ }
169+ }
170+
171+ #[ cfg( test) ]
172+ mod tests {
173+ use super :: * ;
174+ use async_trait:: async_trait;
175+ use datafusion:: arrow:: datatypes:: { DataType , Field , Schema , SchemaRef } ;
176+ use datafusion:: catalog:: Session ;
177+ use datafusion:: error:: DataFusionError ;
178+ use datafusion:: logical_expr:: { dml:: InsertOp , Expr , TableType } ;
179+ use datafusion:: physical_plan:: ExecutionPlan ;
180+ use std:: any:: Any ;
181+
182+ // Minimal FederatedTableSource implementation for tests that don't need DML.
183+ #[ derive( Debug ) ]
184+ struct NoOpSource {
185+ schema : SchemaRef ,
186+ }
187+
188+ impl NoOpSource {
189+ fn new ( ) -> Self {
190+ Self {
191+ schema : Arc :: new ( Schema :: new ( vec ! [ Field :: new( "id" , DataType :: Int32 , false ) ] ) ) ,
192+ }
193+ }
194+ }
195+
196+ impl datafusion:: logical_expr:: TableSource for NoOpSource {
197+ fn as_any ( & self ) -> & dyn Any {
198+ self
199+ }
200+ fn schema ( & self ) -> SchemaRef {
201+ Arc :: clone ( & self . schema )
202+ }
203+ }
204+
205+ impl crate :: FederatedTableSource for NoOpSource {
206+ fn federation_provider ( & self ) -> Arc < dyn crate :: FederationProvider > {
207+ Arc :: new ( crate :: analyzer:: NopFederationProvider { } )
208+ }
209+ }
210+
211+ // A TableProvider that records which DML methods were called.
212+ #[ derive( Debug ) ]
213+ struct RecordingProvider {
214+ schema : SchemaRef ,
215+ }
216+
217+ impl RecordingProvider {
218+ fn new ( ) -> Self {
219+ Self {
220+ schema : Arc :: new ( Schema :: new ( vec ! [ Field :: new( "id" , DataType :: Int32 , false ) ] ) ) ,
221+ }
222+ }
223+ }
224+
225+ #[ async_trait]
226+ impl TableProvider for RecordingProvider {
227+ fn as_any ( & self ) -> & dyn Any {
228+ self
229+ }
230+ fn schema ( & self ) -> SchemaRef {
231+ Arc :: clone ( & self . schema )
232+ }
233+ fn table_type ( & self ) -> TableType {
234+ TableType :: Base
235+ }
236+ async fn scan (
237+ & self ,
238+ _state : & dyn Session ,
239+ _projection : Option < & Vec < usize > > ,
240+ _filters : & [ Expr ] ,
241+ _limit : Option < usize > ,
242+ ) -> Result < Arc < dyn ExecutionPlan > > {
243+ Err ( DataFusionError :: NotImplemented ( "scan" . to_string ( ) ) )
244+ }
245+ async fn delete_from (
246+ & self ,
247+ _state : & dyn Session ,
248+ _filters : Vec < Expr > ,
249+ ) -> Result < Arc < dyn ExecutionPlan > > {
250+ Err ( DataFusionError :: NotImplemented (
251+ "recording_delete_from" . to_string ( ) ,
252+ ) )
253+ }
254+ async fn update (
255+ & self ,
256+ _state : & dyn Session ,
257+ _assignments : Vec < ( String , Expr ) > ,
258+ _filters : Vec < Expr > ,
259+ ) -> Result < Arc < dyn ExecutionPlan > > {
260+ Err ( DataFusionError :: NotImplemented (
261+ "recording_update" . to_string ( ) ,
262+ ) )
263+ }
264+ async fn insert_into (
265+ & self ,
266+ _state : & dyn Session ,
267+ _input : Arc < dyn ExecutionPlan > ,
268+ _insert_op : InsertOp ,
269+ ) -> Result < Arc < dyn ExecutionPlan > > {
270+ Err ( DataFusionError :: NotImplemented ( "insert_into" . to_string ( ) ) )
271+ }
272+ }
273+
274+ // Helper: build a session state suitable for calling TableProvider methods.
275+ fn make_session ( ) -> datafusion:: execution:: session_state:: SessionState {
276+ crate :: default_session_state ( )
277+ }
278+
279+ #[ tokio:: test]
280+ async fn delete_from_delegates_to_inner_provider ( ) {
281+ let source = Arc :: new ( NoOpSource :: new ( ) ) ;
282+ let provider = Arc :: new ( RecordingProvider :: new ( ) ) ;
283+ let adaptor = FederatedTableProviderAdaptor :: new_with_provider ( source, provider) ;
284+ let state = make_session ( ) ;
285+
286+ let err = adaptor
287+ . delete_from ( & state, vec ! [ ] )
288+ . await
289+ . unwrap_err ( )
290+ . to_string ( ) ;
291+
292+ assert ! (
293+ err. contains( "recording_delete_from" ) ,
294+ "expected inner provider error, got: {err}"
295+ ) ;
296+ }
297+
298+ #[ tokio:: test]
299+ async fn delete_from_errors_without_inner_provider ( ) {
300+ let source = Arc :: new ( NoOpSource :: new ( ) ) ;
301+ let adaptor = FederatedTableProviderAdaptor :: new ( source) ;
302+ let state = make_session ( ) ;
303+
304+ let err = adaptor
305+ . delete_from ( & state, vec ! [ ] )
306+ . await
307+ . unwrap_err ( )
308+ . to_string ( ) ;
309+
310+ assert ! (
311+ err. contains( "FederatedTableProviderAdaptor cannot delete_from" ) ,
312+ "unexpected error: {err}"
313+ ) ;
314+ }
315+
316+ #[ tokio:: test]
317+ async fn update_delegates_to_inner_provider ( ) {
318+ let source = Arc :: new ( NoOpSource :: new ( ) ) ;
319+ let provider = Arc :: new ( RecordingProvider :: new ( ) ) ;
320+ let adaptor = FederatedTableProviderAdaptor :: new_with_provider ( source, provider) ;
321+ let state = make_session ( ) ;
322+
323+ let err = adaptor
324+ . update ( & state, vec ! [ ] , vec ! [ ] )
325+ . await
326+ . unwrap_err ( )
327+ . to_string ( ) ;
328+
329+ assert ! (
330+ err. contains( "recording_update" ) ,
331+ "expected inner provider error, got: {err}"
332+ ) ;
333+ }
334+
335+ #[ tokio:: test]
336+ async fn update_errors_without_inner_provider ( ) {
337+ let source = Arc :: new ( NoOpSource :: new ( ) ) ;
338+ let adaptor = FederatedTableProviderAdaptor :: new ( source) ;
339+ let state = make_session ( ) ;
340+
341+ let err = adaptor
342+ . update ( & state, vec ! [ ] , vec ! [ ] )
343+ . await
344+ . unwrap_err ( )
345+ . to_string ( ) ;
346+
347+ assert ! (
348+ err. contains( "FederatedTableProviderAdaptor cannot update" ) ,
349+ "unexpected error: {err}"
350+ ) ;
351+ }
140352}
141353
142354// FederatedTableProvider extends DataFusion's TableProvider trait
0 commit comments