Skip to content

Commit 4f67582

Browse files
authored
kafka: fix support for protocol_version in franz-go client (#42833)
#### Description The franz-go client was ignoring `protocol_version`. We now set both MinVersions and MaxVersions options based on the specified version, so the client assumes that exact version. #### Link to tracking issue Relates to #42795 #### Testing Tested with kafkaexporter against Azure Event Hubs, which currently only works on Windows if you specify a version < 4 due to twmb/franz-go#1102 #### Documentation None
1 parent f5f5c4a commit 4f67582

File tree

3 files changed

+89
-0
lines changed

3 files changed

+89
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafka
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix support for protocol_version in franz-go client
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42795]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

internal/kafka/franz_client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
krb5config "github.com/jcmturner/gokrb5/v8/config"
1616
"github.com/jcmturner/gokrb5/v8/keytab"
1717
"github.com/twmb/franz-go/pkg/kgo"
18+
"github.com/twmb/franz-go/pkg/kmsg"
19+
"github.com/twmb/franz-go/pkg/kversion"
1820
"github.com/twmb/franz-go/pkg/sasl"
1921
"github.com/twmb/franz-go/pkg/sasl/kerberos"
2022
"github.com/twmb/franz-go/pkg/sasl/oauth"
@@ -237,6 +239,21 @@ func commonOpts(ctx context.Context, clientCfg configkafka.ClientConfig,
237239
if clientCfg.Metadata.RefreshInterval > 0 {
238240
opts = append(opts, kgo.MetadataMaxAge(clientCfg.Metadata.RefreshInterval))
239241
}
242+
// Configure the min/max protocol version if provided
243+
if clientCfg.ProtocolVersion != "" {
244+
keyVersions := make(map[string]any)
245+
versions := kversion.FromString(clientCfg.ProtocolVersion)
246+
versions.EachMaxKeyVersion(func(k, v int16) {
247+
name := kmsg.NameForKey(k)
248+
keyVersions[name] = v
249+
})
250+
logger.Info(
251+
"setting kafka protocol version",
252+
zap.String("version", clientCfg.ProtocolVersion),
253+
zap.Any("key_versions", keyVersions),
254+
)
255+
opts = append(opts, kgo.MinVersions(versions), kgo.MaxVersions(versions))
256+
}
240257
return opts, nil
241258
}
242259

internal/kafka/franz_client_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,3 +549,48 @@ func TestFranzClient_MetadataRefreshInterval(t *testing.T) {
549549
})
550550
}
551551
}
552+
553+
func TestFranzClient_ProtocolVersion(t *testing.T) {
554+
type testcase struct {
555+
protocolVersion string
556+
expectedVersion int
557+
}
558+
tests := map[string]testcase{
559+
"without protocol version": {
560+
expectedVersion: 4, // maximum
561+
},
562+
"with protocol version": {
563+
protocolVersion: "2.1.0",
564+
expectedVersion: 2,
565+
},
566+
}
567+
568+
for name, testcase := range tests {
569+
t.Run(name, func(t *testing.T) {
570+
var calls int
571+
cluster, clientConfig := kafkatest.NewCluster(t)
572+
cluster.ControlKey(int16(kmsg.ApiVersions), func(req kmsg.Request) (kmsg.Response, error, bool) {
573+
calls++
574+
assert.Equal(t, int16(testcase.expectedVersion), req.GetVersion())
575+
return nil, nil, false
576+
})
577+
578+
clientConfig.ProtocolVersion = testcase.protocolVersion
579+
t.Run("consumer", func(t *testing.T) {
580+
consumeConfig := configkafka.NewDefaultConsumerConfig()
581+
client := mustNewFranzConsumerGroup(t, clientConfig, consumeConfig, []string{})
582+
assert.NoError(t, client.Ping(t.Context()))
583+
})
584+
t.Run("producer", func(t *testing.T) {
585+
client, err := NewFranzSyncProducer(
586+
t.Context(), clientConfig,
587+
configkafka.NewDefaultProducerConfig(), time.Second, zap.NewNop(),
588+
)
589+
require.NoError(t, err)
590+
require.NoError(t, client.Ping(t.Context())) // trigger an API call
591+
client.Close()
592+
})
593+
assert.Equal(t, 2, calls)
594+
})
595+
}
596+
}

0 commit comments

Comments
 (0)