Skip to content

Commit 6d9ba76

Browse files
authored
MINOR: Various small cleanups in group-coordinator (#21450)
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent a603bf1 commit 6d9ba76

File tree

8 files changed

+61
-102
lines changed

8 files changed

+61
-102
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public static Set<String> configNames() {
199199
*/
200200
public static void validateNames(Properties props) {
201201
Set<String> names = configNames();
202-
for (Object name : props.keySet()) {
202+
for (String name : props.stringPropertyNames()) {
203203
if (!names.contains(name)) {
204204
throw new InvalidConfigurationException("Unknown group config name: " + name);
205205
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 37 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,8 +1289,8 @@ private void convertToClassicGroup(
12891289
metadataImage
12901290
);
12911291
} catch (SchemaException e) {
1292-
log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
1293-
"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
1292+
log.warn("Cannot downgrade the consumer group {}: fail to parse the Consumer Protocol {}.",
1293+
consumerGroup.groupId(), ConsumerProtocol.PROTOCOL_TYPE, e);
12941294

12951295
throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
12961296
consumerGroup.groupId(), e.getMessage()));
@@ -1370,16 +1370,14 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
13701370
metadataImage
13711371
);
13721372
} catch (SchemaException e) {
1373-
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +
1374-
" to consumer group because the embedded consumer protocol is malformed: "
1375-
+ e.getMessage() + ".", e);
1373+
log.warn("Cannot upgrade classic group {} to consumer group because the embedded consumer protocol is malformed: {}.",
1374+
classicGroup.groupId(), e.getMessage(), e);
13761375

13771376
throw new GroupIdNotFoundException(
13781377
String.format("Cannot upgrade classic group %s to consumer group because the embedded consumer protocol is malformed.", classicGroup.groupId())
13791378
);
13801379
} catch (UnsupportedVersionException e) {
1381-
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +
1382-
" to consumer group: " + e.getMessage() + ".", e);
1380+
log.warn("Cannot upgrade classic group {} to consumer group: {}.", classicGroup.groupId(), e.getMessage(), e);
13831381

13841382
throw new GroupIdNotFoundException(
13851383
String.format("Cannot upgrade classic group %s to consumer group because an unsupported custom assignor is in use. " +
@@ -3530,7 +3528,7 @@ private boolean hasStreamsMemberMetadataChanged(
35303528
String memberId = updatedMember.memberId();
35313529
if (!updatedMember.equals(member)) {
35323530
records.add(newStreamsGroupMemberRecord(groupId, updatedMember));
3533-
log.info("[GroupId {}][MemberId {}] Member updated its member metdata to {}.",
3531+
log.info("[GroupId {}][MemberId {}] Member updated its member metadata to {}.",
35343532
groupId, memberId, updatedMember);
35353533

35363534
return true;
@@ -4027,7 +4025,7 @@ private CoordinatorResult<Void, CoordinatorRecord> computeDelayedTargetAssignmen
40274025
throw new IllegalStateException("Group epoch should be always larger to assignment epoch");
40284026
}
40294027

4030-
if (!group.configuredTopology().isPresent()) {
4028+
if (group.configuredTopology().isEmpty()) {
40314029
log.warn("[GroupId {}] Cannot compute delayed target assignment: configured topology is not present", groupId);
40324030
return EMPTY_RESULT;
40334031
}
@@ -4350,9 +4348,8 @@ private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember(
43504348
StreamsGroupMember member,
43514349
T response
43524350
) {
4353-
List<CoordinatorRecord> records = new ArrayList<>();
43544351

4355-
records.addAll(removeStreamsMember(group.groupId(), member.memberId()));
4352+
List<CoordinatorRecord> records = new ArrayList<>(removeStreamsMember(group.groupId(), member.memberId()));
43564353

43574354
// We bump the group epoch.
43584355
int groupEpoch = group.groupEpoch() + 1;
@@ -7287,30 +7284,26 @@ private CoordinatorResult<Void, CoordinatorRecord> expirePendingSync(
72877284
* @return whether the group can accept a joining member.
72887285
*/
72897286
private boolean acceptJoiningMember(ClassicGroup group, String memberId) {
7290-
switch (group.currentState()) {
7291-
case EMPTY:
7292-
case DEAD:
7287+
return switch (group.currentState()) {
7288+
case EMPTY, DEAD ->
72937289
// Always accept the request when the group is empty or dead
7294-
return true;
7295-
case PREPARING_REBALANCE:
7290+
true;
7291+
case PREPARING_REBALANCE ->
72967292
// An existing member is accepted if it is already awaiting. New members are accepted
72977293
// up to the max group size. Note that the number of awaiting members is used here
72987294
// for two reasons:
72997295
// 1) the group size is not reliable as it could already be above the max group size
73007296
// if the max group size was reduced.
73017297
// 2) using the number of awaiting members allows to kick out the last rejoining
73027298
// members of the group.
7303-
return (group.hasMember(memberId) && group.member(memberId).isAwaitingJoin()) ||
7304-
group.numAwaitingJoinResponse() < config.classicGroupMaxSize();
7305-
case COMPLETING_REBALANCE:
7306-
case STABLE:
7299+
(group.hasMember(memberId) && group.member(memberId).isAwaitingJoin()) ||
7300+
group.numAwaitingJoinResponse() < config.classicGroupMaxSize();
7301+
case COMPLETING_REBALANCE, STABLE ->
73077302
// An existing member is accepted. New members are accepted up to the max group size.
73087303
// Note that the group size is used here. When the group transitions to CompletingRebalance,
73097304
// members who haven't rejoined are removed.
7310-
return group.hasMember(memberId) || group.numMembers() < config.classicGroupMaxSize();
7311-
default:
7312-
throw new IllegalStateException("Unknown group state: " + group.stateAsString());
7313-
}
7305+
group.hasMember(memberId) || group.numMembers() < config.classicGroupMaxSize();
7306+
};
73147307
}
73157308

73167309
/**
@@ -7645,24 +7638,12 @@ private byte[] prepareAssignment(ConsumerGroupMember member) {
76457638

76467639
// Visible for testing
76477640
static Errors appendGroupMetadataErrorToResponseError(Errors appendError) {
7648-
switch (appendError) {
7649-
case UNKNOWN_TOPIC_OR_PARTITION:
7650-
case NOT_ENOUGH_REPLICAS:
7651-
case REQUEST_TIMED_OUT:
7652-
return COORDINATOR_NOT_AVAILABLE;
7653-
7654-
case NOT_LEADER_OR_FOLLOWER:
7655-
case KAFKA_STORAGE_ERROR:
7656-
return NOT_COORDINATOR;
7657-
7658-
case MESSAGE_TOO_LARGE:
7659-
case RECORD_LIST_TOO_LARGE:
7660-
case INVALID_FETCH_SIZE:
7661-
return UNKNOWN_SERVER_ERROR;
7662-
7663-
default:
7664-
return appendError;
7665-
}
7641+
return switch (appendError) {
7642+
case UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, REQUEST_TIMED_OUT -> COORDINATOR_NOT_AVAILABLE;
7643+
case NOT_LEADER_OR_FOLLOWER, KAFKA_STORAGE_ERROR -> NOT_COORDINATOR;
7644+
case MESSAGE_TOO_LARGE, RECORD_LIST_TOO_LARGE, INVALID_FETCH_SIZE -> UNKNOWN_SERVER_ERROR;
7645+
default -> appendError;
7646+
};
76667647
}
76677648

76687649
private Optional<Errors> validateSyncGroup(
@@ -7768,35 +7749,31 @@ private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroup
77687749
) {
77697750
validateClassicGroupHeartbeat(group, request.memberId(), request.groupInstanceId(), request.generationId());
77707751

7771-
switch (group.currentState()) {
7772-
case EMPTY:
7773-
return new CoordinatorResult<>(
7774-
List.of(),
7775-
new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
7776-
);
7777-
7778-
case PREPARING_REBALANCE:
7752+
return switch (group.currentState()) {
7753+
case EMPTY -> new CoordinatorResult<>(
7754+
List.of(),
7755+
new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
7756+
);
7757+
case PREPARING_REBALANCE -> {
77797758
rescheduleClassicGroupMemberHeartbeat(group, group.member(request.memberId()));
7780-
return new CoordinatorResult<>(
7759+
yield new CoordinatorResult<>(
77817760
List.of(),
77827761
new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())
77837762
);
7784-
7785-
case COMPLETING_REBALANCE:
7786-
case STABLE:
7763+
}
7764+
case COMPLETING_REBALANCE, STABLE -> {
77877765
// Consumers may start sending heartbeats after join-group response, while the group
77887766
// is in CompletingRebalance state. In this case, we should treat them as
77897767
// normal heartbeat requests and reset the timer
77907768
rescheduleClassicGroupMemberHeartbeat(group, group.member(request.memberId()));
7791-
return new CoordinatorResult<>(
7769+
yield new CoordinatorResult<>(
77927770
List.of(),
77937771
new HeartbeatResponseData()
77947772
);
7795-
7796-
default:
7797-
throw new IllegalStateException("Reached unexpected state " +
7773+
}
7774+
default -> throw new IllegalStateException("Reached unexpected state " +
77987775
group.currentState() + " for group " + group.groupId());
7799-
}
7776+
};
78007777
}
78017778

78027779
/**
@@ -8652,7 +8629,7 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRec
86528629
* Checks whether the given protocol type or name in the request is inconsistent with the group's.
86538630
*
86548631
* @param protocolTypeOrName The request's protocol type or name.
8655-
* @param groupProtocolTypeOrName The group's protoocl type or name.
8632+
* @param groupProtocolTypeOrName The group's protocol type or name.
86568633
*
86578634
* @return True if protocol is inconsistent, false otherwise.
86588635
*/

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,12 @@ public static class DeadlineAndEpoch {
188188
*/
189189
private final TimelineObject<Optional<ConfiguredTopology>> configuredTopology;
190190

191+
/**
192+
* The last used assignment configurations for this streams group.
193+
* This is used to determine when assignment configuration changes should trigger a rebalance.
194+
*/
195+
private final TimelineHashMap<String, String> lastAssignmentConfigs;
196+
191197
/**
192198
* The metadata refresh deadline. It consists of a timestamp in milliseconds together with the group epoch at the time of setting it.
193199
* The metadata refresh time is considered as a soft state (read that it is not stored in a timeline data structure). It is like this
@@ -211,12 +217,6 @@ public static class DeadlineAndEpoch {
211217
*/
212218
private int endpointInformationEpoch = 0;
213219

214-
/**
215-
* The last used assignment configurations for this streams group.
216-
* This is used to determine when assignment configuration changes should trigger a rebalance.
217-
*/
218-
private TimelineHashMap<String, String> lastAssignmentConfigs;
219-
220220
public StreamsGroup(
221221
LogContext logContext,
222222
SnapshotRegistry snapshotRegistry,

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> q
277277
* @param taskId The taskId, to check if the previous member already has the task. Can be null, if we assign it
278278
* for the first time (e.g., during active task assignment).
279279
*
280-
* @return Previous member with the least load that deoes not have the task, or null if no such member exists.
280+
* @return Previous member with the least load that does not have the task, or null if no such member exists.
281281
*/
282282
private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, final TaskId taskId) {
283283
if (members == null || members.isEmpty()) {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static ConfiguredTopology configureTopics(LogContext logContext,
7979
throwOnMissingSourceTopics(topology, metadataImage);
8080

8181
Map<String, Integer> decidedPartitionCountsForInternalTopics =
82-
decidePartitionCounts(logContext, topology, metadataImage, copartitionGroupsBySubtopology, log);
82+
decidePartitionCounts(logContext, topology, metadataImage, copartitionGroupsBySubtopology);
8383

8484
final SortedMap<String, ConfiguredSubtopology> configuredSubtopologies =
8585
subtopologies.stream()
@@ -146,8 +146,7 @@ private static void throwOnMissingSourceTopics(final StreamsTopology topology,
146146
private static Map<String, Integer> decidePartitionCounts(final LogContext logContext,
147147
final StreamsTopology topology,
148148
final CoordinatorMetadataImage metadataImage,
149-
final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
150-
final Logger log) {
149+
final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology) {
151150
final Map<String, Integer> decidedPartitionCountsForInternalTopics = new HashMap<>();
152151
final Function<String, OptionalInt> topicPartitionCountProvider =
153152
topic -> getPartitionCount(metadataImage, topic, decidedPartitionCountsForInternalTopics);

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3899,7 +3899,7 @@ public void testDescribeShareGroupOffsetsWithDefaultPersisterReadSummaryPartitio
38993899
}
39003900

39013901
@Test
3902-
public void testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetThrowsError() throws InterruptedException, ExecutionException {
3902+
public void testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetThrowsError() {
39033903
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
39043904
Persister persister = mock(DefaultStatePersister.class);
39053905

@@ -4034,7 +4034,7 @@ public void testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetReturns
40344034
}
40354035

40364036
@Test
4037-
public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws InterruptedException, ExecutionException {
4037+
public void testDescribeShareGroupAllOffsetsLatestOffsetError() {
40384038
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
40394039
Persister persister = mock(DefaultStatePersister.class);
40404040

@@ -5880,11 +5880,6 @@ public void testPersisterInitializeForAlterShareGroupOffsetsResponseSuccess() {
58805880
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
58815881
}
58825882

5883-
@FunctionalInterface
5884-
private interface TriFunction<A, B, C, R> {
5885-
R apply(A a, B b, C c);
5886-
}
5887-
58885883
private static class GroupCoordinatorServiceBuilder {
58895884
private final LogContext logContext = new LogContext();
58905885
private final GroupConfigManager configManager = createConfigManager();

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4347,7 +4347,7 @@ public void testSessionTimeoutExpiration() {
43474347

43484348
// Verify the expired timeout.
43494349
assertEquals(
4350-
List.of(new ExpiredTimeout<CoordinatorRecord>(
4350+
List.of(new ExpiredTimeout<>(
43514351
groupSessionTimeoutKey(groupId, memberId),
43524352
new CoordinatorResult<>(
43534353
List.of(
@@ -4412,7 +4412,7 @@ fooTopicName, computeTopicHash(fooTopicName, new KRaftCoordinatorMetadataImage(m
44124412
// Verify the expired timeout.
44134413
assertEquals(
44144414
List.of(
4415-
new ExpiredTimeout<CoordinatorRecord>(
4415+
new ExpiredTimeout<>(
44164416
groupSessionTimeoutKey(groupId, memberId),
44174417
new CoordinatorResult<>(
44184418
List.of(
@@ -4473,7 +4473,7 @@ public void testSessionTimeoutExpirationForShareMember() {
44734473

44744474
// Verify the expired timeout.
44754475
assertEquals(
4476-
List.of(new ExpiredTimeout<CoordinatorRecord>(
4476+
List.of(new ExpiredTimeout<>(
44774477
groupSessionTimeoutKey(groupId, memberId),
44784478
new CoordinatorResult<>(
44794479
List.of(
@@ -4536,7 +4536,7 @@ fooTopicName, computeTopicHash(fooTopicName, new KRaftCoordinatorMetadataImage(m
45364536
// Verify the expired timeout.
45374537
assertEquals(
45384538
List.of(
4539-
new ExpiredTimeout<CoordinatorRecord>(
4539+
new ExpiredTimeout<>(
45404540
groupSessionTimeoutKey(groupId, memberId),
45414541
new CoordinatorResult<>(
45424542
List.of(
@@ -4616,7 +4616,7 @@ public void testSessionTimeoutExpirationStaticMember() {
46164616

46174617
// Verify the expired timeout.
46184618
assertEquals(
4619-
List.of(new ExpiredTimeout<CoordinatorRecord>(
4619+
List.of(new ExpiredTimeout<>(
46204620
groupSessionTimeoutKey(groupId, memberId),
46214621
new CoordinatorResult<>(
46224622
List.of(
@@ -4894,7 +4894,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
48944894

48954895
// Verify the expired timeout.
48964896
assertEquals(
4897-
List.of(new ExpiredTimeout<CoordinatorRecord>(
4897+
List.of(new ExpiredTimeout<>(
48984898
groupRebalanceTimeoutKey(groupId, memberId1),
48994899
new CoordinatorResult<>(
49004900
List.of(
@@ -8039,7 +8039,7 @@ public void testStaticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSel
80398039
}
80408040

80418041
@Test
8042-
public void testStaticMemberRejoinWithKnownLeaderIdToTriggerRebalanceAndFollowerWithChangeofProtocol()
8042+
public void testStaticMemberRejoinWithKnownLeaderIdToTriggerRebalanceAndFollowerWithChangeOfProtocol()
80438043
throws Exception {
80448044

80458045
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
@@ -19344,7 +19344,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
1934419344

1934519345
// Verify the expired timeout.
1934619346
assertEquals(
19347-
List.of(new ExpiredTimeout<CoordinatorRecord>(
19347+
List.of(new ExpiredTimeout<>(
1934819348
groupSessionTimeoutKey(groupId, memberId),
1934919349
new CoordinatorResult<>(
1935019350
List.of(
@@ -19653,7 +19653,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
1965319653

1965419654
// Verify the expired timeout.
1965519655
assertEquals(
19656-
List.of(new ExpiredTimeout<CoordinatorRecord>(
19656+
List.of(new ExpiredTimeout<>(
1965719657
groupRebalanceTimeoutKey(groupId, memberId1),
1965819658
new CoordinatorResult<>(
1965919659
List.of(
@@ -23509,7 +23509,7 @@ public void testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
2350923509

2351023510
// Verify the expired timeout.
2351123511
assertEquals(
23512-
List.of(new ExpiredTimeout<CoordinatorRecord>(
23512+
List.of(new ExpiredTimeout<>(
2351323513
groupSessionTimeoutKey(groupId, memberId2),
2351423514
new CoordinatorResult<>(
2351523515
List.of(
@@ -25038,7 +25038,7 @@ public void testUninitializeTopics() {
2503825038

2503925039
context.groupMetadataManager.onMetadataUpdate(image.emptyDelta(), image);
2504025040

25041-
// Cleanup happens from initialzing state only.
25041+
// Cleanup happens from initializing state only.
2504225042
context.groupMetadataManager.replay(
2504325043
new ShareGroupMetadataKey()
2504425044
.setGroupId(groupId),
@@ -25582,7 +25582,7 @@ public void testReplayStreamsGroupCurrentMemberAssignmentUnownedTopologyWithComp
2558225582

2558325583
// Verify foo-0 is unassigned and bar-0 is assigned to member A.
2558425584
StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
25585-
assertEquals(null, group.currentActiveTaskProcessId(subtopologyFoo, 0));
25585+
assertNull(group.currentActiveTaskProcessId(subtopologyFoo, 0));
2558625586
assertEquals(processIdA, group.currentActiveTaskProcessId(subtopologyBar, 0));
2558725587
}
2558825588

0 commit comments

Comments
 (0)