Skip to content

Commit cdbb8fa

Browse files
authored
KAFKA-18629: Account for existing deleting topics in share group delete. (#19463)
* When deleting share groups, only initialized and initializing information in the metadata record is considered. However, it could happen that the deleting topics also contains info due to other RPCs (share group offsets delete). * We need to account for existing information while writing the metadata record in the delete flow. * This PR aims to add the impl for the same. New tests have been added to check the functionality. Reviewers: Andrew Schofield <[email protected]>
1 parent c527530 commit cdbb8fa

File tree

2 files changed

+167
-4
lines changed

2 files changed

+167
-4
lines changed

Diff for: group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

+34-3
Original file line numberDiff line numberDiff line change
@@ -2968,7 +2968,8 @@ private InitializeShareGroupStateParameters buildInitializeShareGroupStateReques
29682968
)).build();
29692969
}
29702970

2971-
private void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records, Map<Uuid, Set<Integer>> topicPartitionMap) {
2971+
// Visibility for tests
2972+
void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records, Map<Uuid, Set<Integer>> topicPartitionMap) {
29722973
if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
29732974
return;
29742975
}
@@ -2982,12 +2983,23 @@ private void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord
29822983
// We must combine the existing information in the record with the topicPartitionMap argument.
29832984
Map<Uuid, Set<Integer>> finalInitializingMap = mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap);
29842985

2986+
// If any initializing topics are also present in the deleting state
2987+
// we should remove them from deleting.
2988+
Set<Uuid> currentDeleting = new HashSet<>(currentMap.deletingTopics());
2989+
if (!currentDeleting.isEmpty()) {
2990+
finalInitializingMap.keySet().forEach(key -> {
2991+
if (currentDeleting.remove(key)) {
2992+
log.warn("Initializing topic {} for share group {} found in deleting state as well, removing from deleting.", metadataImage.topics().getTopic(key).name(), groupId);
2993+
}
2994+
});
2995+
}
2996+
29852997
records.add(
29862998
newShareGroupStatePartitionMetadataRecord(
29872999
groupId,
29883000
attachTopicName(finalInitializingMap),
29893001
attachTopicName(currentMap.initializedTopics()),
2990-
attachTopicName(currentMap.deletingTopics())
3002+
attachTopicName(currentDeleting)
29913003
)
29923004
);
29933005
}
@@ -8166,6 +8178,24 @@ public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteR
81668178
shareGroupPartitionMetadata.get(shareGroupId).initializingTopics()
81678179
);
81688180

8181+
// Ideally the deleting should be empty - if it is not then it implies
8182+
// that some previous share group delete or delete offsets command
8183+
// did not complete successfully. So, set up the delete request such that
8184+
// a retry for the same is possible. Since this is part of an admin operation
8185+
// retrying delete should not pose issues related to
8186+
// performance. Also, the share coordinator is idempotent on delete partitions.
8187+
Map<Uuid, Set<Integer>> deletingTopics = shareGroupPartitionMetadata.get(shareGroupId).deletingTopics().stream()
8188+
.map(tid -> Map.entry(tid, metadataImage.topics().getTopic(tid).partitions().keySet()))
8189+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
8190+
8191+
if (!deletingTopics.isEmpty()) {
8192+
log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deletingTopics);
8193+
deleteCandidates = mergeShareGroupInitMaps(
8194+
deleteCandidates,
8195+
deletingTopics
8196+
);
8197+
}
8198+
81698199
if (deleteCandidates.isEmpty()) {
81708200
return Optional.empty();
81718201
}
@@ -8181,7 +8211,8 @@ public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteR
81818211
));
81828212
}
81838213

8184-
// Remove all initializing and initialized topic info from record and add deleting.
8214+
// Remove all initializing and initialized topic info from record and add deleting. There
8215+
// could be previous deleting topics due to offsets delete, we need to account for them as well.
81858216
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
81868217
shareGroupId,
81878218
Map.of(),

Diff for: group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

+133-1
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@
129129
import org.apache.kafka.server.authorizer.Action;
130130
import org.apache.kafka.server.authorizer.AuthorizationResult;
131131
import org.apache.kafka.server.authorizer.Authorizer;
132+
import org.apache.kafka.server.common.ApiMessageAndVersion;
132133
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
133134
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
134135
import org.apache.kafka.server.share.persister.PartitionIdData;
@@ -20744,7 +20745,7 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
2074420745
}
2074520746

