31 changes: 31 additions & 0 deletions .chloggen/kafka-partition-extension.yaml
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: exporter/kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for partitioning Kafka records

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [46931]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Add support for round_robin and least_backup partitioning strategies, as well as custom partitioners
provided by RecordPartitionerExtension implementations. Users can implement their own partitioning logic
and plug it into the Kafka exporter via the RecordPartitionerExtension interface.


# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
41 changes: 41 additions & 0 deletions exporter/kafkaexporter/README.md
@@ -52,6 +52,13 @@ The following settings can be optionally configured:
- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `partition_logs_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
- `partition_logs_by_trace_id` (default = false): configures the exporter to partition log messages by trace ID, if the log record has one associated. Note: `partition_logs_by_resource_attributes` and `partition_logs_by_trace_id` are mutually exclusive, and enabling both will lead to an error.
- `record_partitioner`: configures the Kafka-level record partitioner. When unset, the default sarama-compatible sticky key partitioner is used.
- `type`: The partitioner strategy. Valid values are:
- `sarama_compatible` (default): Sticky key partitioner using Sarama-compatible FNV-1a hashing.
- `round_robin`: Distributes records evenly across all partitions.
- `least_backup`: Sends records to the partition with the fewest in-flight messages.
- `custom`: Delegates partitioning to a custom extension.
- `extension`: The ID of the custom partitioner extension to use. Required when `type` is `custom`.
- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options. Set to `tls: insecure: false` explicitly when using `AWS_MSK_IAM_OAUTHBEARER` as the authentication method.
- `auth`
- `plain_text` (Deprecated in v0.123.0: use sasl with mechanism set to PLAIN instead.)
@@ -160,3 +167,37 @@ The destination topic can be defined in a few different ways and takes priority
2. Otherwise, if `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
3. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used.
4. Finally, the `<signal>::topic` configuration is used for the signal-specific destination topic.

## Partitioning Kafka Records

Kafka topics are partitioned, meaning a topic is spread over a number of “buckets” located on different Kafka brokers.
The exporter supports multiple strategies to control how records are distributed across Kafka partitions within a topic.

Available partitioning strategies are `sarama_compatible`, `round_robin`, `least_backup`, and `custom`.
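
No extension is needed for the built-in strategies; selecting one is a single configuration key. For example, to spread records evenly across partitions (the broker address is illustrative):

```yaml
exporters:
  kafka:
    brokers:
      - localhost:9092
    record_partitioner:
      type: round_robin
```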

### Using a custom partitioner

The Kafka exporter lets you define a custom partitioning strategy via an extension. A sample configuration for a custom partitioner looks like this:

```yaml
exporters:
kafka:
brokers:
- localhost:9092
record_partitioner:
type: custom
extension: my_custom_partitioner

extensions:
my_custom_partitioner:
# your extension-specific configuration here

# rest of the pipeline config
```

Use a custom partitioner if:
- Built-in strategies (round_robin, least_backup, etc.) don’t fit your needs
- You require domain-specific routing logic

> [!NOTE]
> The custom partitioner extension must implement the RecordPartitionerExtension interface (see [partitioner.go](./partitioner.go)).
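
A minimal extension satisfying that interface might look like the following sketch. The package name `mycustompartitioner`, the type name `myPartitioner`, and the choice of delegating to `kgo.RoundRobinPartitioner()` are illustrative assumptions; only the `RecordPartitionerExtension` interface shape and the `kgo.Partitioner` type come from this PR.

```go
package mycustompartitioner

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.opentelemetry.io/collector/component"
)

// myPartitioner is a hypothetical extension implementing the kafka exporter's
// RecordPartitionerExtension interface: the component.Component lifecycle
// methods plus GetPartitioner.
type myPartitioner struct{}

func (*myPartitioner) Start(context.Context, component.Host) error { return nil }
func (*myPartitioner) Shutdown(context.Context) error              { return nil }

// GetPartitioner returns the kgo.Partitioner the exporter will install.
// Here it simply delegates to a built-in franz-go partitioner, but any
// kgo.Partitioner implementation can be returned.
func (*myPartitioner) GetPartitioner() kgo.Partitioner {
	return kgo.RoundRobinPartitioner()
}
```

A real extension would also need a factory registered in the collector build so that an ID such as `my_custom_partitioner` resolves to this component.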
44 changes: 44 additions & 0 deletions exporter/kafkaexporter/config.go
@@ -19,6 +19,11 @@ import (

var _ component.Config = (*Config)(nil)

var (
errRecordPartitionerUnknownType = errors.New("unknown partitioner type")
errRecordPartitionerExtRequired = errors.New("partitioner.extension must be set when type is \"custom\"")
)

var errLogsPartitionExclusive = errors.New(
"partition_logs_by_resource_attributes and partition_logs_by_trace_id cannot both be enabled",
)
@@ -29,6 +34,36 @@ var (
errIncludeMetadataKeysNotPartitioned = errors.New("sending_queue::batch::partition::metadata_keys must include all include_metadata_keys values")
)

// RecordPartitionerConfig configures the strategy used to assign Kafka records to partitions.
type RecordPartitionerConfig struct {
// Type is the partitioning strategy. Valid values are:
// - "" or "sarama_compatible" (default): sticky key partitioner with Sarama-compatible FNV-1a hashing
> **Contributor review comment** — suggested change: `// - "sarama_compatible" (default): sticky key partitioner with Sarama-compatible FNV-1a hashing`
> Do we need to handle an empty string here? The default config sets the value.
// - "sticky": franz-go StickyKeyPartitioner with murmur2 hashing
> **Contributor review comment:** This doesn't seem to be implemented?
> Also, there's StickyPartitioner and StickyKeyPartitioner, so the name is a bit misleading.
> Also also, what if someone wants to use StickyKeyPartitioner with a different hasher?
> Also also also, what if someone wants to use the more modern UniformBytesPartitioner?
> I think this simple name-based config is too simplistic. I suggest we make it more like this:
>
>     exporters:
>       kafka:
>         ...
>         record_partitioner:
>           # exactly one of the below is required, defaulting to sticky_key with sarama_compat hasher
>           least_backup:
>           round_robin:
>           sticky:
>           sticky_key:
>             hasher: sarama_compat # == SaramaCompatHasher(fnv32a)
>           uniform_bytes:
>             bytes: 123
>             adaptive: true
>             keys: false
>             hasher: kafka_murmur2
>           extension: whatever
// - "round_robin": round-robin across all available partitions
// - "least_backup": routes to the partition with fewest in-flight records
// - "custom": delegates to an extension implementing RecordPartitionerExtension
> **Contributor review comment** — suggested change: remove this line.
> Can we simplify and make Type and Extension mutually exclusive? Then just setting Extension is sufficient.
Type string `mapstructure:"type"`

// Extension is the component ID of an extension implementing RecordPartitionerExtension.
// Required when Type is "custom"; must be empty for all other types.
Extension *component.ID `mapstructure:"extension,omitempty"`
}

func (c *RecordPartitionerConfig) Validate() error {
switch c.Type {
case RecordPartitionerTypeSaramaCompatible,
RecordPartitionerTypeRoundRobin,
RecordPartitionerTypeLeastBackup:
case RecordPartitionerTypeCustom:
if c.Extension == nil {
return errRecordPartitionerExtRequired
}
default:
return fmt.Errorf("%w: %q", errRecordPartitionerUnknownType, c.Type)
}
return nil
}

// Config defines configuration for Kafka exporter.
type Config struct {
TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
@@ -78,12 +113,21 @@ type Config struct {
// selection falls back to the Kafka client’s default strategy. Resource
// attributes are not used for the key when this option is enabled.
PartitionLogsByTraceID bool `mapstructure:"partition_logs_by_trace_id"`

// RecordPartitioner configures how Kafka records are assigned to partitions.
// The default ("sarama_compatible") retains the legacy Sarama-compatible hashing
// behavior. Set to "round_robin" or "least_backup" to use one of the built-in
// franz-go partitioners, or "custom" to delegate to a custom extension.
RecordPartitioner RecordPartitionerConfig `mapstructure:"record_partitioner"`
}

func (c *Config) Validate() error {
if c.PartitionLogsByResourceAttributes && c.PartitionLogsByTraceID {
return errLogsPartitionExclusive
}
if err := c.RecordPartitioner.Validate(); err != nil {
return fmt.Errorf("record_partitioner: %w", err)
}
if err := validateBatchPartitionerKeys(c); err != nil {
return err
}
15 changes: 15 additions & 0 deletions exporter/kafkaexporter/config.schema.yaml
@@ -1,4 +1,16 @@
$defs:
record_partitioner_config:
description: RecordPartitionerConfig configures the strategy used to assign Kafka records to partitions.
type: object
properties:
extension:
description: Extension is the component ID of an extension implementing RecordPartitionerExtension. Required when Type is "custom"; must be empty for all other types.
x-pointer: true
type: string
x-customType: go.opentelemetry.io/collector/component.ID
type:
description: 'Type is the partitioning strategy. Valid values are: - "" or "sarama_compatible" (default): sticky key partitioner with Sarama-compatible FNV-1a hashing - "round_robin": round-robin across all available partitions - "least_backup": routes to the partition with fewest in-flight records - "custom": delegates to an extension implementing RecordPartitionerExtension'
type: string
signal_config:
description: SignalConfig holds signal-specific configuration for the Kafka exporter.
type: object
@@ -43,6 +55,9 @@ properties:
profiles:
description: Profiles holds configuration about how profiles should be sent to Kafka.
$ref: signal_config
record_partitioner:
description: RecordPartitioner configures how Kafka records are assigned to partitions. The default ("sarama_compatible") retains the legacy Sarama-compatible hashing behavior. Set to "round_robin" or "least_backup" to use one of the built-in franz-go partitioners, or "custom" to delegate to a custom extension.
$ref: record_partitioner_config
sending_queue:
x-optional: true
$ref: go.opentelemetry.io/collector/exporter/exporterhelper.queue_batch_config
56 changes: 56 additions & 0 deletions exporter/kafkaexporter/config_test.go
@@ -81,6 +81,43 @@ func TestLoadConfig(t *testing.T) {
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
PartitionLogsByTraceID: false,
RecordPartitioner: RecordPartitionerConfig{
Type: "sarama_compatible",
},
},
},
{
id: component.NewIDWithName(metadata.Type, "round_robin_partitioner"),
expected: &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
QueueBatchConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()),
ClientConfig: configkafka.NewDefaultClientConfig(),
Producer: configkafka.NewDefaultProducerConfig(),
Logs: SignalConfig{Topic: defaultLogsTopic, Encoding: defaultLogsEncoding},
Metrics: SignalConfig{Topic: defaultMetricsTopic, Encoding: defaultMetricsEncoding},
Traces: SignalConfig{Topic: defaultTracesTopic, Encoding: defaultTracesEncoding},
Profiles: SignalConfig{Topic: defaultProfilesTopic, Encoding: defaultProfilesEncoding},
RecordPartitioner: RecordPartitionerConfig{
Type: RecordPartitionerTypeRoundRobin,
},
},
},
{
id: component.NewIDWithName(metadata.Type, "least_backup_partitioner"),
expected: &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
QueueBatchConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()),
ClientConfig: configkafka.NewDefaultClientConfig(),
Producer: configkafka.NewDefaultProducerConfig(),
Logs: SignalConfig{Topic: defaultLogsTopic, Encoding: defaultLogsEncoding},
Metrics: SignalConfig{Topic: defaultMetricsTopic, Encoding: defaultMetricsEncoding},
Traces: SignalConfig{Topic: defaultTracesTopic, Encoding: defaultTracesEncoding},
Profiles: SignalConfig{Topic: defaultProfilesTopic, Encoding: defaultProfilesEncoding},
RecordPartitioner: RecordPartitionerConfig{
Type: RecordPartitionerTypeLeastBackup,
},
},
},
{
@@ -111,6 +148,9 @@
IncludeMetadataKeys: []string{
"metadata_key",
},
RecordPartitioner: RecordPartitionerConfig{
Type: "sarama_compatible",
},
},
},
{
@@ -137,6 +177,9 @@
Topic: "otlp_profiles",
Encoding: "per_signal_encoding",
},
RecordPartitioner: RecordPartitionerConfig{
Type: "sarama_compatible",
},
},
},
{
@@ -180,6 +223,9 @@
Encoding: "otlp_proto",
},
IncludeMetadataKeys: []string{"metadata_key"},
RecordPartitioner: RecordPartitionerConfig{
Type: "sarama_compatible",
},
},
},
}
@@ -226,6 +272,16 @@ func TestLoadConfigFailed(t *testing.T) {
errorContains: `sending_queue::batch::partition::metadata_keys must include all include_metadata_keys values: missing "required_key" from sending_queue::batch::partition::metadata_keys=[metadata_key]`,
configFile: "config-batch-partition-validation-failed.yaml",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_partitioner"),
errorContains: errRecordPartitionerUnknownType.Error(),
configFile: "config-partitioning-failed.yaml",
},
{
id: component.NewIDWithName(metadata.Type, "extension_not_set"),
errorContains: errRecordPartitionerExtRequired.Error(),
configFile: "config-partitioning-failed.yaml",
},
}

for _, tt := range tests {
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/factory.go
@@ -75,6 +75,9 @@ func createDefaultConfig() component.Config {
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled,
PartitionLogsByTraceID: defaultPartitionLogsByTraceIDEnabled,
RecordPartitioner: RecordPartitionerConfig{
Type: RecordPartitionerTypeSaramaCompatible,
},
}
}

6 changes: 6 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -86,6 +86,11 @@ func (e *kafkaExporter[T]) Start(ctx context.Context, host component.Host) (err
return err
}

partitionerOpt, err := buildPartitionerOpt(e.cfg.RecordPartitioner, host)
if err != nil {
return fmt.Errorf("failed to configure record partitioner: %w", err)
}

producer, err := kafka.NewFranzSyncProducer(
ctx,
host,
@@ -94,6 +99,7 @@
e.cfg.TimeoutSettings.Timeout,
e.logger,
kgo.WithHooks(kafkaclient.NewFranzProducerMetrics(tb)),
partitionerOpt,
)
if err != nil {
return err
66 changes: 66 additions & 0 deletions exporter/kafkaexporter/partitioner.go
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"

import (
"fmt"

"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
)

const (
// RecordPartitionerTypeSaramaCompatible is the default partitioner. It uses a sticky
// key partitioner with Sarama-compatible FNV-1a hashing when a record key is set,
// and a random sticky partition when no key is set.
RecordPartitionerTypeSaramaCompatible = "sarama_compatible"

// RecordPartitionerTypeRoundRobin distributes records evenly across all available
// partitions in a round-robin fashion, regardless of the record key.
RecordPartitionerTypeRoundRobin = "round_robin"

// RecordPartitionerTypeLeastBackup routes each record to the partition with the fewest
// buffered records, which can reduce produce latency under uneven load.
RecordPartitionerTypeLeastBackup = "least_backup"

// RecordPartitionerTypeCustom delegates partitioning to a user-provided extension
// that implements RecordPartitionerExtension.
RecordPartitionerTypeCustom = "custom"
)

// RecordPartitionerExtension is implemented by extensions that supply a custom Kafka record
// partitioner for use with the kafka exporter.
type RecordPartitionerExtension interface {
component.Component

GetPartitioner() kgo.Partitioner
}

func buildPartitionerOpt(cfg RecordPartitionerConfig, host component.Host) (kgo.Opt, error) {
switch cfg.Type {
case RecordPartitionerTypeSaramaCompatible:
return kgo.RecordPartitioner(kafka.NewSaramaCompatPartitioner()), nil
case RecordPartitionerTypeRoundRobin:
return kgo.RecordPartitioner(kgo.RoundRobinPartitioner()), nil
case RecordPartitionerTypeLeastBackup:
return kgo.RecordPartitioner(kgo.LeastBackupPartitioner()), nil
case RecordPartitionerTypeCustom:
if cfg.Extension == nil {
return nil, errRecordPartitionerExtRequired
}
ext, ok := host.GetExtensions()[*cfg.Extension]
if !ok {
return nil, fmt.Errorf("partitioner extension %q not found", cfg.Extension)
}
partExt, ok := ext.(RecordPartitionerExtension)
if !ok {
return nil, fmt.Errorf("extension %q does not implement RecordPartitionerExtension", cfg.Extension)
}
return kgo.RecordPartitioner(partExt.GetPartitioner()), nil
default:
return nil, fmt.Errorf("unknown partitioner type %q", cfg.Type)
}
}
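
To make the `least_backup` strategy concrete, here is a self-contained sketch of the selection rule it describes: pick the partition with the smallest backlog of in-flight records. This is an illustration of the idea only, not franz-go's actual `LeastBackupPartitioner` implementation.

```go
package main

import "fmt"

// leastBackup returns the index of the partition with the fewest
// in-flight (buffered) records. On ties, the lowest-numbered
// partition wins because the comparison is strictly less-than.
func leastBackup(inflight []int) int {
	best := 0
	for p, n := range inflight {
		if n < inflight[best] {
			best = p
		}
	}
	return best
}

func main() {
	// Partition 2 has the smallest backlog, so the next record goes there.
	inflight := []int{5, 3, 1, 4}
	fmt.Println(leastBackup(inflight)) // prints 2
}
```

Under uneven load this keeps slow partitions from accumulating ever-deeper queues, which is why the README describes it as reducing produce latency.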