Skip to content

ingest-storage: support multiple kafka seed broker addresses#14328

Open
srpvpn wants to merge 7 commits intografana:mainfrom
srpvpn:fix-ingest-storage-kafka-multi-seed-14303
Open

ingest-storage: support multiple kafka seed broker addresses#14328
srpvpn wants to merge 7 commits intografana:mainfrom
srpvpn:fix-ingest-storage-kafka-multi-seed-14303

Conversation

@srpvpn
Copy link

@srpvpn srpvpn commented Feb 11, 2026

Summary

Add support for multiple Kafka seed brokers via comma-separated values in ingest-storage.kafka.address.

Changes

  • Extended KafkaConfig.SeedBrokers() to parse and trim comma-separated broker addresses
  • Updated Kafka client initialization to use kgo.SeedBrokers(cfg.SeedBrokers()...)
  • Added unit tests for broker parsing and CreateTopic with multiple brokers
  • Updated documentation to reflect CSV format support

Testing

go test ./pkg/storage/ingest -run TestKafkaConfig_SeedBrokers -v
go test ./pkg/storage/ingest -run 'TestCreateTopic/should_support_multiple_comma-separated_seed_broker_addresses' -v
go test ./pkg/storage/ingest -run 'TestConfig_(Validate|GetConsumerGroup)|TestKafkaConfig_SeedBrokers|TestCreateTopic' -v

Fix #14303


Note

Medium Risk
Touches Kafka connection/bootstrap configuration and its parsing/validation, which can impact ingest availability if misconfigured. Changes are scoped and covered by updated tests and docs.

Overview
Adds support for configuring multiple Kafka seed brokers for ingest storage by changing ingest-storage.kafka.address from a single string to a CSV-backed list and wiring it through Kafka client creation (kgo.SeedBrokers(cfg.Address...)).

Updates validation, tests, fixture generation, and various ingest/distributor test setups to use the new list type, and refreshes CLI help + docs + changelog to document the comma-separated format.

Written by Cursor Bugbot for commit 8009b06. This will update automatically on new commits. Configure here.

@srpvpn srpvpn requested review from a team and tacole02 as code owners February 11, 2026 20:46
@CLAassistant
Copy link

CLAassistant commented Feb 11, 2026

CLA assistant check
All committers have signed the CLA.

Copy link
Contributor

@56quarters 56quarters left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! I'm not super familiar with ingest storage so I'll let someone from the ingest squad take a look.

cfg.concurrentFetchersFetchBackoffConfig = defaultFetchBackoffConfig

f.StringVar(&cfg.Address, prefix+"address", "", "The Kafka backend address.")
f.StringVar(&cfg.Address, prefix+"address", "", "The Kafka seed broker address, or a comma-separated list of seed broker addresses.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a type in dskit called StringSliceCSV that handles this. There's an example here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I fixed it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, the comment above was about using the flagext.StringSliceCSV for cfg.Address. Any reason we can't do that here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay: KafkaConfig.Address now uses flagext.StringSliceCSV directly, and the flag is registered via f.Var(&cfg.Address, ...). SeedBrokers still keeps trimming and
backward-compatible fallback behavior. Also updated ingest tests/fixtures to set address through the same CSV parsing path.

Copy link
Contributor

@tacole02 tacole02 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs look good! Thank you!

Copy link
Contributor

@narqo narqo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for iterating on that. Please see the comments. After addressing those, please check the tests. I think there are more places around Mimir, that use KafkaConfig.Address. Those tests need to be fixed.

@srpvpn srpvpn requested a review from narqo February 14, 2026 07:18
}
}

func TestKafkaConfig_AddressCSV(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is redundant and doesn't bring value: it checks what must be guarantied by the type used. Please, cut.

require.NoError(t, CreateTopic(cfg, logger))
})

t.Run("should support multiple comma-separated seed broker addresses", func(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what this subtest has to do with TestCreateTopic — does passing multiple seed addresses change how a topic is created?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the redundant TestKafkaConfig_AddressCSV and dropped the unrelated multi-seed subtest from TestCreateTopic.

@srpvpn srpvpn requested a review from narqo February 16, 2026 19:29
@narqo
Copy link
Contributor

narqo commented Feb 18, 2026

I think there are more places around Mimir, that use KafkaConfig.Address. Those tests need to be fixed.

Judging by the broken CI, this comment still needs to be address.

@srpvpn
Copy link
Author

srpvpn commented Feb 19, 2026

@narqo Addressed. I fixed the remaining test usages of KafkaConfig.Address

@srpvpn srpvpn force-pushed the fix-ingest-storage-kafka-multi-seed-14303 branch from 2279b09 to 8009b06 Compare February 19, 2026 17:37
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is ON, but it could not run because the branch was deleted or merged before Autofix could start.

WriteTimeout time.Duration `yaml:"write_timeout"`
WriteClients int `yaml:"write_clients"`
// Address is a list of seed brokers. The config name is singular for backward compatibility.
Address flagext.StringSliceCSV `yaml:"address"`
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Broker addresses not trimmed, spaces cause connection failures

Medium Severity

flagext.StringSliceCSV.Set splits on commas without trimming whitespace from individual items. A common user input like "broker1:9092, broker2:9092" produces ["broker1:9092", " broker2:9092"] — note the leading space. The franz-go parseBrokerAddr function does not trim whitespace either, so the untrimmed address broker2:9092 causes DNS resolution failures. No validation or trimming is performed in Validate() or anywhere in the pipeline to catch this.

Additional Locations (1)

Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

How to configure multiple Kafka broker addresses for AWS MSK in ingest_storage?

5 participants

Comments