Skip to content

Commit 4e65f41

Browse files
authored
[chore] Inject component.Host into franz-go Kafka clients (#45336)
#### Description Inject `component.Host` into franz-go Kafka clients. This will enable us to expose an oauth2 token source as an extension (e.g. oauth2clientauth, or azureauth (#45064)). See also #41873 (comment) #### Link to tracking issue N/A #### Testing N/A #### Documentation N/A
1 parent e25488e commit 4e65f41

File tree

10 files changed

+105
-36
lines changed

10 files changed

+105
-36
lines changed

exporter/kafkaexporter/kafka_exporter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func (e *kafkaExporter[T]) Start(ctx context.Context, host component.Host) (err
8787

8888
producer, err := kafka.NewFranzSyncProducer(
8989
ctx,
90+
host,
9091
e.cfg.ClientConfig,
9192
e.cfg.Producer,
9293
e.cfg.TimeoutSettings.Timeout,

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -949,7 +949,7 @@ func configureExporter[T any](tb testing.TB,
949949
kgo.ClientID(cfg.ClientID),
950950
}
951951

952-
client, err := kafka.NewFranzSyncProducer(tb.Context(), kcfg,
952+
client, err := kafka.NewFranzSyncProducer(tb.Context(), host, kcfg,
953953
cfg.Producer, 1*time.Second, zap.NewNop(), kgoClientOpts...)
954954
require.NoError(tb, err, "failed to create kgo.Client with fake cluster addresses")
955955

internal/kafka/franz_client.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/twmb/franz-go/pkg/sasl/plain"
2525
"github.com/twmb/franz-go/pkg/sasl/scram"
2626
"github.com/twmb/franz-go/plugin/kzap"
27+
"go.opentelemetry.io/collector/component"
2728
"go.opentelemetry.io/collector/config/configcompression"
2829
"go.uber.org/zap"
2930

@@ -38,7 +39,10 @@ const (
3839
)
3940

4041
// NewFranzSyncProducer creates a new Kafka client using the franz-go library.
41-
func NewFranzSyncProducer(ctx context.Context, clientCfg configkafka.ClientConfig,
42+
func NewFranzSyncProducer(
43+
ctx context.Context,
44+
host component.Host,
45+
clientCfg configkafka.ClientConfig,
4246
cfg configkafka.ProducerConfig,
4347
timeout time.Duration,
4448
logger *zap.Logger,
@@ -50,7 +54,7 @@ func NewFranzSyncProducer(ctx context.Context, clientCfg configkafka.ClientConfi
5054
default:
5155
codec = codec.WithLevel(int(cfg.CompressionParams.Level))
5256
}
53-
opts, err := commonOpts(ctx, clientCfg, logger, append(
57+
opts, err := commonOpts(ctx, host, clientCfg, logger, append(
5458
opts,
5559
kgo.ProduceRequestTimeout(timeout),
5660
kgo.ProducerBatchCompression(codec),
@@ -86,14 +90,17 @@ func NewFranzSyncProducer(ctx context.Context, clientCfg configkafka.ClientConfi
8690
}
8791

8892
// NewFranzConsumerGroup creates a new Kafka consumer client using the franz-go library.
89-
func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConfig,
93+
func NewFranzConsumerGroup(
94+
ctx context.Context,
95+
host component.Host,
96+
clientCfg configkafka.ClientConfig,
9097
consumerCfg configkafka.ConsumerConfig,
9198
topics []string,
9299
excludeTopics []string,
93100
logger *zap.Logger,
94101
opts ...kgo.Opt,
95102
) (*kgo.Client, error) {
96-
opts, err := commonOpts(ctx, clientCfg, logger, append([]kgo.Opt{
103+
opts, err := commonOpts(ctx, host, clientCfg, logger, append([]kgo.Opt{
97104
kgo.ConsumeTopics(topics...),
98105
kgo.ConsumerGroup(consumerCfg.GroupID),
99106
kgo.SessionTimeout(consumerCfg.SessionTimeout),
@@ -167,11 +174,12 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf
167174
// NewFranzClient creates a franz-go client using the same commonOpts used for producer/consumer.
168175
func NewFranzClient(
169176
ctx context.Context,
177+
host component.Host,
170178
clientCfg configkafka.ClientConfig,
171179
logger *zap.Logger,
172180
opts ...kgo.Opt,
173181
) (*kgo.Client, error) {
174-
opts, err := commonOpts(ctx, clientCfg, logger, opts...)
182+
opts, err := commonOpts(ctx, host, clientCfg, logger, opts...)
175183
if err != nil {
176184
return nil, err
177185
}
@@ -181,23 +189,22 @@ func NewFranzClient(
181189
// NewFranzClusterAdminClient creates a kadm admin client from a freshly created franz client.
182190
func NewFranzClusterAdminClient(
183191
ctx context.Context,
192+
host component.Host,
184193
clientCfg configkafka.ClientConfig,
185194
logger *zap.Logger,
186195
opts ...kgo.Opt,
187196
) (*kadm.Client, *kgo.Client, error) {
188-
cl, err := NewFranzClient(ctx, clientCfg, logger, opts...)
197+
cl, err := NewFranzClient(ctx, host, clientCfg, logger, opts...)
189198
if err != nil {
190199
return nil, nil, err
191200
}
192201
return kadm.NewClient(cl), cl, nil
193202
}
194203

195-
// NewFranzAdminFromClient returns a kadm admin bound to an existing kgo client.
196-
func NewFranzAdminFromClient(cl *kgo.Client) *kadm.Client {
197-
return kadm.NewClient(cl)
198-
}
199-
200-
func commonOpts(ctx context.Context, clientCfg configkafka.ClientConfig,
204+
func commonOpts(
205+
ctx context.Context,
206+
_ component.Host,
207+
clientCfg configkafka.ClientConfig,
201208
logger *zap.Logger,
202209
opts ...kgo.Opt,
203210
) ([]kgo.Opt, error) {

internal/kafka/franz_client_test.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/twmb/franz-go/pkg/kfake"
2121
"github.com/twmb/franz-go/pkg/kgo"
2222
"github.com/twmb/franz-go/pkg/kmsg"
23+
"go.opentelemetry.io/collector/component/componenttest"
2324
"go.opentelemetry.io/collector/config/configopaque"
2425
"go.opentelemetry.io/collector/config/configtls"
2526
"go.uber.org/zap"
@@ -45,7 +46,8 @@ func TestNewFranzSyncProducer_SASL(t *testing.T) {
4546
Version: 1, // kfake only supports version 1
4647
}
4748
tl := zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel))
48-
client, err := NewFranzSyncProducer(t.Context(), clientConfig,
49+
client, err := NewFranzSyncProducer(
50+
t.Context(), componenttest.NewNopHost(), clientConfig,
4951
configkafka.NewDefaultProducerConfig(), time.Second, tl,
5052
)
5153
if err != nil {
@@ -124,7 +126,8 @@ func TestNewFranzSyncProducer_TLS(t *testing.T) {
124126
observedLogs.TakeAll() // drop existing logs
125127
clientConfig := clientConfig // copy
126128
clientConfig.TLS = cfg
127-
client, err := NewFranzSyncProducer(t.Context(), clientConfig,
129+
client, err := NewFranzSyncProducer(
130+
t.Context(), componenttest.NewNopHost(), clientConfig,
128131
configkafka.NewDefaultProducerConfig(), time.Second, logger,
129132
)
130133
if err != nil {
@@ -183,7 +186,10 @@ func TestNewFranzSyncProducerCompression(t *testing.T) {
183186
prodCfg.Compression = compressionAlgo
184187

185188
tl := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))
186-
client, err := NewFranzSyncProducer(t.Context(), clientConfig, prodCfg, time.Second, tl)
189+
client, err := NewFranzSyncProducer(
190+
t.Context(), componenttest.NewNopHost(), clientConfig,
191+
prodCfg, time.Second, tl,
192+
)
187193
require.NoError(t, err)
188194
defer client.Close()
189195

@@ -250,7 +256,10 @@ func TestNewFranzSyncProducerRequiredAcks(t *testing.T) {
250256
prodCfg.RequiredAcks = ack
251257

252258
tl := zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel))
253-
client, err := NewFranzSyncProducer(t.Context(), clientConfig, prodCfg, time.Second, tl)
259+
client, err := NewFranzSyncProducer(
260+
t.Context(), componenttest.NewNopHost(), clientConfig,
261+
prodCfg, time.Second, tl,
262+
)
254263
require.NoError(t, err)
255264
defer client.Close()
256265

@@ -425,7 +434,8 @@ func mustNewFranzConsumerGroup(t *testing.T,
425434
// up and avoid waiting for too long.
426435
minAge := 10 * time.Millisecond
427436
opts = append(opts, kgo.MetadataMinAge(minAge), kgo.MetadataMaxAge(minAge*2))
428-
client, err := NewFranzConsumerGroup(t.Context(), clientConfig, consumerConfig,
437+
client, err := NewFranzConsumerGroup(
438+
t.Context(), componenttest.NewNopHost(), clientConfig, consumerConfig,
429439
topics, nil, zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)), opts...,
430440
)
431441
require.NoError(t, err)
@@ -447,7 +457,8 @@ func TestFranzClient_MetadataRefreshInterval(t *testing.T) {
447457
name: "producer",
448458
setupClient: func(t *testing.T, clientConfig configkafka.ClientConfig, _ string, metadataMinAge time.Duration) {
449459
tl := zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel))
450-
client, err := NewFranzSyncProducer(t.Context(), clientConfig,
460+
client, err := NewFranzSyncProducer(
461+
t.Context(), componenttest.NewNopHost(), clientConfig,
451462
configkafka.NewDefaultProducerConfig(), time.Second, tl,
452463
kgo.MetadataMinAge(metadataMinAge),
453464
)
@@ -534,7 +545,7 @@ func TestFranzClient_ProtocolVersion(t *testing.T) {
534545
})
535546
t.Run("producer", func(t *testing.T) {
536547
client, err := NewFranzSyncProducer(
537-
t.Context(), clientConfig,
548+
t.Context(), componenttest.NewNopHost(), clientConfig,
538549
configkafka.NewDefaultProducerConfig(), time.Second, zap.NewNop(),
539550
)
540551
require.NoError(t, err)
@@ -551,12 +562,12 @@ func TestNewFranzClient_And_Admin(t *testing.T) {
551562
tl := zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel))
552563

553564
// Plain client
554-
cl, err := NewFranzClient(t.Context(), clientCfg, tl)
565+
cl, err := NewFranzClient(t.Context(), componenttest.NewNopHost(), clientCfg, tl)
555566
require.NoError(t, err)
556567
t.Cleanup(cl.Close)
557568

558569
// Admin from fresh client
559-
ad, cl2, err := NewFranzClusterAdminClient(t.Context(), clientCfg, tl)
570+
ad, cl2, err := NewFranzClusterAdminClient(t.Context(), componenttest.NewNopHost(), clientCfg, tl)
560571
require.NoError(t, err)
561572
t.Cleanup(func() { ad.Close(); cl2.Close() })
562573

internal/kafka/go.mod

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ require (
1111
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251021233722-4ca18825d8c0
1212
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0
1313
github.com/xdg-go/scram v1.2.0
14+
go.opentelemetry.io/collector/component v1.49.1-0.20260119125341-0550b08ddcc5
15+
go.opentelemetry.io/collector/component/componenttest v0.143.1-0.20260119125341-0550b08ddcc5
1416
go.opentelemetry.io/collector/config/configcompression v1.49.1-0.20260119125341-0550b08ddcc5
1517
go.opentelemetry.io/collector/config/configopaque v1.49.1-0.20260119125341-0550b08ddcc5
1618
go.opentelemetry.io/collector/config/configtls v1.49.1-0.20260119125341-0550b08ddcc5
1719
go.uber.org/goleak v1.3.0
1820
)
1921

20-
require go.opentelemetry.io/collector/confmap/xconfmap v0.143.1-0.20260119125341-0550b08ddcc5 // indirect
21-
2222
require (
2323
github.com/aws/aws-sdk-go-v2 v1.36.4 // indirect
2424
github.com/aws/aws-sdk-go-v2/config v1.29.16 // indirect
@@ -33,46 +33,60 @@ require (
3333
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.2 // indirect
3434
github.com/aws/aws-sdk-go-v2/service/sts v1.33.21 // indirect
3535
github.com/aws/smithy-go v1.22.2 // indirect
36+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3637
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3738
github.com/eapache/go-resiliency v1.7.0 // indirect
3839
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
3940
github.com/eapache/queue v1.1.0 // indirect
4041
github.com/foxboron/go-tpm-keyfiles v0.0.0-20250903184740-5d135037bd4d // indirect
4142
github.com/fsnotify/fsnotify v1.9.0 // indirect
43+
github.com/go-logr/logr v1.4.3 // indirect
44+
github.com/go-logr/stdr v1.2.2 // indirect
4245
github.com/go-viper/mapstructure/v2 v2.5.0 // indirect
4346
github.com/gobwas/glob v0.2.3 // indirect
4447
github.com/golang/snappy v0.0.4 // indirect
4548
github.com/google/go-tpm v0.9.8 // indirect
49+
github.com/google/uuid v1.6.0 // indirect
4650
github.com/hashicorp/go-uuid v1.0.3 // indirect
4751
github.com/hashicorp/go-version v1.8.0 // indirect
4852
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
4953
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
5054
github.com/jcmturner/gofork v1.7.6 // indirect
5155
github.com/jcmturner/gokrb5/v8 v8.4.4
5256
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
57+
github.com/json-iterator/go v1.1.12 // indirect
5358
github.com/klauspost/compress v1.18.2 // indirect
5459
github.com/knadh/koanf/maps v0.1.2 // indirect
5560
github.com/knadh/koanf/providers/confmap v1.0.0 // indirect
5661
github.com/knadh/koanf/v2 v2.3.0 // indirect
5762
github.com/mitchellh/copystructure v1.2.0 // indirect
5863
github.com/mitchellh/reflectwalk v1.0.2 // indirect
64+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
65+
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
5966
github.com/pierrec/lz4/v4 v4.1.22 // indirect
6067
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
61-
github.com/rogpeppe/go-internal v1.13.1 // indirect
6268
github.com/twmb/franz-go v1.20.6
6369
github.com/twmb/franz-go/pkg/kadm v1.17.1
6470
github.com/twmb/franz-go/pkg/kmsg v1.12.0
6571
github.com/twmb/franz-go/plugin/kzap v1.1.2
6672
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
6773
github.com/xdg-go/stringprep v1.0.4 // indirect
74+
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
6875
go.opentelemetry.io/collector/confmap v1.49.1-0.20260119125341-0550b08ddcc5 // indirect
76+
go.opentelemetry.io/collector/confmap/xconfmap v0.143.1-0.20260119125341-0550b08ddcc5 // indirect
6977
go.opentelemetry.io/collector/featuregate v1.49.1-0.20260119125341-0550b08ddcc5 // indirect
78+
go.opentelemetry.io/collector/pdata v1.49.1-0.20260119125341-0550b08ddcc5 // indirect
79+
go.opentelemetry.io/otel v1.39.0 // indirect
80+
go.opentelemetry.io/otel/metric v1.39.0 // indirect
81+
go.opentelemetry.io/otel/sdk v1.39.0 // indirect
82+
go.opentelemetry.io/otel/sdk/metric v1.39.0 // indirect
83+
go.opentelemetry.io/otel/trace v1.39.0 // indirect
7084
go.uber.org/multierr v1.11.0 // indirect
7185
go.uber.org/zap v1.27.1
7286
go.yaml.in/yaml/v3 v3.0.4 // indirect
7387
golang.org/x/crypto v0.45.0 // indirect
7488
golang.org/x/net v0.47.0 // indirect
75-
golang.org/x/sys v0.38.0 // indirect
89+
golang.org/x/sys v0.39.0 // indirect
7690
golang.org/x/text v0.31.0 // indirect
7791
gopkg.in/yaml.v3 v3.0.1 // indirect
7892
)

0 commit comments

Comments
 (0)