Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .chloggen/kafka-partition-extension.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: exporter/kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for partitioning kafka records

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [46931]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Add support for RoundRobin and LeastBackup partitioning strategies, as well as custom partitioners
provided by RecordPartitionerExtension implementations. Users can implement their own partitioning logic
and plug it into the kafka exporter via the RecordPartitionerExtension interface.


# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
7 changes: 7 additions & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ The following settings can be optionally configured:
- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `partition_logs_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
- `partition_logs_by_trace_id` (default = false): configures the exporter to partition log messages by trace ID, if the log record has one associated. Note: `partition_logs_by_resource_attributes` and `partition_logs_by_trace_id` are mutually exclusive, and enabling both will lead to an error.
- `record_partitioner`: configures the Kafka-level record partitioner. When unset, the default sarama-compatible sticky key partitioner is used.
- `type`: The partitioner strategy. Valid values are:
- `""` or `sarama_compatible`: Sticky key partitioner using Sarama-compatible FNV-1a hashing.
- `round_robin`: Distributes records evenly across all partitions.
- `least_backup`: Sends records to the partition with the fewest in-flight messages.
- `custom`: Delegates partitioning to a custom extension.
- `custom`: The ID of a custom partitioner extension to be used.
- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options. Set to `tls: insecure: false` explicitly when using `AWS_MSK_IAM_OAUTHBEARER` as the authentication method.
- `auth`
- `plain_text` (Deprecated in v0.123.0: use sasl with mechanism set to PLAIN instead.)
Expand Down
44 changes: 44 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (

var _ component.Config = (*Config)(nil)

var (
	// errRecordPartitionerUnknownType is returned when record_partitioner::type is not
	// one of the supported strategy names.
	errRecordPartitionerUnknownType = errors.New("unknown partitioner type")
	// errRecordPartitionerExtRequired is returned when record_partitioner::type is
	// "custom" but no extension ID was configured. The message names the actual
	// config value ("custom", per RecordPartitionerTypeExtension), and omits the
	// config-path prefix because Config.Validate wraps it with "record_partitioner: ".
	errRecordPartitionerExtRequired = errors.New(`extension must be set when type is "custom"`)
)

var errLogsPartitionExclusive = errors.New(
"partition_logs_by_resource_attributes and partition_logs_by_trace_id cannot both be enabled",
)
Expand All @@ -29,6 +34,36 @@ var (
errIncludeMetadataKeysNotPartitioned = errors.New("sending_queue::batch::partition::metadata_keys must include all include_metadata_keys values")
)

// RecordPartitionerConfig configures the strategy used to assign Kafka records to partitions.
type RecordPartitionerConfig struct {
	// Type is the partitioning strategy. Valid values are:
	//   - "" or "sarama_compatible" (default): sticky key partitioner with Sarama-compatible FNV-1a hashing
	//   - "round_robin": round-robin across all available partitions
	//   - "least_backup": routes to the partition with fewest in-flight records
	//   - "custom": delegates to an extension implementing RecordPartitionerExtension
	Type string `mapstructure:"type"`

	// Extension is the component ID of an extension implementing RecordPartitionerExtension.
	// Required when Type is "custom"; must be empty for all other types.
	Extension *component.ID `mapstructure:"extension,omitempty"`
}

// Validate checks that Type names a supported partitioning strategy and that
// Extension is set whenever the custom ("extension-backed") strategy is chosen.
func (c *RecordPartitionerConfig) Validate() error {
	// The custom strategy is the only one with an extra requirement.
	if c.Type == RecordPartitionerTypeExtension {
		if c.Extension == nil {
			return errRecordPartitionerExtRequired
		}
		return nil
	}
	switch c.Type {
	case "", RecordPartitionerTypeSaramaCompatible, RecordPartitionerTypeRoundRobin, RecordPartitionerTypeLeastBackup:
		return nil
	}
	return fmt.Errorf("%w: %q", errRecordPartitionerUnknownType, c.Type)
}

// Config defines configuration for Kafka exporter.
type Config struct {
TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
Expand Down Expand Up @@ -78,12 +113,21 @@ type Config struct {
// selection falls back to the Kafka client’s default strategy. Resource
// attributes are not used for the key when this option is enabled.
PartitionLogsByTraceID bool `mapstructure:"partition_logs_by_trace_id"`

// RecordPartitioner configures how Kafka records are assigned to partitions.
// The default ("sarama_compatible") retains the legacy Sarama-compatible hashing
// behavior. Set to "round_robin" or "least_backup" to use one of the built-in
// franz-go partitioners, or "custom" to delegate to a custom extension.
RecordPartitioner RecordPartitionerConfig `mapstructure:"record_partitioner"`
}

func (c *Config) Validate() error {
if c.PartitionLogsByResourceAttributes && c.PartitionLogsByTraceID {
return errLogsPartitionExclusive
}
if err := c.RecordPartitioner.Validate(); err != nil {
return fmt.Errorf("record_partitioner: %w", err)
}
if err := validateBatchPartitionerKeys(c); err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions exporter/kafkaexporter/config.schema.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
$defs:
record_partitioner_config:
description: RecordPartitionerConfig configures the strategy used to assign Kafka records to partitions.
type: object
properties:
extension:
description: Extension is the component ID of an extension implementing RecordPartitionerExtension. Required when Type is "extension"; must be empty for all other types.
x-pointer: true
type: string
x-customType: go.opentelemetry.io/collector/component.ID
type:
description: 'Type is the partitioning strategy. Valid values are: - "" or "sarama_compatible" (default): sticky key partitioner with Sarama-compatible FNV-1a hashing - "round_robin": round-robin across all available partitions - "least_backup": routes to the partition with fewest in-flight records - "custom": delegates to an extension implementing RecordPartitionerExtension'
type: string
signal_config:
description: SignalConfig holds signal-specific configuration for the Kafka exporter.
type: object
Expand Down Expand Up @@ -43,6 +55,9 @@ properties:
profiles:
description: Profiles holds configuration about how profiles should be sent to Kafka.
$ref: signal_config
record_partitioner:
description: RecordPartitioner configures how Kafka records are assigned to partitions. The default ("sarama_compatible") retains the legacy Sarama-compatible hashing behavior. Set to "round_robin" or "least_backup" to use one of the built-in franz-go partitioners, or "custom" to delegate to a custom extension.
$ref: record_partitioner_config
sending_queue:
x-optional: true
$ref: go.opentelemetry.io/collector/exporter/exporterhelper.queue_batch_config
Expand Down
34 changes: 34 additions & 0 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,40 @@ func TestLoadConfig(t *testing.T) {
PartitionLogsByTraceID: false,
},
},
{
id: component.NewIDWithName(metadata.Type, "round_robin_partitioner"),
expected: &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
QueueBatchConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()),
ClientConfig: configkafka.NewDefaultClientConfig(),
Producer: configkafka.NewDefaultProducerConfig(),
Logs: SignalConfig{Topic: defaultLogsTopic, Encoding: defaultLogsEncoding},
Metrics: SignalConfig{Topic: defaultMetricsTopic, Encoding: defaultMetricsEncoding},
Traces: SignalConfig{Topic: defaultTracesTopic, Encoding: defaultTracesEncoding},
Profiles: SignalConfig{Topic: defaultProfilesTopic, Encoding: defaultProfilesEncoding},
RecordPartitioner: RecordPartitionerConfig{
Type: RecordPartitionerTypeRoundRobin,
},
},
},
{
id: component.NewIDWithName(metadata.Type, "least_backup_partitioner"),
expected: &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
QueueBatchConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()),
ClientConfig: configkafka.NewDefaultClientConfig(),
Producer: configkafka.NewDefaultProducerConfig(),
Logs: SignalConfig{Topic: defaultLogsTopic, Encoding: defaultLogsEncoding},
Metrics: SignalConfig{Topic: defaultMetricsTopic, Encoding: defaultMetricsEncoding},
Traces: SignalConfig{Topic: defaultTracesTopic, Encoding: defaultTracesEncoding},
Profiles: SignalConfig{Topic: defaultProfilesTopic, Encoding: defaultProfilesEncoding},
RecordPartitioner: RecordPartitionerConfig{
Type: RecordPartitionerTypeLeastBackup,
},
},
},
{
id: component.NewIDWithName(metadata.Type, "per_signal_topic"),
expected: &Config{
Expand Down
6 changes: 6 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (e *kafkaExporter[T]) Start(ctx context.Context, host component.Host) (err
return err
}

partitionerOpt, err := buildPartitionerOpt(e.cfg.RecordPartitioner, host)
if err != nil {
return fmt.Errorf("failed to configure record partitioner: %w", err)
}

producer, err := kafka.NewFranzSyncProducer(
ctx,
host,
Expand All @@ -94,6 +99,7 @@ func (e *kafkaExporter[T]) Start(ctx context.Context, host component.Host) (err
e.cfg.TimeoutSettings.Timeout,
e.logger,
kgo.WithHooks(kafkaclient.NewFranzProducerMetrics(tb)),
partitionerOpt,
)
if err != nil {
return err
Expand Down
66 changes: 66 additions & 0 deletions exporter/kafkaexporter/partitioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"

import (
"fmt"

"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
)

const (
	// RecordPartitionerTypeSaramaCompatible is the default partitioner. It uses a sticky
	// key partitioner with Sarama-compatible FNV-1a hashing when a record key is set,
	// and a random sticky partition when no key is set.
	RecordPartitionerTypeSaramaCompatible = "sarama_compatible"

	// RecordPartitionerTypeRoundRobin distributes records evenly across all available
	// partitions in a round-robin fashion, regardless of the record key.
	RecordPartitionerTypeRoundRobin = "round_robin"

	// RecordPartitionerTypeLeastBackup routes each record to the partition with the fewest
	// buffered records, which can reduce produce latency under uneven load.
	RecordPartitionerTypeLeastBackup = "least_backup"

	// RecordPartitionerTypeExtension delegates partitioning to a user-provided extension
	// that implements RecordPartitionerExtension. NOTE: the user-facing config value is
	// "custom", although the Go identifier is named ...Extension.
	RecordPartitionerTypeExtension = "custom"
)

// RecordPartitionerExtension is implemented by extensions that supply a custom Kafka record
// partitioner for use with the kafka exporter.
type RecordPartitionerExtension interface {
	component.Component

	// GetPartitioner returns the kgo.Partitioner the exporter installs on its
	// producer to assign records to partitions.
	GetPartitioner() kgo.Partitioner
}

// buildPartitionerOpt maps cfg onto the kgo producer option that installs the
// corresponding kgo.Partitioner. For the "custom" type it resolves the configured
// extension from host and verifies that it implements RecordPartitionerExtension.
// It returns an error when the type is unknown or the extension cannot be resolved.
func buildPartitionerOpt(cfg RecordPartitionerConfig, host component.Host) (kgo.Opt, error) {
	switch cfg.Type {
	case "", RecordPartitionerTypeSaramaCompatible:
		// Default: retain the legacy Sarama-compatible sticky key hashing.
		return kgo.RecordPartitioner(kafka.NewSaramaCompatPartitioner()), nil
	case RecordPartitionerTypeRoundRobin:
		return kgo.RecordPartitioner(kgo.RoundRobinPartitioner()), nil
	case RecordPartitionerTypeLeastBackup:
		return kgo.RecordPartitioner(kgo.LeastBackupPartitioner()), nil
	case RecordPartitionerTypeExtension:
		// Normally caught by RecordPartitionerConfig.Validate; kept as a defensive check.
		if cfg.Extension == nil {
			return nil, errRecordPartitionerExtRequired
		}
		ext, ok := host.GetExtensions()[*cfg.Extension]
		if !ok {
			return nil, fmt.Errorf("partitioner extension %q not found", cfg.Extension)
		}
		partExt, ok := ext.(RecordPartitionerExtension)
		if !ok {
			return nil, fmt.Errorf("extension %q does not implement RecordPartitionerExtension", cfg.Extension)
		}
		return kgo.RecordPartitioner(partExt.GetPartitioner()), nil
	default:
		// Wrap the same sentinel used by config validation so callers can match
		// this condition with errors.Is regardless of which path reported it.
		return nil, fmt.Errorf("%w: %q", errRecordPartitionerUnknownType, cfg.Type)
	}
}
Loading
Loading