Skip to content

Commit 03e6aa4

Browse files
KAFKA-20266: Add tracking of assignment timestamps (apache#21652)
Add Timestamp fields to TargetAssignmentMetadata records which default to 0 when absent. The new timestamp field records when the last target assignment calculation finished. When upgrading a classic group, the new timestamp field is initialized to 0. Reviewers: khilesh Chaganti <akhileshchg@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
1 parent 15036f8 commit 03e6aa4

26 files changed

Lines changed: 467 additions & 205 deletions

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -235,20 +235,23 @@ public static CoordinatorRecord newConsumerGroupTargetAssignmentTombstoneRecord(
235235
/**
236236
* Creates a ConsumerGroupTargetAssignmentMetadata record.
237237
*
238-
* @param groupId The consumer group id.
239-
* @param assignmentEpoch The consumer group epoch.
238+
* @param groupId The consumer group id.
239+
* @param assignmentEpoch The consumer group epoch.
240+
* @param assignmentTimestamp The time at which the target assignment calculation finished.
240241
* @return The record.
241242
*/
242-
public static CoordinatorRecord newConsumerGroupTargetAssignmentEpochRecord(
243+
public static CoordinatorRecord newConsumerGroupTargetAssignmentMetadataRecord(
243244
String groupId,
244-
int assignmentEpoch
245+
int assignmentEpoch,
246+
long assignmentTimestamp
245247
) {
246248
return CoordinatorRecord.record(
247249
new ConsumerGroupTargetAssignmentMetadataKey()
248250
.setGroupId(groupId),
249251
new ApiMessageAndVersion(
250252
new ConsumerGroupTargetAssignmentMetadataValue()
251-
.setAssignmentEpoch(assignmentEpoch),
253+
.setAssignmentEpoch(assignmentEpoch)
254+
.setAssignmentTimestamp(assignmentTimestamp),
252255
(short) 0
253256
)
254257
);
@@ -663,20 +666,23 @@ public static CoordinatorRecord newShareGroupTargetAssignmentTombstoneRecord(
663666
/**
664667
* Creates a ShareGroupTargetAssignmentMetadata record.
665668
*
666-
* @param groupId The group id.
667-
* @param assignmentEpoch The group epoch.
669+
* @param groupId The group id.
670+
* @param assignmentEpoch The group epoch.
671+
* @param assignmentTimestamp The time at which the target assignment calculation finished.
668672
* @return The record.
669673
*/
670-
public static CoordinatorRecord newShareGroupTargetAssignmentEpochRecord(
674+
public static CoordinatorRecord newShareGroupTargetAssignmentMetadataRecord(
671675
String groupId,
672-
int assignmentEpoch
676+
int assignmentEpoch,
677+
long assignmentTimestamp
673678
) {
674679
return CoordinatorRecord.record(
675680
new ShareGroupTargetAssignmentMetadataKey()
676681
.setGroupId(groupId),
677682
new ApiMessageAndVersion(
678683
new ShareGroupTargetAssignmentMetadataValue()
679-
.setAssignmentEpoch(assignmentEpoch),
684+
.setAssignmentEpoch(assignmentEpoch)
685+
.setAssignmentTimestamp(assignmentTimestamp),
680686
(short) 0
681687
)
682688
);

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3821,6 +3821,7 @@ private Assignment updateTargetAssignment(
38213821
try {
38223822
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder assignmentResultBuilder =
38233823
new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(), groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
3824+
.withTime(time)
38243825
.withMembers(group.members())
38253826
.withStaticMembers(group.staticMembers())
38263827
.withSubscriptionType(subscriptionType)
@@ -3890,6 +3891,7 @@ private Assignment updateTargetAssignment(
38903891

38913892
TargetAssignmentBuilder.ShareTargetAssignmentBuilder assignmentResultBuilder =
38923893
new TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(), groupEpoch, shareGroupAssignor)
3894+
.withTime(time)
38933895
.withMembers(group.members())
38943896
.withSubscriptionType(subscriptionType)
38953897
.withTargetAssignment(group.targetAssignment())
@@ -3955,6 +3957,7 @@ private TasksTuple updateStreamsTargetAssignment(
39553957
assignor,
39563958
assignmentConfigs
39573959
)
3960+
.withTime(time)
39583961
.withMembers(group.members())
39593962
.withTopology(configuredTopology)
39603963
.withStaticMembers(group.staticMembers())
@@ -5337,7 +5340,7 @@ public void replay(
53375340

53385341
if (value != null) {
53395342
ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true);
5340-
group.setTargetAssignmentEpoch(value.assignmentEpoch());
5343+
group.setTargetAssignmentMetadata(value.assignmentEpoch(), value.assignmentTimestamp());
53415344
} else {
53425345
ConsumerGroup group;
53435346
try {
@@ -5350,7 +5353,7 @@ public void replay(
53505353
throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId
53515354
+ " but the assignment still has " + group.targetAssignment().size() + " members.");
53525355
}
5353-
group.setTargetAssignmentEpoch(-1);
5356+
group.setTargetAssignmentMetadata(-1, 0L);
53545357
}
53555358
}
53565359

@@ -5653,7 +5656,7 @@ public void replay(
56535656

56545657
if (value != null) {
56555658
StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true);
5656-
streamsGroup.setTargetAssignmentEpoch(value.assignmentEpoch());
5659+
streamsGroup.setTargetAssignmentMetadata(value.assignmentEpoch(), value.assignmentTimestamp());
56575660
} else {
56585661
StreamsGroup streamsGroup;
56595662
try {
@@ -5666,7 +5669,7 @@ public void replay(
56665669
throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId
56675670
+ " but the assignment still has " + streamsGroup.targetAssignment().size() + " members.");
56685671
}
5669-
streamsGroup.setTargetAssignmentEpoch(-1);
5672+
streamsGroup.setTargetAssignmentMetadata(-1, 0L);
56705673
}
56715674
}
56725675

@@ -5801,13 +5804,13 @@ public void replay(
58015804
}
58025805

58035806
if (value != null) {
5804-
group.setTargetAssignmentEpoch(value.assignmentEpoch());
5807+
group.setTargetAssignmentMetadata(value.assignmentEpoch(), value.assignmentTimestamp());
58055808
} else {
58065809
if (!group.targetAssignment().isEmpty()) {
58075810
throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId
58085811
+ " but the assignment still has " + group.targetAssignment().size() + " members.");
58095812
}
5810-
group.setTargetAssignmentEpoch(-1);
5813+
group.setTargetAssignmentMetadata(-1, 0L);
58115814
}
58125815
}
58135816

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.group;
18+
19+
/**
20+
* The target assignment metadata.
21+
*
22+
* @param assignmentEpoch The target assignment epoch. An assignment epoch smaller than the
23+
* group epoch means that a new assignment is required. The
24+
* assignment epoch is updated when a new assignment is installed.
25+
* @param assignmentTimestamp The time at which the target assignment calculation finished.
26+
*/
27+
public record TargetAssignmentMetadata(int assignmentEpoch, long assignmentTimestamp) {
28+
/**
29+
* The initial target assignment metadata for groups.
30+
* This is different to tombstoned assignment metadata which has an assignment epoch of -1.
31+
*/
32+
public static final TargetAssignmentMetadata ZERO = new TargetAssignmentMetadata(0, 0L);
33+
34+
public TargetAssignmentMetadata {
35+
if (assignmentEpoch < 0 && assignmentEpoch != -1) {
36+
throw new IllegalArgumentException("The assignment epoch must be non-negative or -1.");
37+
}
38+
if (assignmentTimestamp < 0) {
39+
throw new IllegalArgumentException("The assignment timestamp must be non-negative.");
40+
}
41+
}
42+
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.message.ListGroupsResponseData;
2222
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
2323
import org.apache.kafka.coordinator.group.Group;
24+
import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
2425
import org.apache.kafka.coordinator.group.Utils;
2526
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
2627
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -96,11 +97,9 @@ public static class DeadlineAndEpoch {
9697
protected final TimelineObject<SubscriptionType> subscriptionType;
9798

9899
/**
99-
* The target assignment epoch. An assignment epoch smaller than the group epoch
100-
* means that a new assignment is required. The assignment epoch is updated when
101-
* a new assignment is installed.
100+
* The target assignment metadata.
102101
*/
103-
protected final TimelineInteger targetAssignmentEpoch;
102+
protected final TimelineObject<TargetAssignmentMetadata> targetAssignmentMetadata;
104103

105104
/**
106105
* The target assignment per member id.
@@ -136,7 +135,7 @@ protected ModernGroup(
136135
this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
137136
this.metadataHash = new TimelineLong(snapshotRegistry);
138137
this.subscriptionType = new TimelineObject<>(snapshotRegistry, HOMOGENEOUS);
139-
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
138+
this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry, TargetAssignmentMetadata.ZERO);
140139
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
141140
this.invertedTargetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
142141
}
@@ -181,16 +180,24 @@ public void setGroupEpoch(int groupEpoch) {
181180
* @return The target assignment epoch.
182181
*/
183182
public int assignmentEpoch() {
184-
return targetAssignmentEpoch.get();
183+
return targetAssignmentMetadata.get().assignmentEpoch();
185184
}
186185

187186
/**
188-
* Sets the assignment epoch.
187+
* @return The time at which the target assignment calculation finished.
188+
*/
189+
public long assignmentTimestamp() {
190+
return targetAssignmentMetadata.get().assignmentTimestamp();
191+
}
192+
193+
/**
194+
* Sets the assignment metadata.
189195
*
190196
* @param targetAssignmentEpoch The new assignment epoch.
197+
* @param targetAssignmentTimestamp The time at which the assignment calculation finished.
191198
*/
192-
public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
193-
this.targetAssignmentEpoch.set(targetAssignmentEpoch);
199+
public void setTargetAssignmentMetadata(int targetAssignmentEpoch, long targetAssignmentTimestamp) {
200+
this.targetAssignmentMetadata.set(new TargetAssignmentMetadata(targetAssignmentEpoch, targetAssignmentTimestamp));
194201
maybeUpdateGroupState();
195202
}
196203

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.coordinator.group.modern;
1818

1919
import org.apache.kafka.common.Uuid;
20+
import org.apache.kafka.common.utils.Time;
2021
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
2122
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
2223
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
@@ -139,10 +140,15 @@ protected CoordinatorRecord newTargetAssignmentRecord(
139140
}
140141

141142
@Override
142-
protected CoordinatorRecord newTargetAssignmentEpochRecord(String groupId, int assignmentEpoch) {
143-
return GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(
143+
protected CoordinatorRecord newTargetAssignmentMetadataRecord(
144+
String groupId,
145+
int assignmentEpoch,
146+
long assignmentTimestamp
147+
) {
148+
return GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(
144149
groupId,
145-
assignmentEpoch
150+
assignmentEpoch,
151+
assignmentTimestamp
146152
);
147153
}
148154

@@ -208,10 +214,15 @@ protected CoordinatorRecord newTargetAssignmentRecord(
208214
}
209215

210216
@Override
211-
protected CoordinatorRecord newTargetAssignmentEpochRecord(String groupId, int assignmentEpoch) {
212-
return GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(
217+
protected CoordinatorRecord newTargetAssignmentMetadataRecord(
218+
String groupId,
219+
int assignmentEpoch,
220+
long assignmentTimestamp
221+
) {
222+
return GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(
213223
groupId,
214-
assignmentEpoch
224+
assignmentEpoch,
225+
assignmentTimestamp
215226
);
216227
}
217228

@@ -230,6 +241,11 @@ protected MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment
230241
}
231242
}
232243

244+
/**
245+
* The time.
246+
*/
247+
private Time time;
248+
233249
/**
234250
* The group id.
235251
*/
@@ -304,6 +320,17 @@ public TargetAssignmentBuilder(
304320
this.assignor = Objects.requireNonNull(assignor);
305321
}
306322

323+
/**
324+
* Sets the time.
325+
*
326+
* @param time The time.
327+
* @return This object.
328+
*/
329+
public U withTime(Time time) {
330+
this.time = time;
331+
return self();
332+
}
333+
307334
/**
308335
* Adds all the existing members.
309336
*
@@ -491,7 +518,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
491518
}
492519

493520
// Bump the target assignment epoch.
494-
records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
521+
records.add(newTargetAssignmentMetadataRecord(groupId, groupEpoch, time.milliseconds()));
495522

496523
return new TargetAssignmentResult(records, newGroupAssignment.members());
497524
}
@@ -504,9 +531,10 @@ protected abstract CoordinatorRecord newTargetAssignmentRecord(
504531
Map<Uuid, Set<Integer>> partitions
505532
);
506533

507-
protected abstract CoordinatorRecord newTargetAssignmentEpochRecord(
534+
protected abstract CoordinatorRecord newTargetAssignmentMetadataRecord(
508535
String groupId,
509-
int assignmentEpoch
536+
int assignmentEpoch,
537+
long timestampMs
510538
);
511539

512540
protected abstract MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment(

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -896,11 +896,11 @@ protected void maybeUpdateGroupState() {
896896
ConsumerGroupState newState = STABLE;
897897
if (members.isEmpty()) {
898898
newState = EMPTY;
899-
} else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
899+
} else if (groupEpoch.get() > assignmentEpoch()) {
900900
newState = ASSIGNING;
901901
} else {
902902
for (ModernGroupMember member : members.values()) {
903-
if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
903+
if (!member.isReconciledTo(assignmentEpoch())) {
904904
newState = RECONCILING;
905905
break;
906906
}
@@ -1121,7 +1121,7 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
11211121
.setAssignorName(preferredServerAssignor(committedOffset).orElse(defaultAssignor))
11221122
.setGroupEpoch(groupEpoch.get(committedOffset))
11231123
.setGroupState(state.get(committedOffset).toString())
1124-
.setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset));
1124+
.setAssignmentEpoch(targetAssignmentMetadata.get(committedOffset).assignmentEpoch());
11251125
members.entrySet(committedOffset).forEach(
11261126
entry -> describedGroup.members().add(
11271127
entry.getValue().asConsumerGroupDescribeMember(
@@ -1156,7 +1156,7 @@ public static ConsumerGroup fromClassicGroup(
11561156
String groupId = classicGroup.groupId();
11571157
ConsumerGroup consumerGroup = new ConsumerGroup(logContext, snapshotRegistry, groupId);
11581158
consumerGroup.setGroupEpoch(classicGroup.generationId());
1159-
consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
1159+
consumerGroup.setTargetAssignmentMetadata(classicGroup.generationId(), 0L);
11601160

11611161
classicGroup.allMembers().forEach(classicGroupMember -> {
11621162
// The assigned partition can be empty if the member just joined and has never synced.
@@ -1238,7 +1238,7 @@ public void createConsumerGroupRecords(
12381238
))
12391239
);
12401240

1241-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId(), groupEpoch()));
1241+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId(), assignmentEpoch(), assignmentTimestamp()));
12421242

12431243
members().forEach((__, consumerGroupMember) ->
12441244
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId(), consumerGroupMember))

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public ShareGroupDescribeResponseData.DescribedGroup asDescribedGroup(
313313
.setAssignorName(defaultAssignor)
314314
.setGroupEpoch(groupEpoch.get(committedOffset))
315315
.setGroupState(state.get(committedOffset).toString())
316-
.setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset));
316+
.setAssignmentEpoch(targetAssignmentMetadata.get(committedOffset).assignmentEpoch());
317317
members.entrySet(committedOffset).forEach(
318318
entry -> describedGroup.members().add(
319319
entry.getValue().asShareGroupDescribeMember(

0 commit comments

Comments
 (0)