Skip to content

Commit 6ef6524

Browse files
committed
[INTERNAL] Don't report offsets and lag for dead consumer groups
1 parent 9da26b4 commit 6ef6524

File tree

2 files changed

+4
-2
lines changed

2 files changed

+4
-2
lines changed

e2e/producer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {
6161
msg.state = EndToEndMessageStateProducedSuccessfully
6262
msg.produceLatency = ackDuration.Seconds()
6363

64-
// TODO: Enable again as soon as https://github.com/ReneKroon/ttlcache/issues/60 is fixed
64+
// TODO: Enable again as soon as https://github.com/jellydatora/ttlcache/issues/60 is fixed
6565
// Because we cannot update cache items in an atomic fashion we currently can't use this method
6666
// as this would cause a race condition which ends up in records being reported as lost/expired.
6767
// s.messageTracker.updateItemIfExists(msg)

minion/consumer_group_offsets.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[
2424
}
2525
groupIDs := make([]string, len(groupsRes.AllowedGroups.Groups))
2626
for i, group := range groupsRes.AllowedGroups.Groups {
27-
groupIDs[i] = group.Group
27+
if group.GroupState != "Dead" {
28+
groupIDs[i] = group.Group
29+
}
2830
}
2931

3032
return s.listConsumerGroupOffsetsBulk(ctx, groupIDs)

0 commit comments

Comments
 (0)