diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java index f31db75bd83..1915aca809c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -287,12 +287,17 @@ private void createDataEventsForTable( .filter(field -> table.columnWithName(field).typeName().equals("uuid")) .collect(Collectors.toList()); + List columnNames = + table.columns().stream() + .map(column -> jdbcConnection.quotedColumnIdString(column.name())) + .collect(Collectors.toList()); final String selectSql = PostgresQueryUtils.buildSplitScanQuery( snapshotSplit.getTableId(), snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null, + columnNames, uuidFields); LOG.debug( "For split '{}' of table {} using select statement: '{}'", diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java index 9c0d558474c..348344bab14 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java @@ -31,7 +31,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray; @@ -156,17 +155,16 @@ public static String buildSplitScanQuery( boolean isFirstSplit, boolean isLastSplit, List uuidFields) { - return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, uuidFields, -1, true); + return buildSplitScanQuery(tableId, pkRowType, isFirstSplit, isLastSplit, null, uuidFields); } - private static String buildSplitQuery( + public static String buildSplitScanQuery( TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit, - List uuidFields, - int limitSize, - boolean isScanningData) { + List columnNames, + List uuidFields) { final String condition; if (isFirstSplit && isLastSplit) { @@ -174,11 +172,9 @@ private static String buildSplitQuery( } else if (isFirstSplit) { final StringBuilder sql = new StringBuilder(); addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", uuidFields); - if (isScanningData) { - sql.append(" AND NOT ("); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields); - sql.append(")"); - } + sql.append(" AND NOT ("); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields); + sql.append(")"); condition = sql.toString(); } else if (isLastSplit) { final StringBuilder sql = new StringBuilder(); @@ -187,30 +183,19 @@ private static String buildSplitQuery( } else { final StringBuilder sql = new StringBuilder(); addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ", uuidFields); - if (isScanningData) { - sql.append(" AND NOT ("); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields); - sql.append(")"); - } + sql.append(" AND NOT ("); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields); + sql.append(")"); sql.append(" AND "); addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", uuidFields); condition = sql.toString(); } - if (isScanningData) { - return buildSelectWithRowLimits( - tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); - } else { - final String orderBy = - pkRowType.getFieldNames().stream().collect(Collectors.joining(", ")); - return buildSelectWithBoundaryRowLimits( - tableId, - limitSize, - getPrimaryKeyColumnsProjection(pkRowType), - getMaxPrimaryKeyColumnsProjection(pkRowType), - Optional.ofNullable(condition), - orderBy); - } + return buildSelectWithRowLimits( + tableId, + columnNames == null ? "*" : String.join(",", columnNames), + Optional.ofNullable(condition), + Optional.empty()); } public static PreparedStatement readTableSplitDataStatement( @@ -330,7 +315,6 @@ private static String getMaxPrimaryKeyColumnsProjection(RowType pkRowType) { private static String buildSelectWithRowLimits( TableId tableId, - int limit, String projection, Optional condition, Optional orderBy) { @@ -343,9 +327,6 @@ private static String buildSelectWithRowLimits( if (orderBy.isPresent()) { sql.append(" ORDER BY ").append(orderBy.get()); } - if (limit > 0) { - sql.append(" LIMIT ").append(limit); - } return sql.toString(); }