Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public static Set<String> configNames() {
*/
public static void validateNames(Properties props) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider refactoring the call path to use Map instead. I didn't see the specific benefit of using Properties here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can address that in a follow-up

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to do that change let's do it in a separate PR. The caller path comes all the way from ControllerConfigurationValidator which uses Properties for all resource types.

Set<String> names = configNames();
for (Object name : props.keySet()) {
for (String name : props.stringPropertyNames()) {
if (!names.contains(name)) {
throw new InvalidConfigurationException("Unknown group config name: " + name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1289,8 +1289,8 @@ private void convertToClassicGroup(
metadataImage
);
} catch (SchemaException e) {
log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
log.warn("Cannot downgrade the consumer group {}: fail to parse the Consumer Protocol {}.",
consumerGroup.groupId(), ConsumerProtocol.PROTOCOL_TYPE, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the unnecessary indent

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
consumerGroup.groupId(), e.getMessage()));
Expand Down Expand Up @@ -1370,16 +1370,14 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
metadataImage
);
} catch (SchemaException e) {
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +
" to consumer group because the embedded consumer protocol is malformed: "
+ e.getMessage() + ".", e);
log.warn("Cannot upgrade classic group {} to consumer group because the embedded consumer protocol is malformed: {}.",
classicGroup.groupId(), e.getMessage(), e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


throw new GroupIdNotFoundException(
String.format("Cannot upgrade classic group %s to consumer group because the embedded consumer protocol is malformed.", classicGroup.groupId())
);
} catch (UnsupportedVersionException e) {
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +
" to consumer group: " + e.getMessage() + ".", e);
log.warn("Cannot upgrade classic group {} to consumer group: {}.", classicGroup.groupId(), e.getMessage(), e);

throw new GroupIdNotFoundException(
String.format("Cannot upgrade classic group %s to consumer group because an unsupported custom assignor is in use. " +
Expand Down Expand Up @@ -3530,7 +3528,7 @@ private boolean hasStreamsMemberMetadataChanged(
String memberId = updatedMember.memberId();
if (!updatedMember.equals(member)) {
records.add(newStreamsGroupMemberRecord(groupId, updatedMember));
log.info("[GroupId {}][MemberId {}] Member updated its member metdata to {}.",
log.info("[GroupId {}][MemberId {}] Member updated its member metadata to {}.",
groupId, memberId, updatedMember);

return true;
Expand Down Expand Up @@ -4027,7 +4025,7 @@ private CoordinatorResult<Void, CoordinatorRecord> computeDelayedTargetAssignmen
throw new IllegalStateException("Group epoch should be always larger to assignment epoch");
}

if (!group.configuredTopology().isPresent()) {
if (group.configuredTopology().isEmpty()) {
log.warn("[GroupId {}] Cannot compute delayed target assignment: configured topology is not present", groupId);
return EMPTY_RESULT;
}
Expand Down Expand Up @@ -4350,9 +4348,8 @@ private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember(
StreamsGroupMember member,
T response
) {
List<CoordinatorRecord> records = new ArrayList<>();

records.addAll(removeStreamsMember(group.groupId(), member.memberId()));
List<CoordinatorRecord> records = new ArrayList<>(removeStreamsMember(group.groupId(), member.memberId()));

// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
Expand Down Expand Up @@ -7287,30 +7284,26 @@ private CoordinatorResult<Void, CoordinatorRecord> expirePendingSync(
* @return whether the group can accept a joining member.
*/
private boolean acceptJoiningMember(ClassicGroup group, String memberId) {
switch (group.currentState()) {
case EMPTY:
case DEAD:
return switch (group.currentState()) {
case EMPTY, DEAD ->
// Always accept the request when the group is empty or dead
return true;
case PREPARING_REBALANCE:
true;
case PREPARING_REBALANCE ->
// An existing member is accepted if it is already awaiting. New members are accepted
// up to the max group size. Note that the number of awaiting members is used here
// for two reasons:
// 1) the group size is not reliable as it could already be above the max group size
// if the max group size was reduced.
// 2) using the number of awaiting members allows to kick out the last rejoining
// members of the group.
return (group.hasMember(memberId) && group.member(memberId).isAwaitingJoin()) ||
group.numAwaitingJoinResponse() < config.classicGroupMaxSize();
case COMPLETING_REBALANCE:
case STABLE:
(group.hasMember(memberId) && group.member(memberId).isAwaitingJoin()) ||
group.numAwaitingJoinResponse() < config.classicGroupMaxSize();
case COMPLETING_REBALANCE, STABLE ->
// An existing member is accepted. New members are accepted up to the max group size.
// Note that the group size is used here. When the group transitions to CompletingRebalance,
// members who haven't rejoined are removed.
return group.hasMember(memberId) || group.numMembers() < config.classicGroupMaxSize();
default:
throw new IllegalStateException("Unknown group state: " + group.stateAsString());
}
group.hasMember(memberId) || group.numMembers() < config.classicGroupMaxSize();
};
}

/**
Expand Down Expand Up @@ -7645,24 +7638,12 @@ private byte[] prepareAssignment(ConsumerGroupMember member) {

// Visible for testing
static Errors appendGroupMetadataErrorToResponseError(Errors appendError) {
switch (appendError) {
case UNKNOWN_TOPIC_OR_PARTITION:
case NOT_ENOUGH_REPLICAS:
case REQUEST_TIMED_OUT:
return COORDINATOR_NOT_AVAILABLE;

case NOT_LEADER_OR_FOLLOWER:
case KAFKA_STORAGE_ERROR:
return NOT_COORDINATOR;

case MESSAGE_TOO_LARGE:
case RECORD_LIST_TOO_LARGE:
case INVALID_FETCH_SIZE:
return UNKNOWN_SERVER_ERROR;

default:
return appendError;
}
return switch (appendError) {
case UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, REQUEST_TIMED_OUT -> COORDINATOR_NOT_AVAILABLE;
case NOT_LEADER_OR_FOLLOWER, KAFKA_STORAGE_ERROR -> NOT_COORDINATOR;
case MESSAGE_TOO_LARGE, RECORD_LIST_TOO_LARGE, INVALID_FETCH_SIZE -> UNKNOWN_SERVER_ERROR;
default -> appendError;
};
}

private Optional<Errors> validateSyncGroup(
Expand Down Expand Up @@ -7768,35 +7749,31 @@ private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroup
) {
validateClassicGroupHeartbeat(group, request.memberId(), request.groupInstanceId(), request.generationId());

switch (group.currentState()) {
case EMPTY:
return new CoordinatorResult<>(
List.of(),
new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
);

case PREPARING_REBALANCE:
return switch (group.currentState()) {
case EMPTY -> new CoordinatorResult<>(
List.of(),
new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
);
case PREPARING_REBALANCE -> {
rescheduleClassicGroupMemberHeartbeat(group, group.member(request.memberId()));
return new CoordinatorResult<>(
yield new CoordinatorResult<>(
List.of(),
new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())
);

case COMPLETING_REBALANCE:
case STABLE:
}
case COMPLETING_REBALANCE, STABLE -> {
// Consumers may start sending heartbeats after join-group response, while the group
// is in CompletingRebalance state. In this case, we should treat them as
// normal heartbeat requests and reset the timer
rescheduleClassicGroupMemberHeartbeat(group, group.member(request.memberId()));
return new CoordinatorResult<>(
yield new CoordinatorResult<>(
List.of(),
new HeartbeatResponseData()
);

default:
throw new IllegalStateException("Reached unexpected state " +
}
default -> throw new IllegalStateException("Reached unexpected state " +
group.currentState() + " for group " + group.groupId());
}
};
}

/**
Expand Down Expand Up @@ -8652,7 +8629,7 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRec
* Checks whether the given protocol type or name in the request is inconsistent with the group's.
*
* @param protocolTypeOrName The request's protocol type or name.
* @param groupProtocolTypeOrName The group's protoocl type or name.
* @param groupProtocolTypeOrName The group's protocol type or name.
*
* @return True if protocol is inconsistent, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ public static class DeadlineAndEpoch {
*/
private final TimelineObject<Optional<ConfiguredTopology>> configuredTopology;

/**
* The last used assignment configurations for this streams group.
* This is used to determine when assignment configuration changes should trigger a rebalance.
*/
private final TimelineHashMap<String, String> lastAssignmentConfigs;

/**
* The metadata refresh deadline. It consists of a timestamp in milliseconds together with the group epoch at the time of setting it.
* The metadata refresh time is considered as a soft state (read that it is not stored in a timeline data structure). It is like this
Expand All @@ -211,12 +217,6 @@ public static class DeadlineAndEpoch {
*/
private int endpointInformationEpoch = 0;

/**
* The last used assignment configurations for this streams group.
* This is used to determine when assignment configuration changes should trigger a rebalance.
*/
private TimelineHashMap<String, String> lastAssignmentConfigs;

public StreamsGroup(
LogContext logContext,
SnapshotRegistry snapshotRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> q
* @param taskId The taskId, to check if the previous member already has the task. Can be null, if we assign it
* for the first time (e.g., during active task assignment).
*
* @return Previous member with the least load that deoes not have the task, or null if no such member exists.
* @return Previous member with the least load that does not have the task, or null if no such member exists.
*/
private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, final TaskId taskId) {
if (members == null || members.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static ConfiguredTopology configureTopics(LogContext logContext,
throwOnMissingSourceTopics(topology, metadataImage);

Map<String, Integer> decidedPartitionCountsForInternalTopics =
decidePartitionCounts(logContext, topology, metadataImage, copartitionGroupsBySubtopology, log);
decidePartitionCounts(logContext, topology, metadataImage, copartitionGroupsBySubtopology);

final SortedMap<String, ConfiguredSubtopology> configuredSubtopologies =
subtopologies.stream()
Expand Down Expand Up @@ -146,8 +146,7 @@ private static void throwOnMissingSourceTopics(final StreamsTopology topology,
private static Map<String, Integer> decidePartitionCounts(final LogContext logContext,
final StreamsTopology topology,
final CoordinatorMetadataImage metadataImage,
final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
final Logger log) {
final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology) {
final Map<String, Integer> decidedPartitionCountsForInternalTopics = new HashMap<>();
final Function<String, OptionalInt> topicPartitionCountProvider =
topic -> getPartitionCount(metadataImage, topic, decidedPartitionCountsForInternalTopics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3899,7 +3899,7 @@ public void testDescribeShareGroupOffsetsWithDefaultPersisterReadSummaryPartitio
}

@Test
public void testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetThrowsError() throws InterruptedException, ExecutionException {
public void testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetThrowsError() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);

Expand Down Expand Up @@ -4034,7 +4034,7 @@ public void testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetReturns
}

@Test
public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws InterruptedException, ExecutionException {
public void testDescribeShareGroupAllOffsetsLatestOffsetError() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);

Expand Down Expand Up @@ -5880,11 +5880,6 @@ public void testPersisterInitializeForAlterShareGroupOffsetsResponseSuccess() {
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
}

@FunctionalInterface
private interface TriFunction<A, B, C, R> {
R apply(A a, B b, C c);
}

private static class GroupCoordinatorServiceBuilder {
private final LogContext logContext = new LogContext();
private final GroupConfigManager configManager = createConfigManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4347,7 +4347,7 @@ public void testSessionTimeoutExpiration() {

// Verify the expired timeout.
assertEquals(
List.of(new ExpiredTimeout<CoordinatorRecord>(
List.of(new ExpiredTimeout<>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
Expand Down Expand Up @@ -4412,7 +4412,7 @@ fooTopicName, computeTopicHash(fooTopicName, new KRaftCoordinatorMetadataImage(m
// Verify the expired timeout.
assertEquals(
List.of(
new ExpiredTimeout<CoordinatorRecord>(
new ExpiredTimeout<>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
Expand Down Expand Up @@ -4473,7 +4473,7 @@ public void testSessionTimeoutExpirationForShareMember() {

// Verify the expired timeout.
assertEquals(
List.of(new ExpiredTimeout<CoordinatorRecord>(
List.of(new ExpiredTimeout<>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
Expand Down Expand Up @@ -4536,7 +4536,7 @@ fooTopicName, computeTopicHash(fooTopicName, new KRaftCoordinatorMetadataImage(m
// Verify the expired timeout.
assertEquals(
List.of(
new ExpiredTimeout<CoordinatorRecord>(
new ExpiredTimeout<>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
Expand Down Expand Up @@ -4616,7 +4616,7 @@ public void testSessionTimeoutExpirationStaticMember() {

// Verify the expired timeout.
assertEquals(
List.of(new ExpiredTimeout<CoordinatorRecord>(
List.of(new ExpiredTimeout<>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
Expand Down Expand Up @@ -4894,7 +4894,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment(

// Verify the expired timeout.
assertEquals(
List.of(new ExpiredTimeout<CoordinatorRecord>(
List.of(new ExpiredTimeout<>(
groupRebalanceTimeoutKey(groupId, memberId1),
new CoordinatorResult<>(
List.of(
Expand Down Expand Up @@ -8039,7 +8039,7 @@ public void testStaticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSel
}

@Test
public void testStaticMemberRejoinWithKnownLeaderIdToTriggerRebalanceAndFollowerWithChangeofProtocol()
public void testStaticMemberRejoinWithKnownLeaderIdToTriggerRebalanceAndFollowerWithChangeOfProtocol()
throws Exception {

GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
Expand Down Expand Up @@ -19344,7 +19344,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage)

// Verify the expired timeout.
assertEquals(
List.of(new ExpiredTimeout<CoordinatorRecord>(
List.of(new ExpiredTimeout<>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
Expand Down Expand Up @@ -19653,7 +19653,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage)

// Verify the expired timeout.
assertEquals(
List.of(new ExpiredTimeout<CoordinatorRecord>(
List.of(new ExpiredTimeout<>(
groupRebalanceTimeoutKey(groupId, memberId1),
new CoordinatorResult<>(
List.of(
Expand Down Expand Up @@ -23509,7 +23509,7 @@ public void testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {

// Verify the expired timeout.
assertEquals(
List.of(new ExpiredTimeout<CoordinatorRecord>(
List.of(new ExpiredTimeout<>(
groupSessionTimeoutKey(groupId, memberId2),
new CoordinatorResult<>(
List.of(
Expand Down Expand Up @@ -25038,7 +25038,7 @@ public void testUninitializeTopics() {

context.groupMetadataManager.onMetadataUpdate(image.emptyDelta(), image);

// Cleanup happens from initialzing state only.
// Cleanup happens from initializing state only.
context.groupMetadataManager.replay(
new ShareGroupMetadataKey()
.setGroupId(groupId),
Expand Down Expand Up @@ -25582,7 +25582,7 @@ public void testReplayStreamsGroupCurrentMemberAssignmentUnownedTopologyWithComp

// Verify foo-0 is unassigned and bar-0 is assigned to member A.
StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
assertEquals(null, group.currentActiveTaskProcessId(subtopologyFoo, 0));
assertNull(group.currentActiveTaskProcessId(subtopologyFoo, 0));
assertEquals(processIdA, group.currentActiveTaskProcessId(subtopologyBar, 0));
}

Expand Down
Loading