3131import java .util .Iterator ;
3232import java .util .List ;
3333import java .util .Optional ;
34- import java .util .stream .Collectors ;
3534
3635import static org .apache .flink .cdc .connectors .base .utils .SourceRecordUtils .rowToArray ;
3736
@@ -156,29 +155,26 @@ public static String buildSplitScanQuery(
156155 boolean isFirstSplit ,
157156 boolean isLastSplit ,
158157 List <String > uuidFields ) {
159- return buildSplitQuery (tableId , pkRowType , isFirstSplit , isLastSplit , uuidFields , - 1 , true );
158+ return buildSplitScanQuery (tableId , pkRowType , isFirstSplit , isLastSplit , null , uuidFields );
160159 }
161160
162- private static String buildSplitQuery (
161+ public static String buildSplitScanQuery (
163162 TableId tableId ,
164163 RowType pkRowType ,
165164 boolean isFirstSplit ,
166165 boolean isLastSplit ,
167- List <String > uuidFields ,
168- int limitSize ,
169- boolean isScanningData ) {
166+ List <String > columnNames ,
167+ List <String > uuidFields ) {
170168 final String condition ;
171169
172170 if (isFirstSplit && isLastSplit ) {
173171 condition = null ;
174172 } else if (isFirstSplit ) {
175173 final StringBuilder sql = new StringBuilder ();
176174 addPrimaryKeyColumnsToCondition (pkRowType , sql , " <= " , uuidFields );
177- if (isScanningData ) {
178- sql .append (" AND NOT (" );
179- addPrimaryKeyColumnsToCondition (pkRowType , sql , " = " , uuidFields );
180- sql .append (")" );
181- }
175+ sql .append (" AND NOT (" );
176+ addPrimaryKeyColumnsToCondition (pkRowType , sql , " = " , uuidFields );
177+ sql .append (")" );
182178 condition = sql .toString ();
183179 } else if (isLastSplit ) {
184180 final StringBuilder sql = new StringBuilder ();
@@ -187,30 +183,19 @@ private static String buildSplitQuery(
187183 } else {
188184 final StringBuilder sql = new StringBuilder ();
189185 addPrimaryKeyColumnsToCondition (pkRowType , sql , " >= " , uuidFields );
190- if (isScanningData ) {
191- sql .append (" AND NOT (" );
192- addPrimaryKeyColumnsToCondition (pkRowType , sql , " = " , uuidFields );
193- sql .append (")" );
194- }
186+ sql .append (" AND NOT (" );
187+ addPrimaryKeyColumnsToCondition (pkRowType , sql , " = " , uuidFields );
188+ sql .append (")" );
195189 sql .append (" AND " );
196190 addPrimaryKeyColumnsToCondition (pkRowType , sql , " <= " , uuidFields );
197191 condition = sql .toString ();
198192 }
199193
200- if (isScanningData ) {
201- return buildSelectWithRowLimits (
202- tableId , limitSize , "*" , Optional .ofNullable (condition ), Optional .empty ());
203- } else {
204- final String orderBy =
205- pkRowType .getFieldNames ().stream ().collect (Collectors .joining (", " ));
206- return buildSelectWithBoundaryRowLimits (
207- tableId ,
208- limitSize ,
209- getPrimaryKeyColumnsProjection (pkRowType ),
210- getMaxPrimaryKeyColumnsProjection (pkRowType ),
211- Optional .ofNullable (condition ),
212- orderBy );
213- }
194+ return buildSelectWithRowLimits (
195+ tableId ,
196+ columnNames == null ? "*" : String .join ("," , columnNames ),
197+ Optional .ofNullable (condition ),
198+ Optional .empty ());
214199 }
215200
216201 public static PreparedStatement readTableSplitDataStatement (
@@ -330,7 +315,6 @@ private static String getMaxPrimaryKeyColumnsProjection(RowType pkRowType) {
330315
331316 private static String buildSelectWithRowLimits (
332317 TableId tableId ,
333- int limit ,
334318 String projection ,
335319 Optional <String > condition ,
336320 Optional <String > orderBy ) {
@@ -343,9 +327,6 @@ private static String buildSelectWithRowLimits(
343327 if (orderBy .isPresent ()) {
344328 sql .append (" ORDER BY " ).append (orderBy .get ());
345329 }
346- if (limit > 0 ) {
347- sql .append (" LIMIT " ).append (limit );
348- }
349330 return sql .toString ();
350331 }
351332
0 commit comments