@@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
555555type coordinator interface {
556556 io.Closer
557557 findCoordinator (findCoordinatorRequestV0 ) (findCoordinatorResponseV0 , error )
558- joinGroup (joinGroupRequestV1 ) (joinGroupResponseV1 , error )
558+ joinGroup (joinGroupRequest ) (joinGroupResponse , error )
559559 syncGroup (syncGroupRequestV0 ) (syncGroupResponseV0 , error )
560560 leaveGroup (leaveGroupRequestV0 ) (leaveGroupResponseV0 , error )
561561 heartbeat (heartbeatRequestV0 ) (heartbeatResponseV0 , error )
@@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find
588588 return t .conn .findCoordinator (req )
589589}
590590
591- func (t * timeoutCoordinator ) joinGroup (req joinGroupRequestV1 ) (joinGroupResponseV1 , error ) {
591+ func (t * timeoutCoordinator ) joinGroup (req joinGroupRequest ) (joinGroupResponse , error ) {
592592 // in the case of join group, the consumer group coordinator may wait up
593593 // to rebalance timeout in order to wait for all members to join.
594594 if err := t .conn .SetDeadline (time .Now ().Add (t .timeout + t .rebalanceTimeout )); err != nil {
595- return joinGroupResponseV1 {}, err
595+ return joinGroupResponse {}, err
596596 }
597597 return t .conn .joinGroup (req )
598598}
@@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
932932// * InvalidSessionTimeout:
933933// * GroupAuthorizationFailed:
934934func (cg * ConsumerGroup ) joinGroup (conn coordinator , memberID string ) (string , int32 , GroupMemberAssignments , error ) {
935- request , err := cg .makeJoinGroupRequestV1 (memberID )
935+ request , err := cg .makeJoinGroupRequest (memberID )
936936 if err != nil {
937937 return "" , 0 , nil , err
938938 }
@@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
978978
979979// makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
980980// request.
981- func (cg * ConsumerGroup ) makeJoinGroupRequestV1 (memberID string ) (joinGroupRequestV1 , error ) {
982- request := joinGroupRequestV1 {
981+ func (cg * ConsumerGroup ) makeJoinGroupRequest (memberID string ) (joinGroupRequest , error ) {
982+ request := joinGroupRequest {
983983 GroupID : cg .config .ID ,
984984 MemberID : memberID ,
985985 SessionTimeout : int32 (cg .config .SessionTimeout / time .Millisecond ),
@@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
990990 for _ , balancer := range cg .config .GroupBalancers {
991991 userData , err := balancer .UserData ()
992992 if err != nil {
993- return joinGroupRequestV1 {}, fmt .Errorf ("unable to construct protocol metadata for member, %v: %w" , balancer .ProtocolName (), err )
993+ return joinGroupRequest {}, fmt .Errorf ("unable to construct protocol metadata for member, %v: %w" , balancer .ProtocolName (), err )
994994 }
995995 request .GroupProtocols = append (request .GroupProtocols , joinGroupRequestGroupProtocolV1 {
996996 ProtocolName : balancer .ProtocolName (),
@@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
10071007
10081008// assignTopicPartitions uses the selected GroupBalancer to assign members to
10091009// their various partitions.
1010- func (cg * ConsumerGroup ) assignTopicPartitions (conn coordinator , group joinGroupResponseV1 ) (GroupMemberAssignments , error ) {
1010+ func (cg * ConsumerGroup ) assignTopicPartitions (conn coordinator , group joinGroupResponse ) (GroupMemberAssignments , error ) {
10111011 cg .withLogger (func (l Logger ) {
10121012 l .Printf ("selected as leader for group, %s\n " , cg .config .ID )
10131013 })
@@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
10501050}
10511051
10521052// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
1053- func (cg * ConsumerGroup ) makeMemberProtocolMetadata (in []joinGroupResponseMemberV1 ) ([]GroupMember , error ) {
1053+ func (cg * ConsumerGroup ) makeMemberProtocolMetadata (in []joinGroupResponseMember ) ([]GroupMember , error ) {
10541054 members := make ([]GroupMember , 0 , len (in ))
10551055 for _ , item := range in {
10561056 metadata := groupMetadata {}
0 commit comments