File tree Expand file tree Collapse file tree
datafusion/datasource/src Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -808,16 +808,21 @@ impl MemorySourceConfig {
808808 // order of the partitions & newly created partitions.
809809 let mut cannot_split_further = Vec :: with_capacity ( target_partitions) ;
810810 for _ in 0 ..cnt_to_repartition {
811+ // triggers loop for the cnt_to_repartition. So if need another 4 partitions, it attempts to split 4 times.
811812 loop {
813+ // Take the largest item off the heap, and attempt to split.
812814 let Some ( to_split) = max_heap. pop ( ) else {
815+ // Nothing left to attempt repartition. Break inner loop.
813816 break ;
814817 } ;
815818
819+ // Split the partition. The new partitions will be ordered with idx and idx+1.
816820 let mut new_partitions = to_split. split ( ) ;
817821 if new_partitions. len ( ) > 1 {
818822 for new_partition in new_partitions {
819823 max_heap. push ( new_partition) ;
820824 }
825+ // Successful repartition. Break inner loop, and return to outer `cnt_to_repartition` loop.
821826 break ;
822827 } else {
823828 cannot_split_further. push ( new_partitions. remove ( 0 ) ) ;
@@ -828,6 +833,7 @@ impl MemorySourceConfig {
828833 partitions. extend ( cannot_split_further) ;
829834
830835 // Finally, sort all partitions by the output ordering.
836+ // This was the original ordering of the batches within the partition. We are maintaining this ordering.
831837 partitions. sort_by_key ( |p| p. idx ) ;
832838 let partitions = partitions. into_iter ( ) . map ( |rep| rep. batches ) . collect_vec ( ) ;
833839
You can’t perform that action at this time.
0 commit comments