Skip to content

[exporter/kafka] add support for partitioning kafka records#47243

Open
VihasMakwana wants to merge 6 commits intoopen-telemetry:mainfrom
VihasMakwana:kafka-partition-extension
Open

[exporter/kafka] add support for partitioning kafka records#47243
VihasMakwana wants to merge 6 commits intoopen-telemetry:mainfrom
VihasMakwana:kafka-partition-extension

Conversation

@VihasMakwana
Copy link
Copy Markdown
Contributor

Description

This PR adds support for partitioning records based on known partitioners exposed by franz-go library.
A new config, record_partitioner is exposed for this purpose. Moreover, users can also implement their own partitioning logic by implementing a new extension and implementing the RecordPartitionerExtension interface in the extension.

Link to tracking issue

Fixes #46931

Testing

Added

Documentation

Added

Co-authored-by: Paulo Dias <44772900+paulojmdias@users.noreply.github.com>
Copy link
Copy Markdown
Member

@paulojmdias paulojmdias left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @VihasMakwana! Some notes for review

@VihasMakwana VihasMakwana force-pushed the kafka-partition-extension branch from 8500606 to 10d179f Compare March 31, 2026 06:37
@VihasMakwana VihasMakwana force-pushed the kafka-partition-extension branch from 10d179f to 6735ac6 Compare March 31, 2026 06:49
Copy link
Copy Markdown
Member

@paulojmdias paulojmdias left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nits

}

func (c *RecordPartitionerConfig) Validate() error {
fmt.Println(c.Type)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fmt.Println(c.Type)

Maybe it was kept here by mistake


func buildPartitionerOpt(cfg RecordPartitionerConfig, host component.Host) (kgo.Opt, error) {
switch cfg.Type {
case "", RecordPartitionerTypeSaramaCompatible:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
case "", RecordPartitionerTypeSaramaCompatible:
case RecordPartitionerTypeSaramaCompatible:

We don't need to validate it empty right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[kafka/exporter] Provide a way to use custom partitioner implementation

3 participants