From 8ef757f189e0431ee3396d5a650cfa232abc065b Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Mon, 14 Apr 2025 16:09:46 +0530 Subject: [PATCH 1/6] KAFKA-18629: Account for existing deleting topics in share group delete. --- .../group/GroupMetadataManager.java | 7 +- .../group/GroupMetadataManagerTest.java | 69 ++++++++++++++++++- 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index f8a114d999f5d..639ce0d81a830 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -8178,12 +8178,15 @@ public Optional shareGroupBuildPartitionDeleteR )); } - // Remove all initializing and initialized topic info from record and add deleting. + // Remove all initializing and initialized topic info from record and add deleting. There + // could be previous deleting topics due to offsets delete, we need to account for them as well. + Set finalDeleteSet = new HashSet<>(shareGroupPartitionMetadata.get(shareGroupId).deletingTopics()); + finalDeleteSet.addAll(deleteCandidates.keySet()); records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( shareGroupId, Map.of(), Map.of(), - attachTopicName(deleteCandidates.keySet()) + attachTopicName(finalDeleteSet) )); return Optional.of(new DeleteShareGroupStateParameters.Builder() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 35574d35ca324..571f8a614cd81 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -20741,7 +20741,7 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) } @Test - public void testShareGroupDeleteRequest() { + public void testShareGroupDeleteRequestNoDeletingTopics() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -20804,6 +20804,73 @@ public void testShareGroupDeleteRequest() { assertRecordsEquals(expectedRecords, records); } + @Test + public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .build(); + + Uuid t1Uuid = Uuid.randomUuid(); + Uuid t2Uuid = Uuid.randomUuid(); + Uuid t3Uuid = Uuid.randomUuid(); + String t1Name = "t1"; + String t2Name = "t2"; + String t3Name = "t3"; + + String groupId = "share-group"; + ShareGroup shareGroup = mock(ShareGroup.class); + when(shareGroup.groupId()).thenReturn(groupId); + when(shareGroup.isEmpty()).thenReturn(false); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(t1Uuid, t1Name, 2) + .addTopic(t2Uuid, t2Name, 2) + .addTopic(t3Uuid, t3Name, 2) + .build(); + + MetadataDelta delta = new MetadataDelta(image); + context.groupMetadataManager.onNewMetadataImage(image, delta); + + context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0)); + + context.replay( + GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))), + Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))), + Map.of(t3Uuid, t3Name) + ) + ); + + context.commit(); + + Map> expectedTopicPartitionMap = Map.of( + t1Uuid, Set.of(0, 1), + t2Uuid, Set.of(0, 1) + ); + + List expectedRecords = List.of( + newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(), + Map.of(), + Map.of(t1Uuid, t1Name, t2Uuid, t2Name, t3Uuid, t3Name) // Existing deleting topics should be included here. + ) + ); + + List records = new ArrayList<>(); + Optional params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records); + verifyShareGroupDeleteRequest( + params, + expectedTopicPartitionMap, + groupId, + true + ); + assertRecordsEquals(expectedRecords, records); + } + @Test public void testShareGroupHeartbeatInitializeOnPartitionUpdate() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); From 61e5c73943130f056fefa072b6817e8a4624c77b Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 15 Apr 2025 11:44:22 +0530 Subject: [PATCH 2/6] add deleting topics to share group delete so errors are retried. --- .../coordinator/group/GroupMetadataManager.java | 17 +++++++++++++++++ .../group/GroupMetadataManagerTest.java | 3 ++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 059ed5f744870..bddbe629c90d1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -8166,6 +8166,23 @@ public Optional shareGroupBuildPartitionDeleteR shareGroupPartitionMetadata.get(shareGroupId).initializingTopics() ); + // Ideally the deleting should be empty - if it is not then it implies + // then some previous share group delete or delete offsets command + // did not complete successfully. So, set up the delete request such that + // a retry for the same is possible. Since this is part of admin operation + // deleting entries in already deleted should not pose issues related to + // performance. Also, the share coordinator is idempotent on delete partitions. + Map> deletingTopics = shareGroupPartitionMetadata.get(shareGroupId).deletingTopics().stream() + .map(tid -> Map.entry(tid, metadataImage.topics().getTopic(tid).partitions().keySet())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (!deletingTopics.isEmpty()) { + deleteCandidates = mergeShareGroupInitMaps( + deleteCandidates, + deletingTopics + ); + } + if (deleteCandidates.isEmpty()) { return Optional.empty(); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 62ff435258515..6c9fd9941418b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -20851,7 +20851,8 @@ public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() { Map> expectedTopicPartitionMap = Map.of( t1Uuid, Set.of(0, 1), - t2Uuid, Set.of(0, 1) + t2Uuid, Set.of(0, 1), + t3Uuid, Set.of(0, 1) ); List expectedRecords = List.of( From d8bed207c5d395e66e5b6344edc858ed50d09a66 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 15 Apr 2025 11:51:40 +0530 Subject: [PATCH 3/6] fixed typo --- .../kafka/coordinator/group/GroupMetadataManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index bddbe629c90d1..72b0a97298fc9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -8167,10 +8167,10 @@ public Optional shareGroupBuildPartitionDeleteR ); // Ideally the deleting should be empty - if it is not then it implies - // then some previous share group delete or delete offsets command + // that some previous share group delete or delete offsets command // did not complete successfully. So, set up the delete request such that - // a retry for the same is possible. Since this is part of admin operation - // deleting entries in already deleted should not pose issues related to + // a retry for the same is possible. Since this is part of an admin operation + // retrying delete should not pose issues related to // performance. Also, the share coordinator is idempotent on delete partitions. Map> deletingTopics = shareGroupPartitionMetadata.get(shareGroupId).deletingTopics().stream() .map(tid -> Map.entry(tid, metadataImage.topics().getTopic(tid).partitions().keySet())) From 8aa39648c876cc8df946d0dc0a4af39361f02af4 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 15 Apr 2025 11:54:38 +0530 Subject: [PATCH 4/6] add log msg for deleting --- .../org/apache/kafka/coordinator/group/GroupMetadataManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 72b0a97298fc9..c63aeae4e8141 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -8177,6 +8177,7 @@ public Optional shareGroupBuildPartitionDeleteR .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); if (!deletingTopics.isEmpty()) { + log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deletingTopics); deleteCandidates = mergeShareGroupInitMaps( deleteCandidates, deletingTopics From 2ddb2c1562fcfe61fb52c943761f6f3336b75d4f Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 15 Apr 2025 14:56:33 +0530 Subject: [PATCH 5/6] remove from deleting when initializing --- .../group/GroupMetadataManager.java | 16 ++++- .../group/GroupMetadataManagerTest.java | 64 +++++++++++++++++++ 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index c63aeae4e8141..a4e947091d913 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2968,7 +2968,8 @@ private InitializeShareGroupStateParameters buildInitializeShareGroupStateReques )).build(); } - private void addInitializingTopicsRecords(String groupId, List records, Map> topicPartitionMap) { + // Visibility for tests + void addInitializingTopicsRecords(String groupId, List records, Map> topicPartitionMap) { if (topicPartitionMap == null || topicPartitionMap.isEmpty()) { return; } @@ -2982,12 +2983,23 @@ private void addInitializingTopicsRecords(String groupId, List> finalInitializingMap = mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap); + // If any initializing topics are also present in the deleting state + // we should remove them from deleting. + Set currentDeleting = new HashSet<>(currentMap.deletingTopics()); + if (!currentDeleting.isEmpty()) { + finalInitializingMap.keySet().forEach(key -> { + if (currentDeleting.remove(key)) { + log.warn("Initializing topic {} for share group {} found in deleting state as well, removing from deleting.", metadataImage.topics().getTopic(key).name(), groupId); + } + }); + } + records.add( newShareGroupStatePartitionMetadataRecord( groupId, attachTopicName(finalInitializingMap), attachTopicName(currentMap.initializedTopics()), - attachTopicName(currentMap.deletingTopics()) + attachTopicName(currentDeleting) ) ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 6c9fd9941418b..459a3f4cbc463 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -129,6 +129,7 @@ import org.apache.kafka.server.authorizer.Action; import org.apache.kafka.server.authorizer.AuthorizationResult; import org.apache.kafka.server.authorizer.Authorizer; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters; import org.apache.kafka.server.share.persister.PartitionIdData; @@ -21419,6 +21420,69 @@ public void testShareGroupHeartbeatNoPersisterRequestWithInitializing() { ); } + @Test + public void testShareGroupInitializingClearsCommonDeleting() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + Uuid t1Uuid = Uuid.randomUuid(); + String t1Name = "t1"; + MetadataImage image = new MetadataImageBuilder() + .addTopic(t1Uuid, t1Name, 2) + .build(); + + String groupId = "share-group"; + + context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class)); + context.groupMetadataManager.replay( + new ShareGroupMetadataKey() + .setGroupId(groupId), + new ShareGroupMetadataValue() + .setEpoch(0) + ); + + // Replay a deleting record. + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of()) + .setInitializedTopics(List.of()) + .setDeletingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicInfo() + .setTopicId(t1Uuid) + .setTopicName(t1Name) + )) + ); + + List records = new ArrayList<>(); + context.groupMetadataManager.addInitializingTopicsRecords(groupId, records, Map.of(t1Uuid, Set.of(0, 1))); + + List expectedRecords = List.of( + CoordinatorRecord.record( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(t1Uuid) + .setTopicName(t1Name) + .setPartitions(List.of(0, 1)) + )) + .setInitializedTopics(List.of()) + .setDeletingTopics(List.of()), + (short) 0 + ) + ) + ); + + assertEquals(expectedRecords, records); + } + @Test public void testShareGroupInitializeSuccess() { String groupId = "groupId"; From 6d39b91bc4ba842024dc9c1e9142cddfa83261c8 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 15 Apr 2025 15:40:44 +0530 Subject: [PATCH 6/6] remove redundant code --- .../apache/kafka/coordinator/group/GroupMetadataManager.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index a4e947091d913..5ec1ab1d75eb0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -8213,13 +8213,11 @@ public Optional shareGroupBuildPartitionDeleteR // Remove all initializing and initialized topic info from record and add deleting. There // could be previous deleting topics due to offsets delete, we need to account for them as well. - Set finalDeleteSet = new HashSet<>(shareGroupPartitionMetadata.get(shareGroupId).deletingTopics()); - finalDeleteSet.addAll(deleteCandidates.keySet()); records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( shareGroupId, Map.of(), Map.of(), - attachTopicName(finalDeleteSet) + attachTopicName(deleteCandidates.keySet()) )); return Optional.of(new DeleteShareGroupStateParameters.Builder()