Support dynamic table routing via kafka message header 'table.name.format' #1519
Open
Juheon Oh (OZOOOOOH) wants to merge 3 commits into confluentinc:master from
Conversation
- Support dynamic table routing via 'table.name.format' header
🎉 All Contributor License Agreements have been signed. Ready to merge.
Pull Request Overview
Adds support for dynamic table routing based on a Kafka message header (table.name.format).
Key changes:
- Extended JdbcSinkConfig with a special constant and updated documentation for header-based table routing.
- Updated JdbcDbWriter to determine destination tables from record headers when configured.
- Added unit tests in JdbcSinkTaskTest covering header-based routing with different PK modes and invalid headers.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java | Added TABLE_NAME_FORMAT_RECORD_HEADER, enhanced docstring for header routing. |
| src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java | Implemented determineTableId and header extraction logic. |
| src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java | New tests for header-based routing (Kafka PK, record-key PK, invalid header). |
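To illustrate the JdbcDbWriter change described above, here is a minimal, self-contained sketch of header-based table resolution. The class and method names (`TableRouter`, `resolveTable`) and the `Map<String, byte[]>` header representation are illustrative assumptions, not the PR's actual `determineTableId` implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical sketch of header-based table routing. When the configured
// table.name.format equals the __RECORD_HEADER__ placeholder, the table
// name is taken from a record header instead of the static format.
public class TableRouter {
    static final String HEADER_PLACEHOLDER = "__RECORD_HEADER__";
    static final String HEADER_KEY = "table.name.format";

    private final String tableNameFormat;

    TableRouter(String tableNameFormat) {
        this.tableNameFormat = tableNameFormat;
    }

    /** Resolve the destination table for one record. */
    String resolveTable(String topic, Map<String, byte[]> headers) {
        if (!HEADER_PLACEHOLDER.equals(tableNameFormat)) {
            // Normal behavior: substitute the topic into the format string.
            return tableNameFormat.replace("${topic}", topic);
        }
        byte[] value = headers.get(HEADER_KEY);
        if (value == null) {
            // Mirrors the "invalid header" test case: fail fast when the
            // required header is absent.
            throw new IllegalArgumentException(
                "Record is missing required header '" + HEADER_KEY + "'");
        }
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        TableRouter router = new TableRouter(HEADER_PLACEHOLDER);
        String table = router.resolveTable(
            "orders",
            Map.of(HEADER_KEY, "orders_v2".getBytes(StandardCharsets.UTF_8)));
        System.out.println(table); // prints orders_v2
    }
}
```

The real connector reads headers from Kafka Connect's `SinkRecord`; this sketch only shows the routing decision itself.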
Comments suppressed due to low confidence (2)

src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java:132
- There's a missing space between the closing parenthesis and the next sentence in the docstring. Consider changing to "a header key ('table.name.format') that contains..." for readability.
  + "a header key ('table.name.format')"

src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java:575
- In the Kafka PK mode test, only the kafka_offset field is asserted. Consider adding assertions for kafka_topic and kafka_partition to fully validate all configured primary key columns.
  assertEquals(44, rs.getLong("kafka_offset"));
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Author
Hi Sangeet Mishra (@sangeet259), could you review this PR please?
Problem
#960
#1208
#1194
Multiple table sink support has been requested in several issues.
Currently, the JDBC sink connector only supports routing all records to a single table or using topic-based routing, which limits flexibility for complex data-pipeline requirements.
Solution
This PR implements a simple and non-intrusive approach to support multiple table sinks by leveraging Kafka message headers.
I am currently operating a data pipeline in production using this feature, successfully routing data from a single Kafka topic through a single connector to over 100 different tables. This demonstrates the scalability and reliability of this approach for complex, multi-table data ingestion scenarios.
Key Implementation Details:
- When table.name.format is set to __RECORD_HEADER__, the connector activates header-based table routing

How it works:
- Configure the connector with table.name.format=__RECORD_HEADER__
- Produce each record with a header whose key is table.name.format and whose value is the target table name

Does this solution apply anywhere else?
If yes, where?
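As a sketch, a connector configuration enabling this routing might look like the following. The topic name, connection URL, and table names are illustrative placeholders, not values from this PR:

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "topics": "events",
  "connection.url": "jdbc:postgresql://localhost:5432/mydb",
  "table.name.format": "__RECORD_HEADER__"
}
```

With this configuration, a record carrying the header `table.name.format=customers` would be written to the `customers` table, while another record on the same topic with `table.name.format=orders` would go to `orders`.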
Test Strategy
Testing done:
Release Plan