Skip to content

[FLINK-39810][Connectors/kinesis] Support upsert changelog streams in Kinesis SQL connector#249

Open
fmorillo7694 wants to merge 2 commits into
apache:mainfrom
fmorillo7694:feature/kinesis-upsert-support
Open

[FLINK-39810][Connectors/kinesis] Support upsert changelog streams in Kinesis SQL connector#249
fmorillo7694 wants to merge 2 commits into
apache:mainfrom
fmorillo7694:feature/kinesis-upsert-support

Conversation

@fmorillo7694

Copy link
Copy Markdown

Summary

  • Enable the kinesis connector to accept upsert changelog streams (GROUP BY,
    deduplication, streaming joins) by detecting PRIMARY KEY on the table
  • DELETE/UPDATE_BEFORE rows are written as empty-payload tombstone records
  • Primary key fields are used as the Kinesis partition key for consistent shard routing
  • No new configuration needed — just define PRIMARY KEY (col) NOT ENFORCED

Changes

  • KinesisDynamicSink: Added upsertMode flag and UpsertSerializationSchemaWrapper
    that writes tombstones for deletes and normalizes RowKind for inserts
  • KinesisDynamicTableFactory: Detects primary key on the table, enables upsert mode,
    and overrides the partitioner to use primary key fields
  • KinesisUpsertSinkSerializationTest: Unit tests for serialization behavior across
    all RowKind types

Test plan

  • Unit tests for UpsertSerializationSchemaWrapper (INSERT, UPDATE_AFTER, DELETE, UPDATE_BEFORE)
  • Unit tests for RowKind preservation after serialization
  • Existing KinesisDynamicTableSinkFactoryTest still passes (backward compatible)
  • Checkstyle, Spotless, ArchUnit all pass
  • Verified end-to-end against real KDS: GROUP BY query successfully wrote
    aggregated records with correct primary-key-based partition keys

Purpose of the change

For example: Implements the Table API for the Kinesis Source.

Verifying this change

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment
  • Added unit tests
  • Manually verified by running the Kinesis connector on a local Flink cluster.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

FCAgent added 2 commits June 1, 2026 10:10
Enable the kinesis connector to accept upsert changelog streams (GROUP BY,
deduplication, streaming joins) by detecting PRIMARY KEY on the table and
switching to upsert mode automatically. Deletes are written as empty-payload
tombstone records. The primary key fields are used as the Kinesis partition
key for consistent shard routing.

No separate connector needed — just define a PRIMARY KEY on the table.
Add documentation for the new upsert mode capability including usage
examples, ordering considerations, and tombstone behavior.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants