@@ -125,7 +125,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
125125 }, msg = s " Could not get partitions assigned. Last response $shareGroupHeartbeatResponse. " )
126126
127127 // Verify the response.
128- assertEquals(2 , shareGroupHeartbeatResponse.data.memberEpoch)
128+ assertEquals(3 , shareGroupHeartbeatResponse.data.memberEpoch)
129129 assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment)
130130
131131 // Leave the group.
@@ -232,14 +232,26 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
232232
233233 // Heartbeats until the partitions are assigned for member 1.
234234 shareGroupHeartbeatResponse = null
235+
235236 TestUtils .waitUntilTrue(() => {
236237 shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
237- shareGroupHeartbeatResponse.data.errorCode == Errors .NONE .code && shareGroupHeartbeatResponse.data.assignment != null
238+ if (shareGroupHeartbeatResponse.data.errorCode == Errors .NONE .code && shareGroupHeartbeatResponse.data().assignment() != null ) {
239+ true
240+ } else {
241+ shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest .Builder (
242+ new ShareGroupHeartbeatRequestData ()
243+ .setGroupId(" grp" )
244+ .setMemberId(memberId1)
245+ .setMemberEpoch(shareGroupHeartbeatResponse.data.memberEpoch()),
246+ true
247+ ).build()
248+ false
249+ }
238250 }, msg = s " Could not get partitions assigned. Last response $shareGroupHeartbeatResponse. " )
239251
240252 val topicPartitionsAssignedToMember1 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
241253 // Verify the response.
242- assertEquals(3 , shareGroupHeartbeatResponse.data.memberEpoch)
254+ assertEquals(4 , shareGroupHeartbeatResponse.data.memberEpoch)
243255
244256 // Prepare the next heartbeat for member 2.
245257 shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest .Builder (
@@ -259,7 +271,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
259271
260272 val topicPartitionsAssignedToMember2 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
261273 // Verify the response.
262- assertEquals(3 , shareGroupHeartbeatResponse.data.memberEpoch)
274+ assertEquals(4 , shareGroupHeartbeatResponse.data.memberEpoch)
263275
264276 val partitionsAssigned : util.Set [Integer ] = new util.HashSet [Integer ]()
265277 topicPartitionsAssignedToMember1.forEach(topicPartition => {
@@ -290,7 +302,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
290302 }, msg = s " Could not get partitions assigned. Last response $shareGroupHeartbeatResponse. " )
291303
292304 // Verify the response.
293- assertEquals(3 , shareGroupHeartbeatResponse.data.memberEpoch)
305+ assertEquals(4 , shareGroupHeartbeatResponse.data.memberEpoch)
294306 } finally {
295307 admin.close()
296308 }
@@ -369,7 +381,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
369381 }, msg = s " Could not get partitions assigned. Last response $shareGroupHeartbeatResponse. " )
370382
371383 // Verify the response.
372- assertEquals(2 , shareGroupHeartbeatResponse.data.memberEpoch)
384+ assertEquals(3 , shareGroupHeartbeatResponse.data.memberEpoch)
373385
374386 // Member leaves the group.
375387 shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest .Builder (
@@ -402,7 +414,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
402414 shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
403415
404416 // Verify the response for member 1.
405- assertEquals(4 , shareGroupHeartbeatResponse.data.memberEpoch)
417+ assertEquals(5 , shareGroupHeartbeatResponse.data.memberEpoch)
406418 assertEquals(memberId, shareGroupHeartbeatResponse.data.memberId)
407419 // Partition assignment remains intact on rejoining.
408420 assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment)
@@ -491,7 +503,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
491503 shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
492504 }, msg = s " Could not get partitions for topic foo and bar assigned. Last response $shareGroupHeartbeatResponse. " )
493505 // Verify the response.
494- assertEquals(2 , shareGroupHeartbeatResponse.data.memberEpoch)
506+ assertEquals(3 , shareGroupHeartbeatResponse.data.memberEpoch)
495507 // Create the topic baz.
496508 val bazTopicId = TestUtils .createTopicWithAdminRaw(
497509 admin = admin,
@@ -515,7 +527,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
515527 new ShareGroupHeartbeatRequestData ()
516528 .setGroupId(" grp" )
517529 .setMemberId(memberId)
518- .setMemberEpoch(2 ),
530+ .setMemberEpoch(3 ),
519531 true
520532 ).build()
521533
@@ -527,7 +539,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
527539 shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
528540 }, msg = s " Could not get partitions for topic baz assigned. Last response $shareGroupHeartbeatResponse. " )
529541 // Verify the response.
530- assertEquals(3 , shareGroupHeartbeatResponse.data.memberEpoch)
542+ assertEquals(5 , shareGroupHeartbeatResponse.data.memberEpoch)
531543 // Increasing the partitions of topic bar which is already being consumed in the share group.
532544 increasePartitions(admin, " bar" , 6 , Seq .empty)
533545
@@ -547,7 +559,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
547559 new ShareGroupHeartbeatRequestData ()
548560 .setGroupId(" grp" )
549561 .setMemberId(memberId)
550- .setMemberEpoch(3 ),
562+ .setMemberEpoch(5 ),
551563 true
552564 ).build()
553565
@@ -559,7 +571,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
559571 shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
560572 }, msg = s " Could not update partitions assignment for topic bar. Last response $shareGroupHeartbeatResponse. " )
561573 // Verify the response.
562- assertEquals(4 , shareGroupHeartbeatResponse.data.memberEpoch)
574+ assertEquals(7 , shareGroupHeartbeatResponse.data.memberEpoch)
563575 // Delete the topic foo.
564576 TestUtils .deleteTopicWithAdmin(
565577 admin = admin,
@@ -581,7 +593,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
581593 new ShareGroupHeartbeatRequestData ()
582594 .setGroupId(" grp" )
583595 .setMemberId(memberId)
584- .setMemberEpoch(4 ),
596+ .setMemberEpoch(7 ),
585597 true
586598 ).build()
587599
@@ -593,7 +605,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
593605 shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
594606 }, msg = s " Could not update partitions assignment for topic foo. Last response $shareGroupHeartbeatResponse. " )
595607 // Verify the response.
596- assertEquals(5 , shareGroupHeartbeatResponse.data.memberEpoch)
608+ assertEquals(8 , shareGroupHeartbeatResponse.data.memberEpoch)
597609 } finally {
598610 admin.close()
599611 }
@@ -704,12 +716,24 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
704716 .setTopicId(barId)
705717 .setPartitions(List [Integer ](0 ).asJava)).asJava)
706718
719+ shareGroupHeartbeatResponse = null
720+
707721 TestUtils .waitUntilTrue(() => {
708722 shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
709- shareGroupHeartbeatResponse.data.errorCode == Errors .NONE .code &&
710- shareGroupHeartbeatResponse.data.assignment != null &&
723+ if (shareGroupHeartbeatResponse.data.assignment != null &&
711724 expectedAssignment.topicPartitions.containsAll(shareGroupHeartbeatResponse.data.assignment.topicPartitions) &&
712- shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
725+ shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)) {
726+ true
727+ } else {
728+ shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest .Builder (
729+ new ShareGroupHeartbeatRequestData ()
730+ .setGroupId(" grp" )
731+ .setMemberId(memberId)
732+ .setMemberEpoch(shareGroupHeartbeatResponse.data.memberEpoch),
733+ true
734+ ).build()
735+ false
736+ }
713737 }, msg = s " Could not get bar partitions assigned. Last response $shareGroupHeartbeatResponse. " )
714738
715739 // Verify the response, the epoch should have been bumped.
@@ -840,7 +864,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
840864 shareGroupHeartbeatResponse.data.assignment == expectedAssignment
841865 }, msg = s " Could not get partitions assigned. Last response $shareGroupHeartbeatResponse. " )
842866 // Verify the response.
843- assertEquals(2 , shareGroupHeartbeatResponse.data.memberEpoch)
867+ assertEquals(3 , shareGroupHeartbeatResponse.data.memberEpoch)
844868
845869 // Restart the only running broker.
846870 val broker = cluster.brokers().values().iterator().next()
@@ -864,7 +888,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
864888
865889 // Verify the response. Epoch should not have changed and null assignments determines that no
866890 // change in old assignment.
867- assertEquals(2 , shareGroupHeartbeatResponse.data.memberEpoch)
891+ assertEquals(3 , shareGroupHeartbeatResponse.data.memberEpoch)
868892 assertNull(shareGroupHeartbeatResponse.data.assignment)
869893 } finally {
870894 admin.close()
0 commit comments