Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
This issue aims to enhance our Flink connector by introducing support for the $changelog auxiliary table. This feature is essential for capturing and processing change data capture (CDC) events seamlessly within Flink streaming jobs.
Fluss primary key tables support change data capture to track row-level changes for updates and deletes. When streaming-reading a primary key table, the Flink connector emits records with Flink's native RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) to enable stateful computation on changelogs. On the other hand, there are many use cases where users want to consume the plain logs without converting them into Flink's native RowKind. This feature is similar to Paimon's $audit_log table and the Databricks table_changes(..) query.
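To make the relationship between Flink's native RowKind and the plain change-type strings concrete, here is a minimal, self-contained sketch. The enum below is a simplified stand-in for `org.apache.flink.types.RowKind`, not the actual Flink class:

```java
// Simplified model of how Flink's native RowKind values map to the plain
// string change types a $changelog table would expose in _change_type.
// This enum is a self-contained stand-in for org.apache.flink.types.RowKind.
public class ChangeTypeMapping {

    // The four row-level change kinds Flink uses for changelog streams.
    enum RowKind {
        INSERT("+I"),
        UPDATE_BEFORE("-U"),
        UPDATE_AFTER("+U"),
        DELETE("-D");

        private final String shortString;

        RowKind(String shortString) {
            this.shortString = shortString;
        }

        // The short form that would populate the _change_type column.
        String shortString() {
            return shortString;
        }
    }

    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + kind.shortString());
        }
    }
}
```

With the $changelog table, these short strings are exposed as ordinary column values instead of being interpreted as the record's RowKind.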
Solution
Implementation
- `FlinkCatalog` supports `getTable` for the `<table_name>$changelog` table path, and the returned table should include additional metadata columns (see below).
- `FlinkRecordEmitter` of `FlinkSourceReader` should have a special `FlussRowToFlinkRowConverter` that converts the Fluss `InternalRow` into a Flink `RowData` with the additional metadata columns.
- `CoordinatorService#createTable` should validate whether the created table uses system reserved columns (`_change_type`, `_log_offset`, `_commit_timestamp`).
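The converter step can be sketched as follows. This is a hypothetical, simplified model of what `FlussRowToFlinkRowConverter` would do, using plain Java lists rather than the actual Fluss `InternalRow` and Flink `RowData` types:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: given the user columns of a changelog record plus its
// change metadata, produce the row emitted for the $changelog table with the
// three reserved metadata columns appended at the end.
public class ChangelogRowSketch {

    // Appends _change_type, _log_offset and _commit_timestamp to the
    // original field values, mirroring the schema of the $changelog table.
    static List<Object> toChangelogRow(
            List<Object> userFields, String changeType, long logOffset, long commitTimestampMs) {
        List<Object> row = new ArrayList<>(userFields);
        row.add(changeType);
        row.add(logOffset);
        row.add(commitTimestampMs);
        return row;
    }

    public static void main(String[] args) {
        List<Object> row =
                toChangelogRow(Arrays.asList(1, "alice"), "+I", 42L, 1700000000000L);
        System.out.println(row); // [1, alice, +I, 42, 1700000000000]
    }
}
```

The real converter would additionally map the record's RowKind to the `_change_type` string and always emit the row with RowKind INSERT, since the $changelog table presents changes as plain appended log records.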
Schema of the $changelog table
| Column Name | Type | Values |
|---|---|---|
| `_change_type` | STRING | `+I`, `-U`, `+U`, `-D` |
| `_log_offset` | BIGINT | the offset of the log |
| `_commit_timestamp` | TIMESTAMP_LTZ | the timestamp when the change happened |
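Since these three names are reserved, table creation needs to reject user schemas that collide with them. A minimal sketch of the check `CoordinatorService#createTable` could perform (method and class names here are illustrative, not the actual Fluss API):

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch of reserved-column validation at table-creation time:
// reject user-defined columns that collide with the $changelog metadata names.
public class ReservedColumnValidator {

    // Column names reserved for $changelog metadata.
    static final List<String> RESERVED =
            Arrays.asList("_change_type", "_log_offset", "_commit_timestamp");

    // Throws if any column in the table schema uses a reserved name.
    static void validateColumnNames(List<String> columnNames) {
        for (String name : columnNames) {
            if (RESERVED.contains(name)) {
                throw new IllegalArgumentException(
                        "Column name '" + name + "' is reserved for system metadata columns");
            }
        }
    }

    public static void main(String[] args) {
        validateColumnNames(Arrays.asList("id", "name")); // passes silently
        try {
            validateColumnNames(Arrays.asList("id", "_log_offset"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```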
Anything else?
You can take Paimon's $audit_log implementation as an example: https://github.com/apache/paimon/blob/release-1.0/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java#L69
Willingness to contribute
- I'm willing to submit a PR!