2074620747
@Test
20747-
public void testShareGroupDeleteRequest() {
20748+
public void testShareGroupDeleteRequestNoDeletingTopics() {
2074820749
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
2074920750
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
2075020751
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
@@ -20807,6 +20808,74 @@ public void testShareGroupDeleteRequest() {
2080720808
assertRecordsEquals(expectedRecords, records);
2080820809
}
2080920810

20811+
@Test
20812+
public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() {
20813+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
20814+
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
20815+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
20816+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
20817+
.build();
20818+
20819+
Uuid t1Uuid = Uuid.randomUuid();
20820+
Uuid t2Uuid = Uuid.randomUuid();
20821+
Uuid t3Uuid = Uuid.randomUuid();
20822+
String t1Name = "t1";
20823+
String t2Name = "t2";
20824+
String t3Name = "t3";
20825+
20826+
String groupId = "share-group";
20827+
ShareGroup shareGroup = mock(ShareGroup.class);
20828+
when(shareGroup.groupId()).thenReturn(groupId);
20829+
when(shareGroup.isEmpty()).thenReturn(false);
20830+
20831+
MetadataImage image = new MetadataImageBuilder()
20832+
.addTopic(t1Uuid, t1Name, 2)
20833+
.addTopic(t2Uuid, t2Name, 2)
20834+
.addTopic(t3Uuid, t3Name, 2)
20835+
.build();
20836+
20837+
MetadataDelta delta = new MetadataDelta(image);
20838+
context.groupMetadataManager.onNewMetadataImage(image, delta);
20839+
20840+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0));
20841+
20842+
context.replay(
20843+
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
20844+
groupId,
20845+
Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
20846+
Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
20847+
Map.of(t3Uuid, t3Name)
20848+
)
20849+
);
20850+
20851+
context.commit();
20852+
20853+
Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
20854+
t1Uuid, Set.of(0, 1),
20855+
t2Uuid, Set.of(0, 1),
20856+
t3Uuid, Set.of(0, 1)
20857+
);
20858+
20859+
List<CoordinatorRecord> expectedRecords = List.of(
20860+
newShareGroupStatePartitionMetadataRecord(
20861+
groupId,
20862+
Map.of(),
20863+
Map.of(),
20864+
Map.of(t1Uuid, t1Name, t2Uuid, t2Name, t3Uuid, t3Name) // Existing deleting topics should be included here.
20865+
)
20866+
);
20867+
20868+
List<CoordinatorRecord> records = new ArrayList<>();
20869+
Optional<DeleteShareGroupStateParameters> params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records);
20870+
verifyShareGroupDeleteRequest(
20871+
params,
20872+
expectedTopicPartitionMap,
20873+
groupId,
20874+
true
20875+
);
20876+
assertRecordsEquals(expectedRecords, records);
20877+
}
20878+
2081020879
@Test
2081120880
public void testSharePartitionsEligibleForOffsetDeletionSuccess() {
2081220881
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
@@ -21351,6 +21420,69 @@ public void testShareGroupHeartbeatNoPersisterRequestWithInitializing() {
2135121420
);
2135221421
}
2135321422

21423+
@Test
21424+
public void testShareGroupInitializingClearsCommonDeleting() {
21425+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
21426+
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
21427+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
21428+
.withShareGroupAssignor(assignor)
21429+
.build();
21430+
21431+
Uuid t1Uuid = Uuid.randomUuid();
21432+
String t1Name = "t1";
21433+
MetadataImage image = new MetadataImageBuilder()
21434+
.addTopic(t1Uuid, t1Name, 2)
21435+
.build();
21436+
21437+
String groupId = "share-group";
21438+
21439+
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
21440+
context.groupMetadataManager.replay(
21441+
new ShareGroupMetadataKey()
21442+
.setGroupId(groupId),
21443+
new ShareGroupMetadataValue()
21444+
.setEpoch(0)
21445+
);
21446+
21447+
// Replay a deleting record.
21448+
context.groupMetadataManager.replay(
21449+
new ShareGroupStatePartitionMetadataKey()
21450+
.setGroupId(groupId),
21451+
new ShareGroupStatePartitionMetadataValue()
21452+
.setInitializingTopics(List.of())
21453+
.setInitializedTopics(List.of())
21454+
.setDeletingTopics(List.of(
21455+
new ShareGroupStatePartitionMetadataValue.TopicInfo()
21456+
.setTopicId(t1Uuid)
21457+
.setTopicName(t1Name)
21458+
))
21459+
);
21460+
21461+
List<CoordinatorRecord> records = new ArrayList<>();
21462+
context.groupMetadataManager.addInitializingTopicsRecords(groupId, records, Map.of(t1Uuid, Set.of(0, 1)));
21463+
21464+
List<CoordinatorRecord> expectedRecords = List.of(
21465+
CoordinatorRecord.record(
21466+
new ShareGroupStatePartitionMetadataKey()
21467+
.setGroupId(groupId),
21468+
new ApiMessageAndVersion(
21469+
new ShareGroupStatePartitionMetadataValue()
21470+
.setInitializingTopics(List.of(
21471+
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
21472+
.setTopicId(t1Uuid)
21473+
.setTopicName(t1Name)
21474+
.setPartitions(List.of(0, 1))
21475+
))
21476+
.setInitializedTopics(List.of())
21477+
.setDeletingTopics(List.of()),
21478+
(short) 0
21479+
)
21480+
)
21481+
);
21482+
21483+
assertEquals(expectedRecords, records);
21484+
}
21485+
2135421486
@Test
2135521487
public void testShareGroupInitializeSuccess() {
2135621488
String groupId = "groupId";

0 commit comments

Comments
 (0)