feat(kafka): automatically upsize topic partition counts #15714
base: master
Conversation
Extends CreateKafkaTopicsStep to detect and automatically increase partition counts for existing topics that have fewer partitions than configured.

Previously, the upgrade step only created new topics and skipped existing ones. Now it checks existing topics and increases their partition count if they're under-provisioned. The step fails if partition count checks fail, ensuring operators are alerted to configuration issues. Note that Kafka does not support reducing partition counts, so topics with more partitions than configured are left unchanged with a warning (see the sketch after the change list below).

Changes:
- Add getCurrentPartitionCount() method to check existing partition counts
- Track partitionsToIncrease map for topics that need upsizing
- Track failedTopics list and throw an exception if any checks fail
- Add comprehensive test coverage for upsizing and failure scenarios
- Update kafka-config.md to document automatic partition upsizing behavior
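For orientation, here is a minimal sketch of the kind of Kafka Admin API flow the description refers to. The names (PartitionUpsizeSketch, upsizeIfNeeded) are illustrative placeholders rather than the step's actual methods, and it assumes a kafka-clients version that provides DescribeTopicsResult.allTopicNames():

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.TopicDescription;

// Illustrative sketch only: class, method, and variable names are placeholders,
// not the actual API of CreateKafkaTopicsStep.
final class PartitionUpsizeSketch {

  // Checks a single topic and adds partitions when it is under-provisioned.
  static void upsizeIfNeeded(AdminClient adminClient, String topicName, int desiredPartitions)
      throws Exception {
    // Look up the current partition count from the broker.
    TopicDescription description =
        adminClient.describeTopics(List.of(topicName)).allTopicNames().get().get(topicName);
    int currentPartitions = description.partitions().size();

    if (currentPartitions < desiredPartitions) {
      // Kafka can only add partitions, never remove them.
      adminClient
          .createPartitions(Map.of(topicName, NewPartitions.increaseTo(desiredPartitions)))
          .all()
          .get();
    }
    // If currentPartitions > desiredPartitions there is nothing safe to do automatically:
    // the topic is left unchanged and the mismatch is reported to the operator.
  }
}
```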
… auto-upsize 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
Codecov Report ✅ All modified and coverable lines are covered by tests.
desiredPartitions);
topicsToSkip.add(topicName);
} else {
log.info(
This logging is not required, as this was the default behavior before this change as well.
Cleaned this up a bit. PTAL.
...b-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/CreateKafkaTopicsStep.java
Outdated
| + "These topics may not have the correct partition configuration.", | ||
| failedTopics.size(), failedTopics); | ||
| log.error(errorMessage); | ||
| throw new RuntimeException(errorMessage); |
IMO, we should not throw an exception here; logging a warning is sufficient. It should not be a blocker for topic creation.
Feel pretty strongly that failure to configure these items during a setup job should be fatal if the user has this functionality enabled. Same for topic creation and most of the other things setup jobs do: it's better to fail loudly. Failures may get auto-triaged by restart attempts; quiet ignores can't be.
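To illustrate the fail-loud pattern being argued for, a rough sketch (not the PR's exact code) of collecting per-topic failures and failing the whole step at the end:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative pattern: collect per-topic failures and fail the setup step once at the end,
// so a restart re-runs the whole step instead of the failure being silently ignored.
final class FailLoudSketch {
  private static final Logger log = LoggerFactory.getLogger(FailLoudSketch.class);

  static void checkAll(Collection<String> topicNames, Consumer<String> checkOneTopic) {
    List<String> failedTopics = new ArrayList<>();
    for (String topicName : topicNames) {
      try {
        checkOneTopic.accept(topicName); // per-topic creation / partition-count check
      } catch (RuntimeException e) {
        log.error("Failed to verify or upsize partitions for topic {}", topicName, e);
        failedTopics.add(topicName);
      }
    }
    if (!failedTopics.isEmpty()) {
      String errorMessage = String.format(
          "%d topic(s) failed their checks: %s", failedTopics.size(), failedTopics);
      log.error(errorMessage);
      // Failing loudly lets a restart retry the job; swallowing the error would hide it.
      throw new RuntimeException(errorMessage);
    }
  }
}
```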
if (existingTopics.contains(topicName)) {
log.info("Topic {} already exists - skipping creation", topicName);
topicsToSkip.add(topicName);
// For existing topics, check if partition count needs to be increased
IMO, this new block could have its own separate function for code readability.
Take a look and let me know if that doesn't address the core concern here. Made the log messages a bit more consistent to ease grepping for the various categories of outcome.
IMO, this should be controlled by a feature flag like DATAHUB_AUTO_INCREASE_PARTITIONS.
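As a rough idea of what such a flag could look like, a sketch assuming Spring-style property binding; the property name kafka.setup.autoIncreasePartitions comes from the follow-up test fix later in this thread, and the exact wiring in the PR may differ:

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

// Sketch only: binds the kafka.setup.autoIncreasePartitions property, which the commit notes
// is exposed as the DATAHUB_AUTO_INCREASE_PARTITIONS environment variable and defaults to true.
@Component
class KafkaSetupFlagsSketch {

  @Value("${kafka.setup.autoIncreasePartitions:true}")
  private boolean autoIncreasePartitions;

  // Only upsize when the operator has left the feature enabled and the topic is under-provisioned.
  boolean shouldIncrease(int currentPartitions, int desiredPartitions) {
    return autoIncreasePartitions && currentPartitions < desiredPartitions;
  }
}
```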
topicsToSkip.add(topicName);
// For existing topics, check if partition count needs to be increased
try {
int currentPartitions = getCurrentPartitionCount(adminClient, topicName);
Instead of making a separate API call for each existing topic, can this be batched? The goal is to have a single API call for all existing topics.
Done.
Automatically increases partition counts for existing Kafka topics when configured values exceed current counts during upgrades. Adds DATAHUB_AUTO_INCREASE_PARTITIONS flag for operator control.

Key changes:
- Add DATAHUB_AUTO_INCREASE_PARTITIONS environment variable (defaults to true)
- Batch describe topics API call for efficiency (1 call vs N calls; see the sketch below)
- Extract fetchPartitionCountsForExistingTopics method for readability
- Improve logging with consistent "Checking kafka topic" format
- Change partition reduction log from WARN to ERROR level
- Add comprehensive test coverage for auto-increase scenarios

🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
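A sketch of what a batched lookup along the lines of the fetchPartitionCountsForExistingTopics method named above might look like; only the method name and the one-call-for-all-topics goal come from the commit message, the rest is assumed:

```java
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

// Sketch of a batched partition-count lookup: one DescribeTopics request for all existing topics.
final class PartitionCountLookupSketch {

  static Map<String, Integer> fetchPartitionCountsForExistingTopics(
      AdminClient adminClient, Collection<String> existingTopics) throws Exception {
    // A single describeTopics call covers every existing topic (1 call vs N calls).
    Map<String, TopicDescription> descriptions =
        adminClient.describeTopics(existingTopics).allTopicNames().get();
    return descriptions.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().partitions().size()));
  }
}
```

With a batched call like this, the number of broker round-trips no longer grows with the number of existing topics.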
Excellent suggestion. Done. Overloading that was naughty.
Add the new kafka.setup.autoIncreasePartitions property to the non-sensitive properties list in PropertiesCollectorConfigurationTest to fix a test failure. This property controls automatic partition increases for Kafka topics and does not contain sensitive data. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
Bundle Report: Changes will increase total bundle size by 5.35kB (0.02%) ⬆️. This is within the configured threshold ✅. Affected bundle: datahub-react-web-esm.
Update error message in CreateKafkaTopicsStep to accurately reflect that failedTopics tracks all types of topic failures (creation, configuration, partition count checks), not just partition count verification. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
Checklist:
- The PR conforms to DataHub's Contributing Guideline (particularly PR Title Format)
- Links to related issues (if applicable)
- Tests for the changes have been added/updated (if applicable)
- Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for the same.
- For any breaking change/potential downtime/deprecation/big changes, an entry has been made in Updating DataHub