Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.data.Envelope;
import io.debezium.pipeline.DataChangeEvent;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -177,7 +178,14 @@ public Iterator<SourceRecords> pollWithBuffer() throws InterruptedException {
}

if (!reachChangeLogStart) {
outputBuffer.put((Struct) record.key(), record);
if (record.key() != null) {
outputBuffer.put((Struct) record.key(), record);
} else {
// For tables without primary key, use after struct as buffer key
Struct value = (Struct) record.value();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
outputBuffer.put(after, record);
}
} else {
if (isChangeRecordInChunkRange(record)) {
// rewrite overlapping snapshot records through the record key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Collection;
Expand All @@ -49,6 +51,8 @@
@Internal
public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {

private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class);

Comment thread
JNSimba marked this conversation as resolved.
Outdated
protected final JdbcSourceConfig sourceConfig;
protected final JdbcDataSourceDialect dataSourceDialect;
protected CommonConnectorConfig dbzConnectorConfig;
Expand Down Expand Up @@ -92,33 +96,44 @@ public Object[] getSplitKey(SourceRecord record) {
@Override
public void rewriteOutputBuffer(
Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord) {
Struct key = (Struct) changeRecord.key();
boolean hasPrimaryKey = changeRecord.key() != null;
Struct value = (Struct) changeRecord.value();
if (value != null) {
Envelope.Operation operation =
Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
switch (operation) {
case CREATE:
if (hasPrimaryKey) {
outputBuffer.put(
(Struct) changeRecord.key(), buildReadRecord(changeRecord));
} else {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
outputBuffer.put(after, buildReadRecord(changeRecord));
}
break;
case UPDATE:
Envelope envelope = Envelope.fromSchema(changeRecord.valueSchema());
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Instant fetchTs =
Instant.ofEpochMilli((Long) source.get(Envelope.FieldName.TIMESTAMP));
SourceRecord record =
new SourceRecord(
changeRecord.sourcePartition(),
changeRecord.sourceOffset(),
changeRecord.topic(),
changeRecord.kafkaPartition(),
changeRecord.keySchema(),
changeRecord.key(),
changeRecord.valueSchema(),
envelope.read(after, source, fetchTs));
outputBuffer.put(key, record);
if (hasPrimaryKey) {
outputBuffer.put(
(Struct) changeRecord.key(), buildReadRecord(changeRecord));
} else {
// For no-PK table: remove the before image, insert the after image
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
Struct after = value.getStruct(Envelope.FieldName.AFTER);
if (before != null) {
outputBuffer.remove(before);
}
outputBuffer.put(after, buildReadRecord(changeRecord));
}
break;
case DELETE:
outputBuffer.remove(key);
if (hasPrimaryKey) {
outputBuffer.remove((Struct) changeRecord.key());
} else {
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
if (before != null) {
outputBuffer.remove(before);
}
}
break;
case READ:
throw new IllegalStateException(
Expand All @@ -129,6 +144,23 @@ public void rewriteOutputBuffer(
}
}

/**
 * Builds a copy of the given change record whose value is a read-style envelope.
 *
 * <p>The source partition, source offset, topic, Kafka partition, key schema, key and value
 * schema are taken unchanged from {@code changeRecord}. Only the value is rebuilt: it is
 * produced by {@code Envelope.read(...)} from the record's after image, its source struct
 * and the timestamp stored in that source struct.
 *
 * @param changeRecord the change record to convert; its value must be a {@link Struct}
 * @return a new {@link SourceRecord} carrying the rebuilt value
 */
private SourceRecord buildReadRecord(SourceRecord changeRecord) {
    final Struct payload = (Struct) changeRecord.value();
    final Envelope recordEnvelope = Envelope.fromSchema(changeRecord.valueSchema());
    final Struct sourceInfo = payload.getStruct(Envelope.FieldName.SOURCE);
    final Struct afterImage = payload.getStruct(Envelope.FieldName.AFTER);
    // The timestamp of the rebuilt value is the one recorded in the source struct.
    final long eventMillis = (Long) sourceInfo.get(Envelope.FieldName.TIMESTAMP);
    final Struct readPayload =
            recordEnvelope.read(afterImage, sourceInfo, Instant.ofEpochMilli(eventMillis));

    return new SourceRecord(
            changeRecord.sourcePartition(),
            changeRecord.sourceOffset(),
            changeRecord.topic(),
            changeRecord.kafkaPartition(),
            changeRecord.keySchema(),
            changeRecord.key(),
            changeRecord.valueSchema(),
            readPayload);
}

@Override
public List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords) {
return snapshotRecords.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.table.types.logical.RowType;

import io.debezium.data.Envelope;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -237,7 +238,29 @@ public static Object[] getSplitKey(
RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) {
// the split key field contains single field now
String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
Struct key = (Struct) dataRecord.key();
return new Object[] {key.get(splitFieldName)};
Struct target;
if (dataRecord.key() != null) {
target = (Struct) dataRecord.key();
} else {
// For tables without primary key, extract chunk key from value struct
target = getStructContainingChunkKey(dataRecord);
}
return new Object[] {target.get(splitFieldName)};
}

/**
* For tables without primary key, the chunk key is not in the record key (which is null).
* Instead, extract it from the value's after struct (for CREATE/READ) or before struct (for
* UPDATE/DELETE).
*/
public static Struct getStructContainingChunkKey(SourceRecord record) {
Struct value = (Struct) record.value();
Envelope.Operation op =
Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
return value.getStruct(Envelope.FieldName.AFTER);
} else {
return value.getStruct(Envelope.FieldName.BEFORE);
}
Comment thread
JNSimba marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@
import io.debezium.connector.postgresql.connection.PostgresConnectionUtils;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.schema.TopicSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

Expand All @@ -64,6 +67,7 @@
/** The dialect for Postgres. */
public class PostgresDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(PostgresDialect.class);
private static final String CONNECTION_NAME = "postgres-cdc-connector";

private final PostgresSourceConfig sourceConfig;
Expand Down Expand Up @@ -196,13 +200,64 @@ public Map<TableId, TableChange> discoverDataCollectionSchemas(JdbcSourceConfig
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
// fetch table schemas
Map<TableId, TableChange> tableSchemas = queryTableSchema(jdbc, capturedTableIds);
// validate REPLICA IDENTITY for tables without primary key
validateReplicaIdentityForNoPkTables(jdbc, tableSchemas);
return tableSchemas;
Comment thread
JNSimba marked this conversation as resolved.
} catch (Exception e) {
throw new FlinkRuntimeException(
"Error to discover table schemas: " + e.getMessage(), e);
}
}

/**
 * Ensures every discovered table without primary key has REPLICA IDENTITY FULL. This is
 * required for PostgreSQL to include the full before image in WAL for UPDATE and DELETE
 * events.
 *
 * <p>Tables that declare at least one primary key column are skipped without querying the
 * database.
 *
 * @param jdbc connection used to look up the replica identity of each table
 * @param tableSchemas discovered table schemas keyed by table id
 * @throws SQLException if the replica identity lookup fails
 * @throws FlinkRuntimeException if a table without primary key does not have REPLICA
 *     IDENTITY FULL
 */
private void validateReplicaIdentityForNoPkTables(
        JdbcConnection jdbc, Map<TableId, TableChange> tableSchemas) throws SQLException {
    for (Map.Entry<TableId, TableChange> schemaEntry : tableSchemas.entrySet()) {
        final Table tableDefinition = schemaEntry.getValue().getTable();
        if (!tableDefinition.primaryKeyColumnNames().isEmpty()) {
            // Tables with a primary key need no replica identity check.
            continue;
        }

        final TableId id = schemaEntry.getKey();
        final boolean hasFullIdentity = "f".equalsIgnoreCase(queryReplicaIdentity(jdbc, id));
        if (hasFullIdentity) {
            LOG.info(
                    "Table '{}.{}' has no primary key but has REPLICA IDENTITY FULL set.",
                    id.schema(),
                    id.table());
            continue;
        }

        final String message =
                String.format(
                        "Table '%s.%s' has no primary key. "
                                + "To use incremental snapshot for tables without primary key, "
                                + "REPLICA IDENTITY FULL must be set. "
                                + "Please execute: ALTER TABLE %s.%s REPLICA IDENTITY FULL",
                        id.schema(),
                        id.table(),
                        id.schema(),
                        id.table());
        throw new FlinkRuntimeException(message);
    }
}

private String queryReplicaIdentity(JdbcConnection jdbc, TableId tableId) throws SQLException {
String query =
String.format(
"SELECT relreplident FROM pg_class c "
+ "JOIN pg_namespace n ON c.relnamespace = n.oid "
+ "WHERE n.nspname = '%s' AND c.relname = '%s'",
tableId.schema(), tableId.table());
Comment thread
JNSimba marked this conversation as resolved.
Outdated
final String[] result = new String[1];
jdbc.query(
query,
rs -> {
if (rs.next()) {
result[0] = rs.getString(1);
}
});
Comment thread
JNSimba marked this conversation as resolved.
return result[0];
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new PostgresConnectionPoolFactory();
Expand Down
Loading
Loading