gmdfalk commented on Sep 29, 2025

Purpose

Add --metadata_column support to the Paimon Kafka CDC connector, similar to the existing option for MySQL and Postgres (#2077).

The supported metadata columns are the fields of org.apache.kafka.clients.consumer.ConsumerRecord, i.e.:

  • topic
  • partition
  • offset
  • timestamp
  • timestampType: This is the name of the enum i.e. NoTimestampType, CreateTime or LogAppendTime
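
To make the column list above concrete, here is a minimal, dependency-free sketch of how the five fields could be flattened into metadata columns. `TimestampTypeSketch` and `RecordMetaSketch` are hypothetical stand-ins for Kafka's `TimestampType` and `ConsumerRecord`, not the classes added in this PR, and rendering every value as a string is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-in for Kafka's TimestampType; display names mirror the values listed above. */
enum TimestampTypeSketch {
    NO_TIMESTAMP_TYPE("NoTimestampType"),
    CREATE_TIME("CreateTime"),
    LOG_APPEND_TIME("LogAppendTime");

    final String displayName;

    TimestampTypeSketch(String displayName) {
        this.displayName = displayName;
    }
}

/** Hypothetical stand-in for the ConsumerRecord fields that are exposed as metadata. */
final class RecordMetaSketch {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;
    final TimestampTypeSketch timestampType;

    RecordMetaSketch(String topic, int partition, long offset, long timestamp,
                     TimestampTypeSketch timestampType) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.timestampType = timestampType;
    }

    /** Flattens the five fields into column-name to string-value pairs (assumed representation). */
    Map<String, String> toMetadataColumns() {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("topic", topic);
        columns.put("partition", String.valueOf(partition));
        columns.put("offset", String.valueOf(offset));
        columns.put("timestamp", String.valueOf(timestamp));
        columns.put("timestampType", timestampType.displayName);
        return columns;
    }
}

final class MetadataColumnsDemo {
    public static void main(String[] args) {
        RecordMetaSketch meta = new RecordMetaSketch(
                "orders", 3, 42L, 1700000000000L, TimestampTypeSketch.CREATE_TIME);
        for (Map.Entry<String, String> e : meta.toMetadataColumns().entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```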

The feature is backwards compatible. It is only active when --metadata_column is supplied or, programmatically, when SynchronizationActionBase.withMetadataColumns is used.

For now, I've only implemented this for the KafkaDebeziumAvroDeserializationSchema and KafkaDebeziumJsonDeserializationSchema.

Tests

KafkaMetadataConverter.java
I will also add more integration tests for the Kafka table and database sync actions for various input formats.

API and Format

No changes to public APIs or storage format.

The changes here are confined to the Flink CDC package, but I did have to update CdcSourceRecord, since it previously provided no way to surface arbitrary metadata for a record.

The metadata attribute on CdcSourceRecord is intentionally a generic Map so that it can potentially be used to add metadata support for other connectors like Pulsar or Mongo that are not yet implemented.
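
As a rough illustration of the generic-map idea, the sketch below shows a record type that carries connector-specific metadata next to its payload. `SourceRecordSketch` and its method names are hypothetical and are not the actual `CdcSourceRecord` API; the point is only that the map's keys are left to the connector.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simplification of a source record that carries arbitrary metadata. */
final class SourceRecordSketch {
    private final Object value;
    private final Map<String, Object> metadata;

    /** Constructor for callers that have no metadata; keeps existing call sites unchanged. */
    SourceRecordSketch(Object value) {
        this(value, Collections.<String, Object>emptyMap());
    }

    SourceRecordSketch(Object value, Map<String, Object> metadata) {
        this.value = value;
        // Defensive copy so later changes to the caller's map do not leak into the record.
        this.metadata = Collections.unmodifiableMap(new HashMap<String, Object>(metadata));
    }

    Object getValue() {
        return value;
    }

    Map<String, Object> getMetadata() {
        return metadata;
    }

    /** Returns the metadata value for the key, or null if the connector did not supply it. */
    Object getMetadata(String key) {
        return metadata.get(key);
    }
}
```

Under this shape, a Kafka source could fill in keys such as `topic` and `offset`, while another connector could use entirely different keys without any change to the record type.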

Documentation

Added the new --metadata_column parameter to Kafka CDC docs.

Dev notes

To run the integration tests on macOS with Rancher Desktop, I had to expose the Docker socket to Testcontainers, e.g. system-wide via sudo ln -sf "$HOME/.rd/docker.sock" /var/run/docker.sock.

Todo/WIP

  • Consider making the new parsing path the default (without metadata converters, it is simply a no-op) instead of having custom constructors
  • Add integration tests for all Kafka CDC formats (currently only working on integration tests for Debezium Avro and Debezium JSON)
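
The first item relies on the observation that applying an empty list of converters leaves a row unchanged. A minimal sketch of that property, using the hypothetical names `MetadataColumnSketch` and `MetadataEnricherSketch` rather than the classes in this PR:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical converter: names one output column and reads its value from record metadata. */
interface MetadataColumnSketch {
    String columnName();

    String read(Map<String, Object> metadata);
}

final class MetadataEnricherSketch {
    private MetadataEnricherSketch() {}

    /**
     * Returns a copy of the row with one extra column per converter.
     * With an empty converter list the loop never runs, so the copy equals the input row.
     */
    static Map<String, String> enrich(
            Map<String, String> row,
            Map<String, Object> metadata,
            List<MetadataColumnSketch> converters) {
        Map<String, String> enriched = new LinkedHashMap<>(row);
        for (MetadataColumnSketch converter : converters) {
            enriched.put(converter.columnName(), converter.read(metadata));
        }
        return enriched;
    }
}
```

Because the empty case falls out of the loop, a single code path could serve both configurations, which is the trade-off the item weighs against separate constructors.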

gmdfalk changed the title from "add kafka metadata support" to "add --metadata_column support for Kafka CDC connector" on Sep 29, 2025
Signed-off-by: Max Falk <[email protected]>
private static final long serialVersionUID = 1L;

@Override
public String read(JsonNode source) {
Contributor

You could push this into CdcMetadataConverter, so that both read() methods default to UnsupportedOperationException.

Contributor

In fact, you could dedupe most of this by adding a 'fieldName' parameter & pulling all the methods out into a superclass.
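
For illustration, the two suggestions in this thread could be combined roughly as sketched below. All type names are hypothetical: `JsonPayloadSketch` stands in for the `JsonNode` parameter in the quoted snippet, and `MetaRecordSketch` is an assumed parameter type for the second `read()` overload, which the snippet does not show.

```java
import java.util.Map;

/** Stand-in for the JsonNode parameter in the quoted snippet. */
final class JsonPayloadSketch {}

/** Assumed parameter type of the second overload: a record exposing a metadata map. */
final class MetaRecordSketch {
    final Map<String, Object> metadata;

    MetaRecordSketch(Map<String, Object> metadata) {
        this.metadata = metadata;
    }
}

/** Both read() overloads default to UnsupportedOperationException. */
interface ConverterSketch {
    String columnName();

    default String read(JsonPayloadSketch source) {
        throw new UnsupportedOperationException(
                columnName() + " cannot be read from a JSON payload");
    }

    default String read(MetaRecordSketch record) {
        throw new UnsupportedOperationException(
                columnName() + " cannot be read from record metadata");
    }
}

/** Shared superclass: the only per-column difference is the field name. */
abstract class FieldConverterSketch implements ConverterSketch {
    private final String fieldName;

    protected FieldConverterSketch(String fieldName) {
        this.fieldName = fieldName;
    }

    @Override
    public String columnName() {
        return fieldName;
    }

    @Override
    public String read(MetaRecordSketch record) {
        Object value = record.metadata.get(fieldName);
        return value == null ? null : value.toString();
    }
}

final class TopicConverterSketch extends FieldConverterSketch {
    TopicConverterSketch() {
        super("topic");
    }
}

final class OffsetConverterSketch extends FieldConverterSketch {
    OffsetConverterSketch() {
        super("offset");
    }
}
```

With this shape, each concrete converter shrinks to a constructor call, and the JSON overload keeps throwing unless a subclass overrides it.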

Author

done
