Skip to content

Commit 8ef757f

Browse files
committed
KAFKA-18629: Account for existing deleting topics in share group delete.
1 parent fc25436 commit 8ef757f

File tree

2 files changed

+73
-3
lines changed

2 files changed

+73
-3
lines changed

Diff for: group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -8178,12 +8178,15 @@ public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteR
81788178
));
81798179
}
81808180

8181-
// Remove all initializing and initialized topic info from record and add deleting.
8181+
// Remove all initializing and initialized topic info from record and add deleting. There
8182+
// could be previous deleting topics due to offsets delete, we need to account for them as well.
8183+
Set<Uuid> finalDeleteSet = new HashSet<>(shareGroupPartitionMetadata.get(shareGroupId).deletingTopics());
8184+
finalDeleteSet.addAll(deleteCandidates.keySet());
81828185
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
81838186
shareGroupId,
81848187
Map.of(),
81858188
Map.of(),
8186-
attachTopicName(deleteCandidates.keySet())
8189+
attachTopicName(finalDeleteSet)
81878190
));
81888191

81898192
return Optional.of(new DeleteShareGroupStateParameters.Builder()

Diff for: group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

+68-1
Original file line numberDiff line numberDiff line change
@@ -20741,7 +20741,7 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
2074120741
}
2074220742

2074320743
@Test
20744-
public void testShareGroupDeleteRequest() {
20744+
public void testShareGroupDeleteRequestNoDeletingTopics() {
2074520745
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
2074620746
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
2074720747
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
@@ -20804,6 +20804,73 @@ public void testShareGroupDeleteRequest() {
2080420804
assertRecordsEquals(expectedRecords, records);
2080520805
}
2080620806

20807+
@Test
20808+
public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() {
20809+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
20810+
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
20811+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
20812+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
20813+
.build();
20814+
20815+
Uuid t1Uuid = Uuid.randomUuid();
20816+
Uuid t2Uuid = Uuid.randomUuid();
20817+
Uuid t3Uuid = Uuid.randomUuid();
20818+
String t1Name = "t1";
20819+
String t2Name = "t2";
20820+
String t3Name = "t3";
20821+
20822+
String groupId = "share-group";
20823+
ShareGroup shareGroup = mock(ShareGroup.class);
20824+
when(shareGroup.groupId()).thenReturn(groupId);
20825+
when(shareGroup.isEmpty()).thenReturn(false);
20826+
20827+
MetadataImage image = new MetadataImageBuilder()
20828+
.addTopic(t1Uuid, t1Name, 2)
20829+
.addTopic(t2Uuid, t2Name, 2)
20830+
.addTopic(t3Uuid, t3Name, 2)
20831+
.build();
20832+
20833+
MetadataDelta delta = new MetadataDelta(image);
20834+
context.groupMetadataManager.onNewMetadataImage(image, delta);
20835+
20836+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0));
20837+
20838+
context.replay(
20839+
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
20840+
groupId,
20841+
Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
20842+
Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
20843+
Map.of(t3Uuid, t3Name)
20844+
)
20845+
);
20846+
20847+
context.commit();
20848+
20849+
Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
20850+
t1Uuid, Set.of(0, 1),
20851+
t2Uuid, Set.of(0, 1)
20852+
);
20853+
20854+
List<CoordinatorRecord> expectedRecords = List.of(
20855+
newShareGroupStatePartitionMetadataRecord(
20856+
groupId,
20857+
Map.of(),
20858+
Map.of(),
20859+
Map.of(t1Uuid, t1Name, t2Uuid, t2Name, t3Uuid, t3Name) // Existing deleting topics should be included here.
20860+
)
20861+
);
20862+
20863+
List<CoordinatorRecord> records = new ArrayList<>();
20864+
Optional<DeleteShareGroupStateParameters> params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records);
20865+
verifyShareGroupDeleteRequest(
20866+
params,
20867+
expectedTopicPartitionMap,
20868+
groupId,
20869+
true
20870+
);
20871+
assertRecordsEquals(expectedRecords, records);
20872+
}
20873+
2080720874
@Test
2080820875
public void testShareGroupHeartbeatInitializeOnPartitionUpdate() {
2080920876
MockPartitionAssignor assignor = new MockPartitionAssignor("range");

0 commit comments

Comments
 (0)