Skip to content

Commit 8e576aa

Browse files
[FLINK-38959][postgres] Update split state's table schemas info and infer schema change event based on pgoutput plugin's relation message. (#4316)
1 parent 5d8a317 commit 8e576aa

File tree

21 files changed

+1264
-121
lines changed

21 files changed

+1264
-121
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
7575
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
7676
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
77+
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
7778
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SERVER_TIME_ZONE;
7879
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME;
7980
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -131,6 +132,7 @@ public DataSource createDataSource(Context context) {
131132
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
132133
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
133134
boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
135+
boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);
134136

135137
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
136138
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -172,6 +174,7 @@ public DataSource createDataSource(Context context) {
172174
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
173175
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
174176
.includeDatabaseInTableId(tableIdIncludeDatabase)
177+
.includeSchemaChanges(includeSchemaChanges)
175178
.getConfigFactory();
176179

177180
List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -262,6 +265,7 @@ public Set<ConfigOption<?>> optionalOptions() {
262265
options.add(METADATA_LIST);
263266
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
264267
options.add(TABLE_ID_INCLUDE_DATABASE);
268+
options.add(SCHEMA_CHANGE_ENABLED);
265269
return options;
266270
}
267271

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,4 +273,12 @@ public class PostgresDataSourceOptions {
273273
"Whether to include database in the generated Table ID. "
274274
+ "If set to true, the Table ID will be in the format (database, schema, table). "
275275
+ "If set to false, the Table ID will be in the format (schema, table). Defaults to false.");
276+
277+
@Experimental
278+
public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
279+
ConfigOptions.key("schema-change.enabled")
280+
.booleanType()
281+
.defaultValue(false)
282+
.withDescription(
283+
"Whether to infer CDC column types when processing pgoutput Relation messages.");
276284
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java

Lines changed: 64 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -19,41 +19,39 @@
1919

2020
import org.apache.flink.api.connector.source.SourceOutput;
2121
import org.apache.flink.cdc.common.event.CreateTableEvent;
22-
import org.apache.flink.cdc.common.event.Event;
22+
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2323
import org.apache.flink.cdc.common.schema.Schema;
24-
import org.apache.flink.cdc.common.types.DataType;
2524
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
2625
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
2726
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
2827
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
2928
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
3029
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
31-
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
3230
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
3331
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
32+
import org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
3433
import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
3534
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
36-
import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;
3735
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
3836
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
3937
import org.apache.flink.connector.base.source.reader.RecordEmitter;
4038

4139
import io.debezium.connector.postgresql.connection.PostgresConnection;
4240
import io.debezium.data.Envelope;
43-
import io.debezium.relational.Column;
4441
import io.debezium.relational.Table;
4542
import io.debezium.relational.TableId;
4643
import io.debezium.relational.history.TableChanges;
4744
import org.apache.kafka.connect.data.Field;
4845
import org.apache.kafka.connect.data.Struct;
4946
import org.apache.kafka.connect.source.SourceRecord;
47+
import org.slf4j.Logger;
48+
import org.slf4j.LoggerFactory;
5049

5150
import java.sql.SQLException;
5251
import java.util.HashMap;
5352
import java.util.HashSet;
5453
import java.util.List;
5554
import java.util.Map;
56-
import java.util.Objects;
5755
import java.util.Set;
5856

5957
import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
@@ -62,24 +60,26 @@
6260
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
6361
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
6462
import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
63+
import static org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.inferSchemaChangeEvent;
64+
import static org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.toCreateTableEvent;
6565

6666
/** The {@link RecordEmitter} implementation for PostgreSQL pipeline connector. */
67-
public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
67+
public class PostgresPipelineRecordEmitter<T> extends PostgresSourceRecordEmitter<T> {
68+
private static final Logger LOG = LoggerFactory.getLogger(PostgresPipelineRecordEmitter.class);
6869
private final PostgresSourceConfig sourceConfig;
6970
private final PostgresDialect postgresDialect;
7071

7172
// Used when startup mode is initial
72-
private Set<TableId> alreadySendCreateTableTables;
73+
private final Set<TableId> alreadySendCreateTableTables;
74+
private final boolean isBounded;
75+
private final boolean includeDatabaseInTableId;
76+
private final Map<TableId, CreateTableEvent> createTableEventCache;
7377

7478
// Used when startup mode is not initial
7579
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
76-
private boolean isBounded = false;
77-
private boolean includeDatabaseInTableId = false;
78-
79-
private final Map<TableId, CreateTableEvent> createTableEventCache;
8080

8181
public PostgresPipelineRecordEmitter(
82-
DebeziumDeserializationSchema debeziumDeserializationSchema,
82+
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
8383
SourceReaderMetrics sourceReaderMetrics,
8484
PostgresSourceConfig sourceConfig,
8585
OffsetFactory offsetFactory,
@@ -108,16 +108,11 @@ public void applySplit(SourceSplitBase split) {
108108
} else {
109109
for (Map.Entry<TableId, TableChanges.TableChange> entry :
110110
split.getTableSchemas().entrySet()) {
111-
TableId tableId =
112-
entry.getKey(); // Use the TableId from the map key which contains full info
113111
TableChanges.TableChange tableChange = entry.getValue();
112+
113+
Table table = tableChange.getTable();
114114
CreateTableEvent createTableEvent =
115-
new CreateTableEvent(
116-
toCdcTableId(
117-
tableId,
118-
sourceConfig.getDatabaseList().get(0),
119-
includeDatabaseInTableId),
120-
buildSchemaFromTable(tableChange.getTable()));
115+
toCreateTableEvent(table, sourceConfig, postgresDialect);
121116
((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
122117
.applyChangeEvent(createTableEvent);
123118
}
@@ -137,68 +132,63 @@ protected void processElement(
137132
shouldEmitAllCreateTableEventsInSnapshotMode = false;
138133
} else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
139134
TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
140-
if (!alreadySendCreateTableTables.contains(tableId)) {
141-
sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
142-
alreadySendCreateTableTables.add(tableId);
143-
}
144-
} else {
145-
boolean isDataChangeRecord = isDataChangeRecord(element);
146-
if (isDataChangeRecord || isSchemaChangeEvent(element)) {
147-
TableId tableId = getTableId(element);
148-
if (!alreadySendCreateTableTables.contains(tableId)) {
149-
CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
150-
if (createTableEvent != null) {
151-
output.collect((T) createTableEvent);
152-
}
153-
alreadySendCreateTableTables.add(tableId);
154-
}
155-
// In rare cases, we may miss some CreateTableEvents before DataChangeEvents.
156-
// Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
157-
if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
158-
CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
159-
output.collect((T) createTableEvent);
160-
createTableEventCache.put(tableId, createTableEvent);
161-
}
162-
}
135+
maybeSendCreateTableEventFromCache(tableId, output);
136+
} else if (isDataChangeRecord(element)) {
137+
handleDataChangeRecord(element, output);
138+
} else if (isSchemaChangeEvent(element) && sourceConfig.isSchemaChangeEnabled()) {
139+
handleSchemaChangeRecord(element, output, splitState);
163140
}
164141
super.processElement(element, output, splitState);
165142
}
166143

167-
private Schema buildSchemaFromTable(Table table) {
168-
List<Column> columns = table.columns();
169-
Schema.Builder tableBuilder = Schema.newBuilder();
170-
for (int i = 0; i < columns.size(); i++) {
171-
Column column = columns.get(i);
172-
173-
String colName = column.name();
174-
DataType dataType;
175-
try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
176-
dataType =
177-
PostgresTypeUtils.fromDbzColumn(
178-
column,
179-
this.sourceConfig.getDbzConnectorConfig(),
180-
jdbc.getTypeRegistry());
181-
}
182-
if (!column.isOptional()) {
183-
dataType = dataType.notNull();
184-
}
185-
tableBuilder.physicalColumn(
186-
colName,
187-
dataType,
188-
column.comment(),
189-
column.defaultValueExpression().orElse(null));
144+
private void handleDataChangeRecord(SourceRecord element, SourceOutput<T> output) {
145+
TableId tableId = getTableId(element);
146+
maybeSendCreateTableEventFromCache(tableId, output);
147+
// In rare cases, we may miss some CreateTableEvents before DataChangeEvents.
148+
// Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
149+
if (!createTableEventCache.containsKey(tableId)) {
150+
CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
151+
sendCreateTableEvent(createTableEvent, output);
152+
createTableEventCache.put(tableId, createTableEvent);
153+
}
154+
}
155+
156+
private void handleSchemaChangeRecord(
157+
SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) {
158+
if (!(element instanceof PostgresSchemaRecord)) {
159+
// Ignore non-Postgres schema change records; they may represent non-relation
160+
// schema changes that are not handled via PostgresSchemaRecord.
161+
LOG.warn("Ignoring non-PostgresSchemaRecord schema change event: {}", element);
162+
return;
163+
}
164+
Map<TableId, TableChanges.TableChange> existedTableSchemas =
165+
splitState.toSourceSplit().getTableSchemas();
166+
PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord) element;
167+
Table schemaAfter = schemaRecord.getTable();
168+
maybeSendCreateTableEventFromCache(schemaAfter.id(), output);
169+
Table schemaBefore = null;
170+
if (existedTableSchemas.containsKey(schemaAfter.id())) {
171+
schemaBefore = existedTableSchemas.get(schemaAfter.id()).getTable();
190172
}
191-
tableBuilder.comment(table.comment());
173+
List<SchemaChangeEvent> schemaChangeEvents =
174+
inferSchemaChangeEvent(
175+
schemaAfter.id(), schemaBefore, schemaAfter, sourceConfig, postgresDialect);
176+
LOG.info("Inferred Schema change events: {}", schemaChangeEvents);
177+
schemaChangeEvents.forEach(schemaChangeEvent -> output.collect((T) schemaChangeEvent));
178+
}
192179

193-
List<String> primaryKey = table.primaryKeyColumnNames();
194-
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
195-
tableBuilder.primaryKey(primaryKey);
180+
private void maybeSendCreateTableEventFromCache(TableId tableId, SourceOutput<T> output) {
181+
if (!alreadySendCreateTableTables.contains(tableId)) {
182+
CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
183+
if (createTableEvent != null) {
184+
sendCreateTableEvent(createTableEvent, output);
185+
}
186+
alreadySendCreateTableTables.add(tableId);
196187
}
197-
return tableBuilder.build();
198188
}
199189

200-
private void sendCreateTableEvent(TableId tableId, SourceOutput<Event> output) {
201-
output.collect(getCreateTableEvent(sourceConfig, tableId));
190+
private void sendCreateTableEvent(CreateTableEvent createTableEvent, SourceOutput<T> output) {
191+
output.collect((T) createTableEvent);
202192
}
203193

204194
private CreateTableEvent getCreateTableEvent(

0 commit comments

Comments
 (0)