Skip to content

Commit ba20ded

Browse files
hindessmpuellanivisdnwe
authored
feat: support V5 ListConsumerGroups protocol (#3292)
Add TypesFilter to request and GroupType to group data in the protocol. These come from [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) Note: the option to specify these on a request is not currently exposed via `(ca *clusterAdmin) ListConsumerGroups()` --------- Signed-off-by: beanz <[email protected]> Signed-off-by: Dominic Evans <[email protected]> Co-authored-by: Cassondra Foesch <[email protected]> Co-authored-by: Dominic Evans <[email protected]>
1 parent 020be9c commit ba20ded

File tree

6 files changed

+138
-6
lines changed

6 files changed

+138
-6
lines changed

admin.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1008,7 +1008,10 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
10081008
_ = b.Open(conf) // Ensure that broker is opened
10091009

10101010
request := &ListGroupsRequest{}
1011-
if ca.conf.Version.IsAtLeast(V2_6_0_0) {
1011+
if ca.conf.Version.IsAtLeast(V3_8_0_0) {
1012+
// Version 5 adds the TypesFilter field (KIP-848).
1013+
request.Version = 5
1014+
} else if ca.conf.Version.IsAtLeast(V2_6_0_0) {
10121015
// Version 4 adds the StatesFilter field (KIP-518).
10131016
request.Version = 4
10141017
} else if ca.conf.Version.IsAtLeast(V2_4_0_0) {

functional_admin_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ package sarama
44

55
import (
66
"context"
7+
"maps"
8+
"slices"
79
"testing"
810

911
"github.com/davecgh/go-spew/spew"
@@ -189,6 +191,48 @@ func TestFuncAdminDescribeGroups(t *testing.T) {
189191
m2.AssertCleanShutdown()
190192
}
191193

194+
func TestFuncAdminListConsumerGroups(t *testing.T) {
195+
setupFunctionalTest(t)
196+
defer teardownFunctionalTest(t)
197+
198+
group1 := testFuncConsumerGroupID(t)
199+
group2 := testFuncConsumerGroupID(t)
200+
201+
config := NewFunctionalTestConfig()
202+
adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
203+
if err != nil {
204+
t.Fatal(err)
205+
}
206+
defer safeClose(t, adminClient)
207+
208+
config1 := NewFunctionalTestConfig()
209+
config1.ClientID = "M1"
210+
config1.Consumer.Offsets.Initial = OffsetNewest
211+
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4")
212+
defer m1.Close()
213+
214+
config2 := NewFunctionalTestConfig()
215+
config2.ClientID = "M2"
216+
config2.Consumer.Offsets.Initial = OffsetNewest
217+
config2.Consumer.Group.InstanceId = "Instance2"
218+
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, group2, 100, nil, "test.4")
219+
defer m2.Close()
220+
221+
m1.WaitForState(2)
222+
m2.WaitForState(2)
223+
224+
res, err := adminClient.ListConsumerGroups()
225+
if err != nil {
226+
t.Fatal(err)
227+
}
228+
assert.GreaterOrEqual(t, len(res), 2)
229+
assert.Contains(t, slices.Collect(maps.Keys(res)), group1)
230+
assert.Contains(t, slices.Collect(maps.Keys(res)), group2)
231+
232+
m1.AssertCleanShutdown()
233+
m2.AssertCleanShutdown()
234+
}
235+
192236
func TestFuncAdminListConsumerGroupOffsets(t *testing.T) {
193237
checkKafkaVersion(t, "0.8.2.0")
194238
setupFunctionalTest(t)

list_groups_request.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sarama
33
type ListGroupsRequest struct {
44
Version int16
55
StatesFilter []string // version 4 or later
6+
TypesFilter []string // version 5 or later
67
}
78

89
func (r *ListGroupsRequest) setVersion(v int16) {
@@ -19,6 +20,15 @@ func (r *ListGroupsRequest) encode(pe packetEncoder) error {
1920
}
2021
}
2122
}
23+
if r.Version >= 5 {
24+
pe.putCompactArrayLength(len(r.TypesFilter))
25+
for _, filter := range r.TypesFilter {
26+
err := pe.putCompactString(filter)
27+
if err != nil {
28+
return err
29+
}
30+
}
31+
}
2232
if r.Version >= 3 {
2333
pe.putEmptyTaggedFieldArray()
2434
}
@@ -41,6 +51,20 @@ func (r *ListGroupsRequest) decode(pd packetDecoder, version int16) (err error)
4151
}
4252
}
4353
}
54+
if r.Version >= 5 {
55+
filterLen, err := pd.getCompactArrayLength()
56+
if err != nil {
57+
return err
58+
}
59+
if filterLen > 0 {
60+
r.TypesFilter = make([]string, filterLen)
61+
for i := 0; i < filterLen; i++ {
62+
if r.TypesFilter[i], err = pd.getCompactString(); err != nil {
63+
return err
64+
}
65+
}
66+
}
67+
}
4468
if r.Version >= 3 {
4569
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
4670
return err
@@ -65,11 +89,13 @@ func (r *ListGroupsRequest) headerVersion() int16 {
6589
}
6690

6791
func (r *ListGroupsRequest) isValidVersion() bool {
68-
return r.Version >= 0 && r.Version <= 4
92+
return r.Version >= 0 && r.Version <= 5
6993
}
7094

7195
func (r *ListGroupsRequest) requiredVersion() KafkaVersion {
7296
switch r.Version {
97+
case 5:
98+
return V3_8_0_0
7399
case 4:
74100
return V2_6_0_0
75101
case 3:

list_groups_request_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,16 @@ func TestListGroupsRequest(t *testing.T) {
3636
6, 'E', 'm', 'p', 't', 'y', // compact string
3737
0, // empty tag buffer
3838
})
39+
40+
testRequest(t, "ListGroupsRequest", &ListGroupsRequest{
41+
Version: 5,
42+
StatesFilter: []string{"Empty"},
43+
TypesFilter: []string{"Classic"},
44+
}, []byte{
45+
2, // compact array length (1)
46+
6, 'E', 'm', 'p', 't', 'y', // compact string
47+
2, // compact array length (1)
48+
8, 'C', 'l', 'a', 's', 's', 'i', 'c', // compact string
49+
0, // empty tag buffer
50+
})
3951
}

list_groups_response.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ func (r *ListGroupsResponse) setVersion(v int16) {
1414

1515
type GroupData struct {
1616
GroupState string // version 4 or later
17+
GroupType string // version 5 or later
1718
}
1819

1920
func (r *ListGroupsResponse) encode(pe packetEncoder) error {
@@ -51,6 +52,13 @@ func (r *ListGroupsResponse) encode(pe packetEncoder) error {
5152
return err
5253
}
5354
}
55+
56+
if r.Version >= 5 {
57+
groupData := r.GroupsData[groupId]
58+
if err := pe.putCompactString(groupData.GroupType); err != nil {
59+
return err
60+
}
61+
}
5462
}
5563
}
5664

