Skip to content

Commit 0d86de2

Browse files
committed
[FLINK-39071] Select fields rather * to read snapshot data in case schema changes.
1 parent 8ea04ec commit 0d86de2

2 files changed

Lines changed: 20 additions & 34 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,12 +287,17 @@ private void createDataEventsForTable(
287287
.filter(field -> table.columnWithName(field).typeName().equals("uuid"))
288288
.collect(Collectors.toList());
289289

290+
List<String> columnNames =
291+
table.columns().stream()
292+
.map(column -> jdbcConnection.quotedColumnIdString(column.name()))
293+
.collect(Collectors.toList());
290294
final String selectSql =
291295
PostgresQueryUtils.buildSplitScanQuery(
292296
snapshotSplit.getTableId(),
293297
snapshotSplit.getSplitKeyType(),
294298
snapshotSplit.getSplitStart() == null,
295299
snapshotSplit.getSplitEnd() == null,
300+
columnNames,
296301
uuidFields);
297302
LOG.debug(
298303
"For split '{}' of table {} using select statement: '{}'",

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.Iterator;
3232
import java.util.List;
3333
import java.util.Optional;
34-
import java.util.stream.Collectors;
3534

3635
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
3736

@@ -156,29 +155,26 @@ public static String buildSplitScanQuery(
156155
boolean isFirstSplit,
157156
boolean isLastSplit,
158157
List<String> uuidFields) {
159-
return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, uuidFields, -1, true);
158+
return buildSplitScanQuery(tableId, pkRowType, isFirstSplit, isLastSplit, null, uuidFields);
160159
}
161160

162-
private static String buildSplitQuery(
161+
public static String buildSplitScanQuery(
163162
TableId tableId,
164163
RowType pkRowType,
165164
boolean isFirstSplit,
166165
boolean isLastSplit,
167-
List<String> uuidFields,
168-
int limitSize,
169-
boolean isScanningData) {
166+
List<String> columnNames,
167+
List<String> uuidFields) {
170168
final String condition;
171169

172170
if (isFirstSplit && isLastSplit) {
173171
condition = null;
174172
} else if (isFirstSplit) {
175173
final StringBuilder sql = new StringBuilder();
176174
addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", uuidFields);
177-
if (isScanningData) {
178-
sql.append(" AND NOT (");
179-
addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
180-
sql.append(")");
181-
}
175+
sql.append(" AND NOT (");
176+
addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
177+
sql.append(")");
182178
condition = sql.toString();
183179
} else if (isLastSplit) {
184180
final StringBuilder sql = new StringBuilder();
@@ -187,30 +183,19 @@ private static String buildSplitQuery(
187183
} else {
188184
final StringBuilder sql = new StringBuilder();
189185
addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ", uuidFields);
190-
if (isScanningData) {
191-
sql.append(" AND NOT (");
192-
addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
193-
sql.append(")");
194-
}
186+
sql.append(" AND NOT (");
187+
addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
188+
sql.append(")");
195189
sql.append(" AND ");
196190
addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", uuidFields);
197191
condition = sql.toString();
198192
}
199193

200-
if (isScanningData) {
201-
return buildSelectWithRowLimits(
202-
tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
203-
} else {
204-
final String orderBy =
205-
pkRowType.getFieldNames().stream().collect(Collectors.joining(", "));
206-
return buildSelectWithBoundaryRowLimits(
207-
tableId,
208-
limitSize,
209-
getPrimaryKeyColumnsProjection(pkRowType),
210-
getMaxPrimaryKeyColumnsProjection(pkRowType),
211-
Optional.ofNullable(condition),
212-
orderBy);
213-
}
194+
return buildSelectWithRowLimits(
195+
tableId,
196+
columnNames == null ? "*" : String.join(",", columnNames),
197+
Optional.ofNullable(condition),
198+
Optional.empty());
214199
}
215200

216201
public static PreparedStatement readTableSplitDataStatement(
@@ -330,7 +315,6 @@ private static String getMaxPrimaryKeyColumnsProjection(RowType pkRowType) {
330315

331316
private static String buildSelectWithRowLimits(
332317
TableId tableId,
333-
int limit,
334318
String projection,
335319
Optional<String> condition,
336320
Optional<String> orderBy) {
@@ -343,9 +327,6 @@ private static String buildSelectWithRowLimits(
343327
if (orderBy.isPresent()) {
344328
sql.append(" ORDER BY ").append(orderBy.get());
345329
}
346-
if (limit > 0) {
347-
sql.append(" LIMIT ").append(limit);
348-
}
349330
return sql.toString();
350331
}
351332

0 commit comments

Comments
 (0)