Skip to content

KAFKA-18629: Account for existing deleting topics in share group delete. #19463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2968,7 +2968,8 @@ private InitializeShareGroupStateParameters buildInitializeShareGroupStateReques
)).build();
}

private void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records, Map<Uuid, Set<Integer>> topicPartitionMap) {
// Visibility for tests
void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records, Map<Uuid, Set<Integer>> topicPartitionMap) {
if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
return;
}
Expand All @@ -2982,12 +2983,23 @@ private void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord
// We must combine the existing information in the record with the topicPartitionMap argument.
Map<Uuid, Set<Integer>> finalInitializingMap = mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap);

// If any initializing topics are also present in the deleting state
// we should remove them from deleting.
Set<Uuid> currentDeleting = new HashSet<>(currentMap.deletingTopics());
if (!currentDeleting.isEmpty()) {
finalInitializingMap.keySet().forEach(key -> {
if (currentDeleting.remove(key)) {
log.warn("Initializing topic {} for share group {} found in deleting state as well, removing from deleting.", metadataImage.topics().getTopic(key).name(), groupId);
}
});
}

records.add(
newShareGroupStatePartitionMetadataRecord(
groupId,
attachTopicName(finalInitializingMap),
attachTopicName(currentMap.initializedTopics()),
attachTopicName(currentMap.deletingTopics())
attachTopicName(currentDeleting)
)
);
}
Expand Down Expand Up @@ -8166,6 +8178,24 @@ public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteR
shareGroupPartitionMetadata.get(shareGroupId).initializingTopics()
);

// Ideally the deleting should be empty - if it is not then it implies
// that some previous share group delete or delete offsets command
// did not complete successfully. So, set up the delete request such that
// a retry for the same is possible. Since this is part of an admin operation
// retrying delete should not pose issues related to
// performance. Also, the share coordinator is idempotent on delete partitions.
Map<Uuid, Set<Integer>> deletingTopics = shareGroupPartitionMetadata.get(shareGroupId).deletingTopics().stream()
.map(tid -> Map.entry(tid, metadataImage.topics().getTopic(tid).partitions().keySet()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (!deletingTopics.isEmpty()) {
log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deletingTopics);
deleteCandidates = mergeShareGroupInitMaps(
deleteCandidates,
deletingTopics
);
}

if (deleteCandidates.isEmpty()) {
return Optional.empty();
}
Expand All @@ -8181,7 +8211,8 @@ public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteR
));
}

// Remove all initializing and initialized topic info from record and add deleting.
// Remove all initializing and initialized topic info from record and add deleting. There
// could be previous deleting topics due to offsets delete, we need to account for them as well.
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
shareGroupId,
Map.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.PartitionIdData;
Expand Down Expand Up @@ -20744,7 +20745,7 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
}

@Test
public void testShareGroupDeleteRequest() {
public void testShareGroupDeleteRequestNoDeletingTopics() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
Expand Down Expand Up @@ -20807,6 +20808,74 @@ public void testShareGroupDeleteRequest() {
assertRecordsEquals(expectedRecords, records);
}

@Test
public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
.build();

Uuid t1Uuid = Uuid.randomUuid();
Uuid t2Uuid = Uuid.randomUuid();
Uuid t3Uuid = Uuid.randomUuid();
String t1Name = "t1";
String t2Name = "t2";
String t3Name = "t3";

String groupId = "share-group";
ShareGroup shareGroup = mock(ShareGroup.class);
when(shareGroup.groupId()).thenReturn(groupId);
when(shareGroup.isEmpty()).thenReturn(false);

MetadataImage image = new MetadataImageBuilder()
.addTopic(t1Uuid, t1Name, 2)
.addTopic(t2Uuid, t2Name, 2)
.addTopic(t3Uuid, t3Name, 2)
.build();

MetadataDelta delta = new MetadataDelta(image);
context.groupMetadataManager.onNewMetadataImage(image, delta);

context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0));

context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
Map.of(t3Uuid, t3Name)
)
);

context.commit();

Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
t1Uuid, Set.of(0, 1),
t2Uuid, Set.of(0, 1),
t3Uuid, Set.of(0, 1)
);

List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(t1Uuid, t1Name, t2Uuid, t2Name, t3Uuid, t3Name) // Existing deleting topics should be included here.
)
);

List<CoordinatorRecord> records = new ArrayList<>();
Optional<DeleteShareGroupStateParameters> params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records);
verifyShareGroupDeleteRequest(
params,
expectedTopicPartitionMap,
groupId,
true
);
assertRecordsEquals(expectedRecords, records);
}

@Test
public void testSharePartitionsEligibleForOffsetDeletionSuccess() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
Expand Down Expand Up @@ -21351,6 +21420,69 @@ public void testShareGroupHeartbeatNoPersisterRequestWithInitializing() {
);
}

@Test
public void testShareGroupInitializingClearsCommonDeleting() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();

Uuid t1Uuid = Uuid.randomUuid();
String t1Name = "t1";
MetadataImage image = new MetadataImageBuilder()
.addTopic(t1Uuid, t1Name, 2)
.build();

String groupId = "share-group";

context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.groupMetadataManager.replay(
new ShareGroupMetadataKey()
.setGroupId(groupId),
new ShareGroupMetadataValue()
.setEpoch(0)
);

// Replay a deleting record.
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of())
.setInitializedTopics(List.of())
.setDeletingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicInfo()
.setTopicId(t1Uuid)
.setTopicName(t1Name)
))
);

List<CoordinatorRecord> records = new ArrayList<>();
context.groupMetadataManager.addInitializingTopicsRecords(groupId, records, Map.of(t1Uuid, Set.of(0, 1)));

List<CoordinatorRecord> expectedRecords = List.of(
CoordinatorRecord.record(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t1Uuid)
.setTopicName(t1Name)
.setPartitions(List.of(0, 1))
))
.setInitializedTopics(List.of())
.setDeletingTopics(List.of()),
(short) 0
)
)
);

assertEquals(expectedRecords, records);
}

@Test
public void testShareGroupInitializeSuccess() {
String groupId = "groupId";
Expand Down