@@ -240,7 +240,7 @@ impl TableProvider for TableDeletionsTable {
240240 . map ( |& i| self . output_schema . field ( i) . clone ( ) )
241241 . collect ( ) ;
242242 Arc :: new ( Schema :: new ( fields) )
243- }
243+ } ,
244244 None => self . output_schema . clone ( ) ,
245245 } ;
246246 return Ok ( Arc :: new ( EmptyExec :: new ( output_schema) ) ) ;
@@ -320,15 +320,17 @@ impl DeletedRowsExec {
320320impl DisplayAs for DeletedRowsExec {
321321 fn fmt_as ( & self , t : DisplayFormatType , f : & mut std:: fmt:: Formatter ) -> std:: fmt:: Result {
322322 match t {
323- DisplayFormatType :: Default | DisplayFormatType :: Verbose | DisplayFormatType :: TreeRender => {
323+ DisplayFormatType :: Default
324+ | DisplayFormatType :: Verbose
325+ | DisplayFormatType :: TreeRender => {
324326 write ! (
325327 f,
326328 "DeletedRowsExec: snapshot_id={}, full_delete={}, has_previous={}" ,
327329 self . snapshot_id,
328330 self . current_delete_scan. is_none( ) ,
329331 self . previous_delete_scan. is_some( )
330332 )
331- }
333+ } ,
332334 }
333335 }
334336}
@@ -365,28 +367,31 @@ impl ExecutionPlan for DeletedRowsExec {
365367 let mut idx = 0 ;
366368
367369 let current = if self . current_delete_scan . is_some ( ) {
368- let c = children. get ( idx) . cloned ( ) . ok_or_else ( || {
369- DataFusionError :: Internal ( "Missing current delete child" . into ( ) )
370- } ) ?;
370+ let c = children
371+ . get ( idx)
372+ . cloned ( )
373+ . ok_or_else ( || DataFusionError :: Internal ( "Missing current delete child" . into ( ) ) ) ?;
371374 idx += 1 ;
372375 Some ( c)
373376 } else {
374377 None
375378 } ;
376379
377380 let previous = if self . previous_delete_scan . is_some ( ) {
378- let p = children. get ( idx) . cloned ( ) . ok_or_else ( || {
379- DataFusionError :: Internal ( "Missing previous delete child" . into ( ) )
380- } ) ?;
381+ let p = children
382+ . get ( idx)
383+ . cloned ( )
384+ . ok_or_else ( || DataFusionError :: Internal ( "Missing previous delete child" . into ( ) ) ) ?;
381385 idx += 1 ;
382386 Some ( p)
383387 } else {
384388 None
385389 } ;
386390
387- let data = children. get ( idx) . cloned ( ) . ok_or_else ( || {
388- DataFusionError :: Internal ( "Missing data file child" . into ( ) )
389- } ) ?;
391+ let data = children
392+ . get ( idx)
393+ . cloned ( )
394+ . ok_or_else ( || DataFusionError :: Internal ( "Missing data file child" . into ( ) ) ) ?;
390395
391396 Ok ( Arc :: new ( DeletedRowsExec :: new (
392397 current,
@@ -569,7 +574,10 @@ impl DeletedRowsStream {
569574
570575 // Append CDC columns
571576 let num_output_rows = keep_indices. len ( ) ;
572- columns. push ( Arc :: new ( Int64Array :: from ( vec ! [ self . snapshot_id; num_output_rows] ) ) ) ;
577+ columns. push ( Arc :: new ( Int64Array :: from ( vec ! [
578+ self . snapshot_id;
579+ num_output_rows
580+ ] ) ) ) ;
573581 columns. push ( Arc :: new ( StringArray :: from ( vec ! [ "delete" ; num_output_rows] ) ) ) ;
574582
575583 RecordBatch :: try_new ( self . output_schema . clone ( ) , columns)
@@ -590,7 +598,7 @@ impl Stream for DeletedRowsStream {
590598 Poll :: Ready ( Some ( Ok ( batch) ) ) => {
591599 let positions = Self :: extract_positions ( & batch) ;
592600 self . current_positions . extend ( positions) ;
593- }
601+ } ,
594602 Poll :: Ready ( Some ( Err ( e) ) ) => return Poll :: Ready ( Some ( Err ( e) ) ) ,
595603 Poll :: Ready ( None ) => {
596604 if self . previous_delete_stream . is_some ( ) {
@@ -599,44 +607,44 @@ impl Stream for DeletedRowsStream {
599607 self . compute_deleted_positions ( ) ;
600608 self . state = StreamState :: ReadingData ;
601609 }
602- }
610+ } ,
603611 Poll :: Pending => return Poll :: Pending ,
604612 }
605- }
613+ } ,
606614 StreamState :: ReadingPreviousDelete => {
607615 let prev = self . previous_delete_stream . as_mut ( ) . unwrap ( ) ;
608616 match Pin :: new ( prev) . poll_next ( cx) {
609617 Poll :: Ready ( Some ( Ok ( batch) ) ) => {
610618 let positions = Self :: extract_positions ( & batch) ;
611619 self . previous_positions . extend ( positions) ;
612- }
620+ } ,
613621 Poll :: Ready ( Some ( Err ( e) ) ) => return Poll :: Ready ( Some ( Err ( e) ) ) ,
614622 Poll :: Ready ( None ) => {
615623 self . compute_deleted_positions ( ) ;
616624 self . state = StreamState :: ReadingData ;
617- }
625+ } ,
618626 Poll :: Pending => return Poll :: Pending ,
619627 }
620- }
628+ } ,
621629 StreamState :: ReadingData => {
622630 match Pin :: new ( & mut self . data_stream ) . poll_next ( cx) {
623631 Poll :: Ready ( Some ( Ok ( batch) ) ) => {
624632 match self . filter_batch ( & batch) ? {
625633 Some ( filtered) => return Poll :: Ready ( Some ( Ok ( filtered) ) ) ,
626634 None => continue , // No deleted rows in this batch
627635 }
628- }
636+ } ,
629637 Poll :: Ready ( Some ( Err ( e) ) ) => return Poll :: Ready ( Some ( Err ( e) ) ) ,
630638 Poll :: Ready ( None ) => {
631639 self . state = StreamState :: Done ;
632640 return Poll :: Ready ( None ) ;
633- }
641+ } ,
634642 Poll :: Pending => return Poll :: Pending ,
635643 }
636- }
644+ } ,
637645 StreamState :: Done => {
638646 return Poll :: Ready ( None ) ;
639- }
647+ } ,
640648 }
641649 }
642650 }
0 commit comments