@@ -115,13 +123,20 @@ func (r *ListGroupsResponse) decode(pd packetDecoder, version int16) error {
115123
r.Groups[groupId] = protocolType
116124

117125
if r.Version >= 4 {
126+
var groupData GroupData
118127
groupState, err := pd.getCompactString()
119128
if err != nil {
120129
return err
121130
}
122-
r.GroupsData[groupId] = GroupData{
123-
GroupState: groupState,
131+
groupData.GroupState = groupState
132+
if r.Version >= 5 {
133+
groupType, err := pd.getCompactString()
134+
if err != nil {
135+
return err
136+
}
137+
groupData.GroupType = groupType
124138
}
139+
r.GroupsData[groupId] = groupData
125140
}
126141

127142
if r.Version >= 3 {
@@ -156,11 +171,13 @@ func (r *ListGroupsResponse) headerVersion() int16 {
156171
}
157172

158173
func (r *ListGroupsResponse) isValidVersion() bool {
159-
return r.Version >= 0 && r.Version <= 4
174+
return r.Version >= 0 && r.Version <= 5
160175
}
161176

162177
func (r *ListGroupsResponse) requiredVersion() KafkaVersion {
163178
switch r.Version {
179+
case 5:
180+
return V3_8_0_0
164181
case 4:
165182
return V2_6_0_0
166183
case 3:

list_groups_response_test.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ var (
3535
0, // Empty tag buffer
3636
0, // Empty tag buffer
3737
}
38+
39+
listGroupResponseV5 = []byte{
40+
0, 0, 0, 0, // no throttle time
41+
0, 0, // no error
42+
2, // compact array length (1)
43+
4, 'f', 'o', 'o', // group name (compact string)
44+
9, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // protocol type (compact string)
45+
6, 'E', 'm', 'p', 't', 'y', // state (compact string)
46+
8, 'C', 'l', 'a', 's', 's', 'i', 'c', // type (compact string)
47+
0, // Empty tag buffer
48+
0, // Empty tag buffer
49+
}
3850
)
3951

4052
func TestListGroupsResponse(t *testing.T) {
@@ -82,6 +94,24 @@ func TestListGroupsResponse(t *testing.T) {
8294
t.Error("Expected foo group to use consumer protocol")
8395
}
8496
if response.GroupsData["foo"].GroupState != "Empty" {
85-
t.Error("Expected foo grup to have empty state")
97+
t.Error("Expected foo group to have empty state")
98+
}
99+
100+
response = new(ListGroupsResponse)
101+
testVersionDecodable(t, "no error", response, listGroupResponseV5, 5)
102+
if !errors.Is(response.Err, ErrNoError) {
103+
t.Error("Expected no gerror, found:", response.Err)
104+
}
105+
if len(response.Groups) != 1 {
106+
t.Error("Expected one group")
107+
}
108+
if response.Groups["foo"] != "consumer" {
109+
t.Error("Expected foo group to use consumer protocol")
110+
}
111+
if response.GroupsData["foo"].GroupState != "Empty" {
112+
t.Error("Expected foo group to have empty state")
113+
}
114+
if response.GroupsData["foo"].GroupType != "Classic" {
115+
t.Error("Expected foo group to have type 'Classic', found: ", response.GroupsData["foo"].GroupType)
86116
}
87117
}

0 commit comments

Comments
 (0)