Skip to content

Commit f7d56bc

Browse files
[exporter/kafka] Remove messageHeaders abstraction since Sarama is deprecated (#46815)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description The generic header utility functions (`metadataToHeaders[H]`, `setMessageHeaders[M,H]`, `setHeaders[M,H]`) in `internal/kafkaclient/headers.go` were designed to abstract over Sarama and franz-go types via Go generics. Now that Sarama has been fully removed, the generics are only ever instantiated with `kgo.RecordHeader` / `*kgo.Record`. I decided to add a changelog file to give some visibility, but this is not expected to change any user behaviour. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #46809 <!--Describe what testing was performed and which tests were added.--> #### Testing Updated unit tests, and below are the values of the benchmark tests after the changes. ```sh goos: darwin goarch: arm64 pkg: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/kafkaclient cpu: Apple M3 Pro │ headers_fix.txt │ headers_remove.txt │ │ sec/op │ sec/op vs base │ MetadataToHeaders/NoMatch/keys=1-12 26.56n ± 3% 24.21n ± 3% -8.83% (p=0.000 n=10) MetadataToHeaders/NoMatch/keys=5-12 119.7n ± 4% 104.7n ± 4% -12.57% (p=0.000 n=10) MetadataToHeaders/NoMatch/keys=10-12 227.7n ± 3% 198.2n ± 2% -12.96% (p=0.000 n=10) MetadataToHeaders/AllMatch/keys=1-12 65.62n ± 3% 57.62n ± 0% -12.21% (p=0.000 n=10) MetadataToHeaders/AllMatch/keys=5-12 278.4n ± 3% 245.4n ± 2% -11.85% (p=0.000 n=10) MetadataToHeaders/AllMatch/keys=10-12 506.2n ± 4% 442.7n ± 3% -12.55% (p=0.000 n=10) MetadataToHeaders/NoKeys-12 1.430n ± 5% 1.425n ± 6% ~ (p=0.280 n=10) geomean 71.53n 64.17n -10.29% │ headers_fix.txt │ headers_remove.txt │ │ B/op │ B/op vs base │ MetadataToHeaders/NoMatch/keys=1-12 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/NoMatch/keys=5-12 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/NoMatch/keys=10-12 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/AllMatch/keys=1-12 72.00 ± 0% 72.00 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/AllMatch/keys=5-12 328.0 ± 0% 328.0 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/AllMatch/keys=10-12 656.0 ± 0% 656.0 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/NoKeys-12 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ geomean ² +0.00% ² ¹ all samples are equal ² summaries must be >0 to compute geomean │ headers_fix.txt │ headers_remove.txt │ │ allocs/op │ allocs/op vs base │ MetadataToHeaders/NoMatch/keys=1-12 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/NoMatch/keys=5-12 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/NoMatch/keys=10-12 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/AllMatch/keys=1-12 3.000 ± 0% 3.000 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/AllMatch/keys=5-12 11.00 ± 0% 11.00 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/AllMatch/keys=10-12 21.00 ± 0% 21.00 ± 0% ~ (p=1.000 n=10) ¹ MetadataToHeaders/NoKeys-12 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ geomean ² +0.00% ² ¹ all samples are equal ² summaries must be >0 to compute geomean ``` <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Paulo Dias <paulodias.gm@gmail.com> Co-authored-by: Christos Markou <chrismarkou92@gmail.com>
1 parent d843e0e commit f7d56bc

4 files changed

Lines changed: 63 additions & 101 deletions

File tree

exporter/kafkaexporter/internal/kafkaclient/franzgo.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,7 @@ func NewFranzSyncProducer(client *kgo.Client,
3434
// ExportData sends a batch of messages to Kafka
3535
func (p *FranzSyncProducer) ExportData(ctx context.Context, msgs Messages) error {
3636
messages := makeFranzMessages(msgs)
37-
setMessageHeaders(ctx, messages, p.metadataKeys,
38-
func(key string, value []byte) kgo.RecordHeader {
39-
return kgo.RecordHeader{Key: key, Value: value}
40-
},
41-
func(m *kgo.Record) []kgo.RecordHeader { return m.Headers },
42-
func(m *kgo.Record, h []kgo.RecordHeader) { m.Headers = h },
43-
)
37+
setMessageHeaders(ctx, messages, p.metadataKeys)
4438
result := p.client.ProduceSync(ctx, messages...)
4539
var errs []error
4640
for _, r := range result {

exporter/kafkaexporter/internal/kafkaclient/headers.go

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,69 +6,39 @@ package kafkaclient // import "github.com/open-telemetry/opentelemetry-collector
66
import (
77
"context"
88

9+
"github.com/twmb/franz-go/pkg/kgo"
910
"go.opentelemetry.io/collector/client"
1011
)
1112

12-
// metadataToHeaders converts metadata from the context into a slice of headers using the provided header constructor.
13-
func metadataToHeaders[H any](ctx context.Context, keys []string,
14-
makeHeader func(key string, value []byte) H,
15-
) []H {
13+
// metadataToHeaders converts context metadata into a kgo.RecordHeader slice.
14+
func metadataToHeaders(ctx context.Context, keys []string) []kgo.RecordHeader {
1615
if len(keys) == 0 {
1716
return nil
1817
}
1918
info := client.FromContext(ctx)
20-
var headers []H
19+
var headers []kgo.RecordHeader
2120
for _, key := range keys {
22-
valueSlice := info.Metadata.Get(key)
23-
for _, v := range valueSlice {
21+
for _, v := range info.Metadata.Get(key) {
2422
if headers == nil {
25-
headers = make([]H, 0, len(keys))
23+
headers = make([]kgo.RecordHeader, 0, len(keys))
2624
}
27-
headers = append(headers, makeHeader(key, []byte(v)))
25+
headers = append(headers, kgo.RecordHeader{Key: key, Value: []byte(v)})
2826
}
2927
}
3028
return headers
3129
}
3230

33-
// setMessageHeaders is a generic helper for setting headers on a slice of messages.
34-
// - messages: the messages to set headers on
35-
// - ctx: context for extracting metadata
36-
// - metadataKeys: which metadata keys to extract
37-
// - makeHeader: constructs the header type for the target client
38-
// - getHeaders: gets the headers from a message
39-
// - setHeaders: sets the headers on a message
40-
// Usage example:
41-
//
42-
// setMessageHeaders(ctx, allMessages, keys, makeHeader, getHeaders, setHeaders)
43-
func setMessageHeaders[M, H any](ctx context.Context,
44-
messages []M,
45-
metadataKeys []string,
46-
makeHeader func(key string, value []byte) H,
47-
getHeaders func(M) []H,
48-
setHeadersFunc func(M, []H),
49-
) {
50-
setHeaders(
51-
messages,
52-
metadataToHeaders(ctx, metadataKeys, makeHeader),
53-
getHeaders,
54-
setHeadersFunc,
55-
)
56-
}
57-
58-
// setHeaders sets or appends headers on each message in messages using the provided get/set functions.
59-
func setHeaders[M, H any](messages []M, headers []H,
60-
getHeaders func(M) []H,
61-
setHeaders func(M, []H),
62-
) {
31+
// setMessageHeaders extracts metadata from context and sets headers on all messages.
32+
func setMessageHeaders(ctx context.Context, messages []*kgo.Record, metadataKeys []string) {
33+
headers := metadataToHeaders(ctx, metadataKeys)
6334
if len(headers) == 0 || len(messages) == 0 {
6435
return
6536
}
66-
for i := range messages {
67-
h := getHeaders(messages[i])
68-
if len(h) == 0 {
69-
setHeaders(messages[i], headers)
37+
for _, msg := range messages {
38+
if len(msg.Headers) == 0 {
39+
msg.Headers = headers
7040
} else {
71-
setHeaders(messages[i], append(h, headers...))
41+
msg.Headers = append(msg.Headers, headers...)
7242
}
7343
}
7444
}

exporter/kafkaexporter/internal/kafkaclient/headers_bench_test.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,6 @@ import (
1010
"go.opentelemetry.io/collector/client"
1111
)
1212

13-
type benchHeader struct {
14-
Key string
15-
Value []byte
16-
}
17-
18-
func makeBenchHeader(key string, value []byte) benchHeader {
19-
return benchHeader{Key: key, Value: value}
20-
}
21-
2213
func BenchmarkMetadataToHeaders(b *testing.B) {
2314
// Scenario 1: Keys configured but NONE match the metadata.
2415
// This is the hot path where the fix avoids a wasted allocation.
@@ -38,7 +29,7 @@ func BenchmarkMetadataToHeaders(b *testing.B) {
3829
b.ReportAllocs()
3930
b.ResetTimer()
4031
for range b.N {
41-
_ = metadataToHeaders(ctx, keys, makeBenchHeader)
32+
_ = metadataToHeaders(ctx, keys)
4233
}
4334
})
4435
}
@@ -62,7 +53,7 @@ func BenchmarkMetadataToHeaders(b *testing.B) {
6253
b.ReportAllocs()
6354
b.ResetTimer()
6455
for range b.N {
65-
_ = metadataToHeaders(ctx, keys, makeBenchHeader)
56+
_ = metadataToHeaders(ctx, keys)
6657
}
6758
})
6859
}
@@ -79,7 +70,7 @@ func BenchmarkMetadataToHeaders(b *testing.B) {
7970
b.ReportAllocs()
8071
b.ResetTimer()
8172
for range b.N {
82-
_ = metadataToHeaders[benchHeader](ctx, nil, makeBenchHeader)
73+
_ = metadataToHeaders(ctx, nil)
8374
}
8475
})
8576
}

exporter/kafkaexporter/internal/kafkaclient/headers_test.go

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,16 @@ import (
88
"testing"
99

1010
"github.com/stretchr/testify/require"
11+
"github.com/twmb/franz-go/pkg/kgo"
1112
"go.opentelemetry.io/collector/client"
1213
)
1314

14-
type testHeader struct {
15-
Key string
16-
Value []byte
17-
}
18-
19-
func makeTestHeader(key string, value []byte) testHeader {
20-
return testHeader{Key: key, Value: value}
21-
}
22-
2315
func TestMetadataToHeaders(t *testing.T) {
2416
tests := []struct {
2517
name string
2618
makeCtx func(t *testing.T) context.Context
2719
keys []string
28-
expected []testHeader
20+
expected []kgo.RecordHeader
2921
}{
3022
{
3123
name: "nil_keys",
@@ -76,7 +68,7 @@ func TestMetadataToHeaders(t *testing.T) {
7668
})
7769
},
7870
keys: []string{"x-tenant", "x-request-id"},
79-
expected: []testHeader{
71+
expected: []kgo.RecordHeader{
8072
{Key: "x-tenant", Value: []byte("tenant-1")},
8173
{Key: "x-request-id", Value: []byte("req-42")},
8274
},
@@ -91,7 +83,7 @@ func TestMetadataToHeaders(t *testing.T) {
9183
})
9284
},
9385
keys: []string{"x-ids"},
94-
expected: []testHeader{
86+
expected: []kgo.RecordHeader{
9587
{Key: "x-ids", Value: []byte("id-1")},
9688
{Key: "x-ids", Value: []byte("id-2")},
9789
{Key: "x-ids", Value: []byte("id-3")},
@@ -108,14 +100,14 @@ func TestMetadataToHeaders(t *testing.T) {
108100
})
109101
},
110102
keys: []string{"x-tenant", "x-request-id"},
111-
expected: []testHeader{
103+
expected: []kgo.RecordHeader{
112104
{Key: "x-tenant", Value: []byte("tenant-1")},
113105
},
114106
},
115107
}
116108
for _, tt := range tests {
117109
t.Run(tt.name, func(t *testing.T) {
118-
got := metadataToHeaders(tt.makeCtx(t), tt.keys, makeTestHeader)
110+
got := metadataToHeaders(tt.makeCtx(t), tt.keys)
119111
if tt.expected == nil {
120112
require.Nil(t, got)
121113
} else {
@@ -125,45 +117,60 @@ func TestMetadataToHeaders(t *testing.T) {
125117
}
126118
}
127119

128-
func TestSetHeaders(t *testing.T) {
129-
type msg struct {
130-
headers []testHeader
131-
}
132-
getH := func(m *msg) []testHeader { return m.headers }
133-
setH := func(m *msg, h []testHeader) { m.headers = h }
134-
120+
func TestSetMessageHeaders(t *testing.T) {
135121
tests := []struct {
136122
name string
137-
messages []*msg
138-
headers []testHeader
139-
expected [][]testHeader // expected headers per message after call
123+
messages []*kgo.Record
124+
makeCtx func(t *testing.T) context.Context
125+
keys []string
126+
expected [][]kgo.RecordHeader // expected headers per message after call
140127
}{
141128
{
142-
name: "nil_headers",
143-
messages: []*msg{{}},
144-
headers: nil,
145-
expected: [][]testHeader{nil},
129+
name: "nil_metadata_keys",
130+
messages: []*kgo.Record{{}},
131+
makeCtx: func(t *testing.T) context.Context {
132+
return t.Context()
133+
},
134+
keys: nil,
135+
expected: [][]kgo.RecordHeader{nil},
146136
},
147137
{
148138
name: "empty_messages",
149139
messages: nil,
150-
headers: []testHeader{{Key: "k", Value: []byte("v")}},
140+
makeCtx: func(t *testing.T) context.Context {
141+
return client.NewContext(t.Context(), client.Info{
142+
Metadata: client.NewMetadata(map[string][]string{"k": {"v"}}),
143+
})
144+
},
145+
keys: []string{"k"},
151146
expected: nil,
152147
},
153148
{
154149
name: "sets_on_empty_message_headers",
155-
messages: []*msg{{}, {}},
156-
headers: []testHeader{{Key: "x-tenant", Value: []byte("t1")}},
157-
expected: [][]testHeader{
150+
messages: []*kgo.Record{{}, {}},
151+
makeCtx: func(t *testing.T) context.Context {
152+
return client.NewContext(t.Context(), client.Info{
153+
Metadata: client.NewMetadata(map[string][]string{"x-tenant": {"t1"}}),
154+
})
155+
},
156+
keys: []string{"x-tenant"},
157+
expected: [][]kgo.RecordHeader{
158158
{{Key: "x-tenant", Value: []byte("t1")}},
159159
{{Key: "x-tenant", Value: []byte("t1")}},
160160
},
161161
},
162162
{
163-
name: "appends_to_existing_headers",
164-
messages: []*msg{{headers: []testHeader{{Key: "existing", Value: []byte("val")}}}},
165-
headers: []testHeader{{Key: "x-new", Value: []byte("new-val")}},
166-
expected: [][]testHeader{
163+
name: "appends_to_existing_headers",
164+
messages: []*kgo.Record{
165+
{Headers: []kgo.RecordHeader{{Key: "existing", Value: []byte("val")}}},
166+
},
167+
makeCtx: func(t *testing.T) context.Context {
168+
return client.NewContext(t.Context(), client.Info{
169+
Metadata: client.NewMetadata(map[string][]string{"x-new": {"new-val"}}),
170+
})
171+
},
172+
keys: []string{"x-new"},
173+
expected: [][]kgo.RecordHeader{
167174
{
168175
{Key: "existing", Value: []byte("val")},
169176
{Key: "x-new", Value: []byte("new-val")},
@@ -173,9 +180,9 @@ func TestSetHeaders(t *testing.T) {
173180
}
174181
for _, tt := range tests {
175182
t.Run(tt.name, func(t *testing.T) {
176-
setHeaders(tt.messages, tt.headers, getH, setH)
183+
setMessageHeaders(tt.makeCtx(t), tt.messages, tt.keys)
177184
for i, m := range tt.messages {
178-
require.Equal(t, tt.expected[i], m.headers)
185+
require.Equal(t, tt.expected[i], m.Headers)
179186
}
180187
})
181188
}

0 commit comments

Comments
 (0)