Commit 7f5cdac
[exporter/kafka] Add record_headers config to set static headers on outgoing records (#47201)

## Description

Resolves #47193. This PR adds a new `record_headers` configuration option to the Kafka exporter.

**Changes made:**

* added `record_headers` to the `Config` struct
* updated `makeFranzMessages` in the `franz-go` client wrapper to iterate over the configured map and append the entries as `kgo.RecordHeader` to the outgoing messages
* added `TestMakeFranzMessages_RecordHeaders` to verify headers are successfully attached, bringing the test suite to 147 passing tests
* documented the new field and provided an example in the `README.md`
* added `.chloggen/fix-47193.yaml`

## Testing

- [x] added `TestMakeFranzMessages_RecordHeaders`
- [x] all 147 tests passed locally

---------

Signed-off-by: singhvibhanshu <singhvibhanshu@hotmail.com>
1 parent 138fbe1 · commit 7f5cdac

10 files changed

Lines changed: 104 additions & 10 deletions


.chloggen/fix_47193.yaml

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: exporter/kafka
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add `record_headers` configuration option to set static headers on outgoing records
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [47193]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

exporter/kafkaexporter/README.md

Lines changed: 4 additions & 0 deletions
@@ -48,6 +48,7 @@ The following settings can be optionally configured:
 - `topic_from_metadata_key` (default = ""): The name of the metadata key whose value should be used as the message's topic. Useful to dynamically produce to topics based on request inputs. It takes precedence over `topic_from_attribute` and `topic` settings.
 - `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
 - `include_metadata_keys` (default = []): Specifies a list of metadata keys to propagate as Kafka message headers. If one or more keys aren't found in the metadata, they are ignored. When `sending_queue::batch` is enabled, `sending_queue::batch::partition::metadata_keys` must be configured and include all values configured in `include_metadata_keys`.
+- `record_headers` (default = {}): Specifies a map of key/value pairs to set as static headers on every outgoing Kafka record.
 - `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
 - `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
 - `partition_logs_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
@@ -158,6 +159,9 @@
   kafka:
     brokers:
       - localhost:9092
+    record_headers:
+      my-custom-header: "my-custom-value"
+      another-header: "another-value"
 ```

 ## Destination Topic

exporter/kafkaexporter/config.go

Lines changed: 4 additions & 0 deletions
@@ -10,6 +10,7 @@ import (
 	"slices"

 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configopaque"
 	"go.opentelemetry.io/collector/config/configoptional"
 	"go.opentelemetry.io/collector/config/configretry"
 	"go.opentelemetry.io/collector/confmap"
@@ -142,6 +143,9 @@ type Config struct {
 	// IncludeMetadataKeys indicates the receiver's client metadata keys to propagate as Kafka message headers.
 	IncludeMetadataKeys []string `mapstructure:"include_metadata_keys"`

+	// RecordHeaders sets static headers on every outgoing Kafka record.
+	RecordHeaders configopaque.MapList `mapstructure:"record_headers"`
+
 	// TopicFromAttribute is the name of the attribute to use as the topic name.
 	TopicFromAttribute string `mapstructure:"topic_from_attribute"`

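For context, a hypothetical sketch of how a `record_headers` map could decode into the new `configopaque.MapList` field. This assumes `confmap.Conf.Unmarshal` turns the YAML map into ordered `Name`/`Value` pairs with opaque string values; the `headersOnly` struct and the header key are invented for illustration, not the exporter's actual wiring:

```go
// Hypothetical sketch: decoding a `record_headers` map into
// configopaque.MapList. Assumes MapList unmarshals a plain map into
// ordered Name/Value pairs; not the exporter's actual wiring.
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/config/configopaque"
	"go.opentelemetry.io/collector/confmap"
)

type headersOnly struct {
	RecordHeaders configopaque.MapList `mapstructure:"record_headers"`
}

func main() {
	conf := confmap.NewFromStringMap(map[string]any{
		"record_headers": map[string]any{
			"my-custom-header": "my-custom-value",
		},
	})

	var cfg headersOnly
	if err := conf.Unmarshal(&cfg); err != nil {
		panic(err)
	}

	for _, pair := range cfg.RecordHeaders {
		// configopaque.String redacts itself when printed via its Stringer;
		// convert explicitly to see the underlying value.
		fmt.Printf("%s=%s\n", pair.Name, string(pair.Value))
	}
}
```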
exporter/kafkaexporter/config.schema.yaml

Lines changed: 3 additions & 0 deletions
@@ -71,6 +71,9 @@ properties:
   profiles:
     description: Profiles holds configuration about how profiles should be sent to Kafka.
     $ref: signal_config
+  record_headers:
+    description: RecordHeaders sets static headers on every outgoing Kafka record.
+    $ref: go.opentelemetry.io/collector/config/configopaque.map_list
   record_partitioner:
     description: RecordPartitioner configures how Kafka records are assigned to partitions. The default ("sarama_compatible") retains the legacy Sarama-compatible hashing behavior. Set to "sticky", "round_robin", or "least_backup" to use one of the built-in franz-go partitioners, or "extension" to delegate to a custom extension.
     $ref: record_partitioner_config

exporter/kafkaexporter/go.mod

Lines changed: 1 addition & 1 deletion
@@ -21,6 +21,7 @@ require (
 	go.opentelemetry.io/collector/client v1.55.1-0.20260409104450-d686cf9058ce
 	go.opentelemetry.io/collector/component v1.55.1-0.20260409104450-d686cf9058ce
 	go.opentelemetry.io/collector/component/componenttest v0.149.1-0.20260409104450-d686cf9058ce
+	go.opentelemetry.io/collector/config/configopaque v1.55.1-0.20260409104450-d686cf9058ce
 	go.opentelemetry.io/collector/config/configoptional v1.55.1-0.20260409104450-d686cf9058ce
 	go.opentelemetry.io/collector/config/configretry v1.55.1-0.20260409104450-d686cf9058ce
 	go.opentelemetry.io/collector/confmap v1.55.1-0.20260409104450-d686cf9058ce
@@ -104,7 +105,6 @@ require (
 	github.com/xdg-go/stringprep v1.0.4 // indirect
 	go.opentelemetry.io/auto/sdk v1.2.1 // indirect
 	go.opentelemetry.io/collector/config/configcompression v1.55.1-0.20260409104450-d686cf9058ce // indirect
-	go.opentelemetry.io/collector/config/configopaque v1.55.1-0.20260409104450-d686cf9058ce // indirect
 	go.opentelemetry.io/collector/config/configtls v1.55.1-0.20260409104450-d686cf9058ce // indirect
 	go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.149.1-0.20260409104450-d686cf9058ce // indirect
 	go.opentelemetry.io/collector/consumer/consumertest v0.149.1-0.20260409104450-d686cf9058ce // indirect

exporter/kafkaexporter/internal/kafkaclient/franzgo.go

Lines changed: 16 additions & 6 deletions
@@ -10,6 +10,7 @@ import (

 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
+	"go.opentelemetry.io/collector/config/configopaque"
 	"go.opentelemetry.io/collector/consumer/consumererror"
 )

@@ -45,24 +46,27 @@ func recordUserSize(r *kgo.Record) int {
 type FranzSyncProducer struct {
 	client          *kgo.Client
 	metadataKeys    []string
+	recordHeaders   configopaque.MapList
 	maxMessageBytes int
 }

 // NewFranzSyncProducer Franz-go producer from a kgo.Client and a Messenger.
 func NewFranzSyncProducer(client *kgo.Client,
 	metadataKeys []string,
+	recordHeaders configopaque.MapList,
 	maxMessageBytes int,
 ) *FranzSyncProducer {
 	return &FranzSyncProducer{
 		client:          client,
 		metadataKeys:    metadataKeys,
+		recordHeaders:   recordHeaders,
 		maxMessageBytes: maxMessageBytes,
 	}
 }

 // ExportData sends a batch of messages to Kafka
 func (p *FranzSyncProducer) ExportData(ctx context.Context, msgs Messages) error {
-	messages := makeFranzMessages(msgs)
+	messages := makeFranzMessages(msgs, p.recordHeaders)
 	setMessageHeaders(ctx, messages, p.metadataKeys)
 	result := p.client.ProduceSync(ctx, messages...)
 	var errs []error
@@ -93,18 +97,24 @@ func (p *FranzSyncProducer) Close() error {
 	return nil
 }

-func makeFranzMessages(messages Messages) []*kgo.Record {
+func makeFranzMessages(messages Messages, recordHeaders configopaque.MapList) []*kgo.Record {
 	msgs := make([]*kgo.Record, 0, messages.Count)
 	for _, msg := range messages.TopicMessages {
 		for _, message := range msg.Messages {
-			msg := &kgo.Record{Topic: msg.Topic}
+			record := &kgo.Record{Topic: msg.Topic}
 			if message.Key != nil {
-				msg.Key = message.Key
+				record.Key = message.Key
 			}
 			if message.Value != nil {
-				msg.Value = message.Value
+				record.Value = message.Value
 			}
-			msgs = append(msgs, msg)
+			for _, pair := range recordHeaders {
+				record.Headers = append(record.Headers, kgo.RecordHeader{
+					Key:   pair.Name,
+					Value: []byte(string(pair.Value)),
+				})
+			}
+			msgs = append(msgs, record)
 		}
 	}
 	return msgs

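As a standalone illustration of the new loop in `makeFranzMessages`, a minimal sketch that attaches configured pairs to a record the same way; the topic, header name, and payload are invented for the example:

```go
// Minimal sketch mirroring the makeFranzMessages loop above; the topic,
// header name, and payload are invented for the example.
package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.opentelemetry.io/collector/config/configopaque"
)

func main() {
	// Static headers as they would arrive from the `record_headers` config.
	headers := configopaque.MapList{
		{Name: "my-custom-header", Value: configopaque.String("my-custom-value")},
	}

	record := &kgo.Record{Topic: "example-topic", Value: []byte("payload")}

	// Same shape as the exporter's loop: copy each configured pair onto the
	// record as a kgo.RecordHeader.
	for _, pair := range headers {
		record.Headers = append(record.Headers, kgo.RecordHeader{
			Key:   pair.Name,
			Value: []byte(string(pair.Value)),
		})
	}

	for _, h := range record.Headers {
		fmt.Printf("%s=%s\n", h.Key, h.Value) // my-custom-header=my-custom-value
	}
}
```

The explicit `string(pair.Value)` conversion matters: `configopaque.String` redacts itself when marshaled or printed, so converting to a plain string is what exposes the real value to the outgoing record.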
exporter/kafkaexporter/internal/kafkaclient/franzgo_test.go

Lines changed: 46 additions & 1 deletion
@@ -12,6 +12,8 @@ import (
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kfake"
 	"github.com/twmb/franz-go/pkg/kgo"
+	"go.opentelemetry.io/collector/client"
+	"go.opentelemetry.io/collector/config/configopaque"
 	"go.opentelemetry.io/collector/consumer/consumererror"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/marshaler"
@@ -33,7 +35,7 @@ func TestExportData_MessageTooLarge(t *testing.T) {
 	require.NoError(t, err)
 	t.Cleanup(client.Close)

-	producer := NewFranzSyncProducer(client, nil, maxMessageBytes)
+	producer := NewFranzSyncProducer(client, nil, nil, maxMessageBytes)

 	// Create a message larger than maxMessageBytes to trigger MessageTooLarge.
 	largeValue := []byte(strings.Repeat("x", maxMessageBytes*2))
@@ -64,3 +66,46 @@ func TestExportData_MessageTooLarge(t *testing.T) {
 	assert.Contains(t, err.Error(), "record size")
 	assert.Contains(t, err.Error(), "exceeds max")
 }
+
+func TestMakeFranzMessages_RecordHeaders(t *testing.T) {
+	recordHeaders := configopaque.MapList{
+		{Name: "static-key-ONLY", Value: configopaque.String("static-value")},
+		{Name: "shared-key", Value: configopaque.String("static-value-override")},
+	}
+
+	md := client.NewMetadata(map[string][]string{
+		"dynamic-key-ONLY": {"dynamic-value"},
+		"shared-key":       {"dynamic-value-wins"},
+	})
+	ctx := client.NewContext(t.Context(), client.Info{Metadata: md})
+
+	msgs := Messages{
+		Count: 1,
+		TopicMessages: []TopicMessages{{
+			Topic: "test-topic",
+			Messages: []marshaler.Message{{
+				Value: []byte("test-payload"),
+			}},
+		}},
+	}
+
+	records := makeFranzMessages(msgs, recordHeaders)
+	setMessageHeaders(ctx, records, []string{"dynamic-key-ONLY", "shared-key"})
+
+	require.Len(t, records, 1, "expected exactly 1 record")
+	record := records[0]
+
+	assert.Equal(t, "test-topic", record.Topic)
+	assert.Equal(t, []byte("test-payload"), record.Value)
+
+	require.Len(t, record.Headers, 4, "expected exactly 4 headers on the record")
+
+	headerMap := make(map[string]string)
+	for _, h := range record.Headers {
+		headerMap[h.Key] = string(h.Value)
+	}
+
+	assert.Equal(t, "static-value", headerMap["static-key-ONLY"], "static headers unique key failed")
+	assert.Equal(t, "dynamic-value", headerMap["dynamic-key-ONLY"], "dynamic headers unique key failed")
+	assert.Equal(t, "dynamic-value-wins", headerMap["shared-key"], "Precedence for common key failed")
+}

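To observe the merged result end to end, a hedged consumer-side sketch (the broker address and topic name are placeholders, not part of the commit). Because Kafka headers are an ordered list rather than a map, a key set both statically via `record_headers` and dynamically via `include_metadata_keys` appears twice on the record, with the metadata value appended last by `setMessageHeaders`, which is why the test's `headerMap` ends up with `dynamic-value-wins`:

```go
// Hedged verification sketch, not part of the commit: consume from a local
// broker and print every header on each record.
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("test-topic"),   // placeholder topic
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Duplicate keys are preserved in order: static record_headers entries
	// first, then any headers propagated from client metadata.
	fetches := cl.PollFetches(context.Background())
	fetches.EachRecord(func(r *kgo.Record) {
		for _, h := range r.Headers {
			fmt.Printf("%s=%s\n", h.Key, h.Value)
		}
	})
}
```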
exporter/kafkaexporter/kafka_exporter.go

Lines changed: 1 addition & 0 deletions
@@ -106,6 +106,7 @@ func (e *kafkaExporter[T]) Start(ctx context.Context, host component.Host) (err
 	}
 	e.producer = kafkaclient.NewFranzSyncProducer(producer,
 		e.cfg.IncludeMetadataKeys,
+		e.cfg.RecordHeaders,
 		e.cfg.Producer.MaxMessageBytes,
 	)
 	return nil

exporter/kafkaexporter/kafka_exporter_e2e_bench_test.go

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ func configureExporterBench[T any](
 	messenger, err := exp.newMessenger(componenttest.NewNopHost())
 	require.NoError(b, err)
 	exp.messenger = messenger
-	exp.producer = kafkaclient.NewFranzSyncProducer(client, cfg.IncludeMetadataKeys, cfg.Producer.MaxMessageBytes)
+	exp.producer = kafkaclient.NewFranzSyncProducer(client, cfg.IncludeMetadataKeys, cfg.RecordHeaders, cfg.Producer.MaxMessageBytes)

 	b.Cleanup(func() { exp.Close(b.Context()) })
 }

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 1 addition & 1 deletion
@@ -1299,7 +1299,7 @@ func configureExporter[T any](tb testing.TB,
 	require.NoError(tb, err, "failed to create messenger for metrics")

 	exp.messenger = messenger
-	exp.producer = kafkaclient.NewFranzSyncProducer(client, cfg.IncludeMetadataKeys, cfg.Producer.MaxMessageBytes)
+	exp.producer = kafkaclient.NewFranzSyncProducer(client, cfg.IncludeMetadataKeys, cfg.RecordHeaders, cfg.Producer.MaxMessageBytes)

 	tb.Cleanup(func() { assert.NoError(tb, exp.Close(tb.Context())) })
 	return cluster
