From cb5209ecafb712ae281d5cb748ce5c79db80efc1 Mon Sep 17 00:00:00 2001 From: Armando Soriano Date: Wed, 29 Jan 2025 10:31:25 +0100 Subject: [PATCH] feat(clients): MemberToRemove accepting memberId field on constructor so it is used to build up the MemberIdentity, making it possible to use removeMembersFromConsumerGroup method from admin client to disconnect a member (denoted by memberId) from the given consumer group (denoted by groupInstanceId) --- .../org/apache/kafka/clients/admin/MemberToRemove.java | 9 ++++++++- .../RemoveMembersFromConsumerGroupResultTest.java | 10 +++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java index 5ca5463d3f285..7c52315ed1c96 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java @@ -26,9 +26,16 @@ */ public class MemberToRemove { private final String groupInstanceId; + private final String memberId; public MemberToRemove(String groupInstanceId) { this.groupInstanceId = groupInstanceId; + this.memberId = null; + } + + public MemberToRemove(String groupInstanceId, String memberId) { + this.groupInstanceId = groupInstanceId; + this.memberId = memberId; } @Override @@ -49,7 +56,7 @@ public int hashCode() { MemberIdentity toMemberIdentity() { return new MemberIdentity() .setGroupInstanceId(groupInstanceId) - .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID); + .setMemberId(memberId != null ? memberId : JoinGroupRequest.UNKNOWN_MEMBER_ID); } public String groupInstanceId() { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java index 1b0ac0ba0c462..da5f0ef002e92 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.BeforeEach; @@ -32,6 +33,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -39,7 +41,7 @@ public class RemoveMembersFromConsumerGroupResultTest { private final MemberToRemove instanceOne = new MemberToRemove("instance-1"); - private final MemberToRemove instanceTwo = new MemberToRemove("instance-2"); + private final MemberToRemove instanceTwo = new MemberToRemove("instance-2", "member-id"); private Set membersToRemove; private Map errorsMap; @@ -105,6 +107,12 @@ public void testNoErrorConstructor() throws ExecutionException, InterruptedExcep assertNull(noErrorResult.memberResult(instanceTwo).get()); } + @Test + public void testMemberIdentityMemberId() { + assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, instanceOne.toMemberIdentity().memberId()); + assertEquals("member-id", instanceTwo.toMemberIdentity().memberId()); + } + private RemoveMembersFromConsumerGroupResult createAndVerifyMemberLevelError() throws InterruptedException, ExecutionException { memberFutures.complete(errorsMap); assertFalse(memberFutures.isCompletedExceptionally());