2121import org .apache .paimon .CoreOptions ;
2222import org .apache .paimon .manifest .PartitionEntry ;
2323import org .apache .paimon .predicate .Predicate ;
24+ import org .apache .paimon .predicate .SortValue ;
2425import org .apache .paimon .predicate .TopN ;
2526import org .apache .paimon .schema .SchemaManager ;
2627import org .apache .paimon .schema .TableSchema ;
2728import org .apache .paimon .table .BucketMode ;
2829import org .apache .paimon .table .source .snapshot .SnapshotReader ;
2930import org .apache .paimon .table .source .snapshot .StartingScanner ;
3031import org .apache .paimon .table .source .snapshot .StartingScanner .ScannedResult ;
32+ import org .apache .paimon .types .DataType ;
3133
3234import java .util .ArrayList ;
3335import java .util .List ;
36+ import java .util .Optional ;
37+
38+ import static org .apache .paimon .table .source .PushDownUtils .minmaxAvailable ;
3439
3540/** {@link TableScan} implementation for batch planning. */
3641public class DataTableBatchScan extends AbstractDataTableScan {
@@ -93,10 +98,15 @@ public TableScan.Plan plan() {
9398
9499 if (hasNext ) {
95100 hasNext = false ;
96- StartingScanner .Result result = startingScanner .scan (snapshotReader );
97- result = applyPushDownLimit (result );
98- result = applyPushDownTopN (result );
99- return DataFilePlan .fromResult (result );
101+ Optional <StartingScanner .Result > pushed = applyPushDownLimit ();
102+ if (pushed .isPresent ()) {
103+ return DataFilePlan .fromResult (pushed .get ());
104+ }
105+ pushed = applyPushDownTopN ();
106+ if (pushed .isPresent ()) {
107+ return DataFilePlan .fromResult (pushed .get ());
108+ }
109+ return DataFilePlan .fromResult (startingScanner .scan (snapshotReader ));
100110 } else {
101111 throw new EndOfScanException ();
102112 }
@@ -110,51 +120,77 @@ public List<PartitionEntry> listPartitionEntries() {
110120 return startingScanner .scanPartitions (snapshotReader );
111121 }
112122
113- private StartingScanner .Result applyPushDownLimit (StartingScanner .Result result ) {
114- if (pushDownLimit != null && result instanceof ScannedResult ) {
115- long scannedRowCount = 0 ;
116- SnapshotReader .Plan plan = ((ScannedResult ) result ).plan ();
117- List <DataSplit > splits = plan .dataSplits ();
118- if (splits .isEmpty ()) {
119- return result ;
120- }
123+ private Optional <StartingScanner .Result > applyPushDownLimit () {
124+ if (pushDownLimit == null ) {
125+ return Optional .empty ();
126+ }
127+
128+ StartingScanner .Result result = startingScanner .scan (snapshotReader );
129+ if (!(result instanceof ScannedResult )) {
130+ return Optional .of (result );
131+ }
132+
133+ long scannedRowCount = 0 ;
134+ SnapshotReader .Plan plan = ((ScannedResult ) result ).plan ();
135+ List <DataSplit > splits = plan .dataSplits ();
136+ if (splits .isEmpty ()) {
137+ return Optional .of (result );
138+ }
121139
122- List <Split > limitedSplits = new ArrayList <>();
123- for (DataSplit dataSplit : splits ) {
124- if (dataSplit .rawConvertible ()) {
125- long partialMergedRowCount = dataSplit .partialMergedRowCount ();
126- limitedSplits .add (dataSplit );
127- scannedRowCount += partialMergedRowCount ;
128- if (scannedRowCount >= pushDownLimit ) {
129- SnapshotReader .Plan newPlan =
130- new PlanImpl (plan .watermark (), plan .snapshotId (), limitedSplits );
131- return new ScannedResult (newPlan );
132- }
140+ List <Split > limitedSplits = new ArrayList <>();
141+ for (DataSplit dataSplit : splits ) {
142+ if (dataSplit .rawConvertible ()) {
143+ long partialMergedRowCount = dataSplit .partialMergedRowCount ();
144+ limitedSplits .add (dataSplit );
145+ scannedRowCount += partialMergedRowCount ;
146+ if (scannedRowCount >= pushDownLimit ) {
147+ SnapshotReader .Plan newPlan =
148+ new PlanImpl (plan .watermark (), plan .snapshotId (), limitedSplits );
149+ return Optional .of (new ScannedResult (newPlan ));
133150 }
134151 }
135152 }
136- return result ;
153+ return Optional . of ( result ) ;
137154 }
138155
139- private StartingScanner .Result applyPushDownTopN (StartingScanner . Result result ) {
156+ private Optional < StartingScanner .Result > applyPushDownTopN () {
140157 if (topN == null
141158 || pushDownLimit != null
142- || !(result instanceof ScannedResult )
143159 || !schema .primaryKeys ().isEmpty ()
144160 || options ().deletionVectorsEnabled ()) {
145- return result ;
161+ return Optional .empty ();
162+ }
163+
164+ List <SortValue > orders = topN .orders ();
165+ if (orders .size () != 1 ) {
166+ return Optional .empty ();
167+ }
168+
169+ if (topN .limit () > 100 ) {
170+ return Optional .empty ();
171+ }
172+
173+ SortValue order = orders .get (0 );
174+ DataType type = order .field ().type ();
175+ if (!minmaxAvailable (type )) {
176+ return Optional .empty ();
177+ }
178+
179+ StartingScanner .Result result = startingScanner .scan (snapshotReader .keepStats ());
180+ if (!(result instanceof ScannedResult )) {
181+ return Optional .of (result );
146182 }
147183
148184 SnapshotReader .Plan plan = ((ScannedResult ) result ).plan ();
149185 List <DataSplit > splits = plan .dataSplits ();
150186 if (splits .isEmpty ()) {
151- return result ;
187+ return Optional . of ( result ) ;
152188 }
153189
154190 TopNDataSplitEvaluator evaluator = new TopNDataSplitEvaluator (schema , schemaManager );
155- List <Split > topNSplits = new ArrayList <>(evaluator .evaluate (topN , splits ));
191+ List <Split > topNSplits = new ArrayList <>(evaluator .evaluate (order , topN . limit () , splits ));
156192 SnapshotReader .Plan newPlan = new PlanImpl (plan .watermark (), plan .snapshotId (), topNSplits );
157- return new ScannedResult (newPlan );
193+ return Optional . of ( new ScannedResult (newPlan ) );
158194 }
159195
160196 @ Override
0 commit comments