Skip to content

Commit 1ea71bb

Browse files
committed
add option to control whether need to infer and emit schema change event.
1 parent db595e4 commit 1ea71bb

19 files changed

Lines changed: 1028 additions & 106 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
7575
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
7676
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
77+
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
7778
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SERVER_TIME_ZONE;
7879
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME;
7980
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -131,6 +132,7 @@ public DataSource createDataSource(Context context) {
131132
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
132133
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
133134
boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
135+
boolean schemaChangeEnabled = config.get(SCHEMA_CHANGE_ENABLED);
134136

135137
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
136138
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -172,6 +174,7 @@ public DataSource createDataSource(Context context) {
172174
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
173175
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
174176
.includeDatabaseInTableId(tableIdIncludeDatabase)
177+
.enableSchemaChange(schemaChangeEnabled)
175178
.getConfigFactory();
176179

177180
List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -262,6 +265,7 @@ public Set<ConfigOption<?>> optionalOptions() {
262265
options.add(METADATA_LIST);
263266
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
264267
options.add(TABLE_ID_INCLUDE_DATABASE);
268+
options.add(SCHEMA_CHANGE_ENABLED);
265269
return options;
266270
}
267271

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,4 +273,12 @@ public class PostgresDataSourceOptions {
273273
"Whether to include database in the generated Table ID. "
274274
+ "If set to true, the Table ID will be in the format (database, schema, table). "
275275
+ "If set to false, the Table ID will be in the format (schema, table). Defaults to false.");
276+
277+
@Experimental
278+
public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
279+
ConfigOptions.key("schema-change.enabled")
280+
.booleanType()
281+
.defaultValue(false)
282+
.withDescription(
283+
"Whether to infer CDC column types when processing pgoutput Relation messages.");
276284
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java

Lines changed: 43 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,25 @@
2020
import org.apache.flink.api.connector.source.SourceOutput;
2121
import org.apache.flink.cdc.common.event.CreateTableEvent;
2222
import org.apache.flink.cdc.common.event.Event;
23+
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2324
import org.apache.flink.cdc.common.schema.Schema;
24-
import org.apache.flink.cdc.common.types.DataType;
2525
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
2626
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
2727
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
2828
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
2929
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
3030
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
31-
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
3231
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
3332
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
33+
import org.apache.flink.cdc.connectors.postgres.source.schema.DebeziumSchemaRecord;
3434
import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
3535
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
36-
import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;
3736
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
3837
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
3938
import org.apache.flink.connector.base.source.reader.RecordEmitter;
4039

4140
import io.debezium.connector.postgresql.connection.PostgresConnection;
4241
import io.debezium.data.Envelope;
43-
import io.debezium.relational.Column;
4442
import io.debezium.relational.Table;
4543
import io.debezium.relational.TableId;
4644
import io.debezium.relational.history.TableChanges;
@@ -53,7 +51,6 @@
5351
import java.util.HashSet;
5452
import java.util.List;
5553
import java.util.Map;
56-
import java.util.Objects;
5754
import java.util.Set;
5855

5956
import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
@@ -62,24 +59,25 @@
6259
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
6360
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
6461
import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
62+
import static org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.inferSchemaChangeEvent;
63+
import static org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.toCreateTableEvent;
6564

6665
/** The {@link RecordEmitter} implementation for PostgreSQL pipeline connector. */
67-
public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
66+
public class PostgresPipelineRecordEmitter<T> extends PostgresSourceRecordEmitter<T> {
6867
private final PostgresSourceConfig sourceConfig;
6968
private final PostgresDialect postgresDialect;
7069

7170
// Used when startup mode is initial
72-
private Set<TableId> alreadySendCreateTableTables;
71+
private final Set<TableId> alreadySendCreateTableTables;
72+
private final boolean isBounded;
73+
private final boolean includeDatabaseInTableId;
74+
private final Map<TableId, CreateTableEvent> createTableEventCache;
7375

7476
// Used when startup mode is not initial
7577
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
76-
private boolean isBounded = false;
77-
private boolean includeDatabaseInTableId = false;
78-
79-
private final Map<TableId, CreateTableEvent> createTableEventCache;
8078

8179
public PostgresPipelineRecordEmitter(
82-
DebeziumDeserializationSchema debeziumDeserializationSchema,
80+
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
8381
SourceReaderMetrics sourceReaderMetrics,
8482
PostgresSourceConfig sourceConfig,
8583
OffsetFactory offsetFactory,
@@ -108,16 +106,13 @@ public void applySplit(SourceSplitBase split) {
108106
} else {
109107
for (Map.Entry<TableId, TableChanges.TableChange> entry :
110108
split.getTableSchemas().entrySet()) {
111-
TableId tableId =
112-
entry.getKey(); // Use the TableId from the map key which contains full info
113109
TableChanges.TableChange tableChange = entry.getValue();
110+
111+
// Currently serialize of split will mismatch the catalog and schema of Table.id()
112+
113+
Table table = tableChange.getTable();
114114
CreateTableEvent createTableEvent =
115-
new CreateTableEvent(
116-
toCdcTableId(
117-
tableId,
118-
sourceConfig.getDatabaseList().get(0),
119-
includeDatabaseInTableId),
120-
buildSchemaFromTable(tableChange.getTable()));
115+
toCreateTableEvent(entry.getKey(), table, sourceConfig, postgresDialect);
121116
((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
122117
.applyChangeEvent(createTableEvent);
123118
}
@@ -141,60 +136,42 @@ protected void processElement(
141136
sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
142137
alreadySendCreateTableTables.add(tableId);
143138
}
144-
} else {
145-
boolean isDataChangeRecord = isDataChangeRecord(element);
146-
if (isDataChangeRecord || isSchemaChangeEvent(element)) {
147-
TableId tableId = getTableId(element);
148-
if (!alreadySendCreateTableTables.contains(tableId)) {
149-
CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
150-
if (createTableEvent != null) {
151-
output.collect((T) createTableEvent);
152-
}
153-
alreadySendCreateTableTables.add(tableId);
154-
}
155-
// In rare case, we may miss some CreateTableEvents before DataChangeEvents.
156-
// Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
157-
if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
158-
CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
139+
} else if (isDataChangeRecord(element)) {
140+
TableId tableId = getTableId(element);
141+
if (!alreadySendCreateTableTables.contains(tableId)) {
142+
CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
143+
if (createTableEvent != null) {
159144
output.collect((T) createTableEvent);
160-
createTableEventCache.put(tableId, createTableEvent);
161145
}
146+
alreadySendCreateTableTables.add(tableId);
147+
}
148+
// In rare case, we may miss some CreateTableEvents before DataChangeEvents.
149+
// Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
150+
if (!createTableEventCache.containsKey(tableId)) {
151+
CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
152+
output.collect((T) createTableEvent);
153+
createTableEventCache.put(tableId, createTableEvent);
162154
}
155+
} else if (isSchemaChangeEvent(element) && sourceConfig.isSchemaChangeEnabled()) {
156+
handleSchemaChangeRecord(element, output, splitState);
163157
}
164158
super.processElement(element, output, splitState);
165159
}
166160

167-
private Schema buildSchemaFromTable(Table table) {
168-
List<Column> columns = table.columns();
169-
Schema.Builder tableBuilder = Schema.newBuilder();
170-
for (int i = 0; i < columns.size(); i++) {
171-
Column column = columns.get(i);
172-
173-
String colName = column.name();
174-
DataType dataType;
175-
try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
176-
dataType =
177-
PostgresTypeUtils.fromDbzColumn(
178-
column,
179-
this.sourceConfig.getDbzConnectorConfig(),
180-
jdbc.getTypeRegistry());
181-
}
182-
if (!column.isOptional()) {
183-
dataType = dataType.notNull();
184-
}
185-
tableBuilder.physicalColumn(
186-
colName,
187-
dataType,
188-
column.comment(),
189-
column.defaultValueExpression().orElse(null));
190-
}
191-
tableBuilder.comment(table.comment());
192-
193-
List<String> primaryKey = table.primaryKeyColumnNames();
194-
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
195-
tableBuilder.primaryKey(primaryKey);
161+
private void handleSchemaChangeRecord(
162+
SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) {
163+
Map<TableId, TableChanges.TableChange> existedTableSchemas =
164+
splitState.toSourceSplit().getTableSchemas();
165+
DebeziumSchemaRecord schemaRecord = (DebeziumSchemaRecord) element;
166+
Table schemaAfter = schemaRecord.getTable();
167+
Table schemaBefore = null;
168+
if (existedTableSchemas.containsKey(schemaAfter.id())) {
169+
schemaBefore = existedTableSchemas.get(schemaAfter.id()).getTable();
196170
}
197-
return tableBuilder.build();
171+
List<SchemaChangeEvent> schemaChangeEvents =
172+
inferSchemaChangeEvent(
173+
schemaAfter.id(), schemaBefore, schemaAfter, sourceConfig, postgresDialect);
174+
schemaChangeEvents.forEach(schemaChangeEvent -> output.collect((T) schemaChangeEvent));
198175
}
199176

200177
private void sendCreateTableEvent(TableId tableId, SourceOutput<Event> output) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ public static Schema getTableSchema(
174174
}
175175
}
176176

177+
public static Schema buildSchemaFromTable(
178+
Table table, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) {
179+
return toSchema(table, dbzConfig, typeRegistry);
180+
}
181+
177182
public static Schema toSchema(
178183
Table table, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) {
179184
List<Column> columns =

0 commit comments

Comments
 (0)