diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 2e2b02f9432b1..5ec1ab1d75eb0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2968,7 +2968,8 @@ private InitializeShareGroupStateParameters buildInitializeShareGroupStateReques )).build(); } - private void addInitializingTopicsRecords(String groupId, List records, Map> topicPartitionMap) { + // Visibility for tests + void addInitializingTopicsRecords(String groupId, List records, Map> topicPartitionMap) { if (topicPartitionMap == null || topicPartitionMap.isEmpty()) { return; } @@ -2982,12 +2983,23 @@ private void addInitializingTopicsRecords(String groupId, List> finalInitializingMap = mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap); + // If any initializing topics are also present in the deleting state + // we should remove them from deleting. + Set currentDeleting = new HashSet<>(currentMap.deletingTopics()); + if (!currentDeleting.isEmpty()) { + finalInitializingMap.keySet().forEach(key -> { + if (currentDeleting.remove(key)) { + log.warn("Initializing topic {} for share group {} found in deleting state as well, removing from deleting.", metadataImage.topics().getTopic(key).name(), groupId); + } + }); + } + records.add( newShareGroupStatePartitionMetadataRecord( groupId, attachTopicName(finalInitializingMap), attachTopicName(currentMap.initializedTopics()), - attachTopicName(currentMap.deletingTopics()) + attachTopicName(currentDeleting) ) ); } @@ -8166,6 +8178,24 @@ public Optional shareGroupBuildPartitionDeleteR shareGroupPartitionMetadata.get(shareGroupId).initializingTopics() ); + // Ideally the deleting should be empty - if it is not then it implies + // that some previous share group delete or delete offsets command + // did not complete successfully. So, set up the delete request such that + // a retry for the same is possible. Since this is part of an admin operation + // retrying delete should not pose issues related to + // performance. Also, the share coordinator is idempotent on delete partitions. + Map> deletingTopics = shareGroupPartitionMetadata.get(shareGroupId).deletingTopics().stream() + .map(tid -> Map.entry(tid, metadataImage.topics().getTopic(tid).partitions().keySet())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (!deletingTopics.isEmpty()) { + log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deletingTopics); + deleteCandidates = mergeShareGroupInitMaps( + deleteCandidates, + deletingTopics + ); + } + if (deleteCandidates.isEmpty()) { return Optional.empty(); } @@ -8181,7 +8211,8 @@ public Optional shareGroupBuildPartitionDeleteR )); } - // Remove all initializing and initialized topic info from record and add deleting. + // Remove all initializing and initialized topic info from record and add deleting. There + // could be previous deleting topics due to offsets delete, we need to account for them as well. records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( shareGroupId, Map.of(), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4665e81e9bee6..459a3f4cbc463 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -129,6 +129,7 @@ import org.apache.kafka.server.authorizer.Action; import org.apache.kafka.server.authorizer.AuthorizationResult; import org.apache.kafka.server.authorizer.Authorizer; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters; import org.apache.kafka.server.share.persister.PartitionIdData; @@ -20744,7 +20745,7 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) } @Test - public void testShareGroupDeleteRequest() { + public void testShareGroupDeleteRequestNoDeletingTopics() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -20807,6 +20808,74 @@ public void testShareGroupDeleteRequest() { assertRecordsEquals(expectedRecords, records); } + @Test + public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .build(); + + Uuid t1Uuid = Uuid.randomUuid(); + Uuid t2Uuid = Uuid.randomUuid(); + Uuid t3Uuid = Uuid.randomUuid(); + String t1Name = "t1"; + String t2Name = "t2"; + String t3Name = "t3"; + + String groupId = "share-group"; + ShareGroup shareGroup = mock(ShareGroup.class); + when(shareGroup.groupId()).thenReturn(groupId); + when(shareGroup.isEmpty()).thenReturn(false); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(t1Uuid, t1Name, 2) + .addTopic(t2Uuid, t2Name, 2) + .addTopic(t3Uuid, t3Name, 2) + .build(); + + MetadataDelta delta = new MetadataDelta(image); + context.groupMetadataManager.onNewMetadataImage(image, delta); + + context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0)); + + context.replay( + GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))), + Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))), + Map.of(t3Uuid, t3Name) + ) + ); + + context.commit(); + + Map> expectedTopicPartitionMap = Map.of( + t1Uuid, Set.of(0, 1), + t2Uuid, Set.of(0, 1), + t3Uuid, Set.of(0, 1) + ); + + List expectedRecords = List.of( + newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(), + Map.of(), + Map.of(t1Uuid, t1Name, t2Uuid, t2Name, t3Uuid, t3Name) // Existing deleting topics should be included here. + ) + ); + + List records = new ArrayList<>(); + Optional params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records); + verifyShareGroupDeleteRequest( + params, + expectedTopicPartitionMap, + groupId, + true + ); + assertRecordsEquals(expectedRecords, records); + } + @Test public void testSharePartitionsEligibleForOffsetDeletionSuccess() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); @@ -21351,6 +21420,69 @@ public void testShareGroupHeartbeatNoPersisterRequestWithInitializing() { ); } + @Test + public void testShareGroupInitializingClearsCommonDeleting() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + Uuid t1Uuid = Uuid.randomUuid(); + String t1Name = "t1"; + MetadataImage image = new MetadataImageBuilder() + .addTopic(t1Uuid, t1Name, 2) + .build(); + + String groupId = "share-group"; + + context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class)); + context.groupMetadataManager.replay( + new ShareGroupMetadataKey() + .setGroupId(groupId), + new ShareGroupMetadataValue() + .setEpoch(0) + ); + + // Replay a deleting record. + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of()) + .setInitializedTopics(List.of()) + .setDeletingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicInfo() + .setTopicId(t1Uuid) + .setTopicName(t1Name) + )) + ); + + List records = new ArrayList<>(); + context.groupMetadataManager.addInitializingTopicsRecords(groupId, records, Map.of(t1Uuid, Set.of(0, 1))); + + List expectedRecords = List.of( + CoordinatorRecord.record( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(t1Uuid) + .setTopicName(t1Name) + .setPartitions(List.of(0, 1)) + )) + .setInitializedTopics(List.of()) + .setDeletingTopics(List.of()), + (short) 0 + ) + ) + ); + + assertEquals(expectedRecords, records); + } + @Test public void testShareGroupInitializeSuccess() { String groupId = "groupId";