Skip to content

Commit 8733798

Browse files
authored
KAFKA-19435 Optimize kafka-consumer-groups.sh to return the offset info when some partitions without leaders (apache#20064)
1. Optimize the corresponding logic in the `ConsumerGroupCommand` by first checking if a leader exists for the partition before invoking the `admin.listOffsets`. Finally, concatenate the data and return 2. Add integration test, create a cluster with 3 brokers, then shutdown a broker and observe whether the output meets the expectations Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang <payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent e1ff387 commit 8733798

3 files changed

Lines changed: 110 additions & 9 deletions

File tree

tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,6 @@
5151
import com.google.re2j.Pattern;
5252
import com.google.re2j.PatternSyntaxException;
5353

54-
import org.slf4j.Logger;
55-
import org.slf4j.LoggerFactory;
56-
5754
import java.io.IOException;
5855
import java.util.AbstractMap.SimpleImmutableEntry;
5956
import java.util.ArrayList;
@@ -82,7 +79,6 @@
8279
import joptsimple.OptionSpec;
8380

8481
public class ConsumerGroupCommand {
85-
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommand.class);
8682

8783
static final String MISSING_COLUMN_VALUE = "-";
8884

@@ -592,8 +588,7 @@ private Collection<PartitionAssignmentState> collectConsumerAssignment(
592588
getLag(Optional.empty(), Optional.empty()), consumerIdOpt, hostOpt, clientIdOpt, Optional.empty(), Optional.empty())
593589
);
594590
} else {
595-
List<TopicPartition> topicPartitionsSorted = topicPartitions.stream().sorted(Comparator.comparingInt(TopicPartition::partition)).collect(Collectors.toList());
596-
return describePartitions(group, coordinator, topicPartitionsSorted, committedOffsets, consumerIdOpt, hostOpt, clientIdOpt);
591+
return describePartitions(group, coordinator, topicPartitions, committedOffsets, consumerIdOpt, hostOpt, clientIdOpt);
597592
}
598593
}
599594

@@ -604,7 +599,7 @@ private Optional<Long> getLag(Optional<Long> offset, Optional<Long> logEndOffset
604599
private Collection<PartitionAssignmentState> describePartitions(
605600
String group,
606601
Optional<Node> coordinator,
607-
List<TopicPartition> topicPartitions,
602+
Collection<TopicPartition> topicPartitions,
608603
Map<TopicPartition, OffsetAndMetadata> committedOffsets,
609604
Optional<String> consumerIdOpt,
610605
Optional<String> hostOpt,
@@ -619,7 +614,11 @@ private Collection<PartitionAssignmentState> describePartitions(
619614
consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt, leaderEpoch);
620615
};
621616

622-
return offsetsUtils.getLogEndOffsets(topicPartitions).entrySet().stream().map(logEndOffsetResult -> {
617+
List<TopicPartition> topicPartitionsWithoutLeader = filterNoneLeaderPartitions(topicPartitions);
618+
List<TopicPartition> topicPartitionsWithLeader = topicPartitions.stream().filter(tp -> !topicPartitionsWithoutLeader.contains(tp)).toList();
619+
620+
// prepare data for partitions with leaders
621+
List<PartitionAssignmentState> existLeaderAssignments = offsetsUtils.getLogEndOffsets(topicPartitionsWithLeader).entrySet().stream().map(logEndOffsetResult -> {
623622
if (logEndOffsetResult.getValue() instanceof OffsetsUtils.LogOffset)
624623
return getDescribePartitionResult.apply(
625624
logEndOffsetResult.getKey(),
@@ -631,7 +630,34 @@ else if (logEndOffsetResult.getValue() instanceof OffsetsUtils.Ignore)
631630
return null;
632631

633632
throw new IllegalStateException("Unknown LogOffset subclass: " + logEndOffsetResult.getValue());
634-
}).collect(Collectors.toList());
633+
}).toList();
634+
635+
// prepare data for partitions without leaders
636+
List<PartitionAssignmentState> noneLeaderAssignments = topicPartitionsWithoutLeader.stream()
637+
.map(tp -> getDescribePartitionResult.apply(tp, Optional.empty())).toList();
638+
639+
// concat the data and then sort them
640+
return Stream.concat(existLeaderAssignments.stream(), noneLeaderAssignments.stream())
641+
.sorted(Comparator.<PartitionAssignmentState, String>comparing(
642+
state -> state.topic.orElse(""), String::compareTo)
643+
.thenComparingInt(state -> state.partition.orElse(-1)))
644+
.collect(Collectors.toList());
645+
}
646+
647+
private List<TopicPartition> filterNoneLeaderPartitions(Collection<TopicPartition> topicPartitions) {
648+
// collect all topics
649+
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
650+
651+
try {
652+
return adminClient.describeTopics(topics).allTopicNames().get().entrySet()
653+
.stream()
654+
.flatMap(entry -> entry.getValue().partitions().stream()
655+
.filter(partitionInfo -> partitionInfo.leader() == null)
656+
.map(partitionInfo -> new TopicPartition(entry.getKey(), partitionInfo.partition())))
657+
.toList();
658+
} catch (Exception e) {
659+
throw new RuntimeException(e);
660+
}
635661
}
636662

637663
Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {

tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public void testAdminRequestsForDescribeOffsets() throws Exception {
9191
.thenReturn(listGroupOffsetsResult(GROUP));
9292
when(admin.listOffsets(offsetsArgMatcher(), any()))
9393
.thenReturn(listOffsetsResult());
94+
when(admin.describeTopics(ArgumentMatchers.anySet()))
95+
.thenReturn(describeTopicsResult());
9496

9597
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
9698
assertEquals(Optional.of(GroupState.STABLE), statesAndAssignments.getKey());
@@ -174,6 +176,7 @@ public void testAdminRequestsForDescribeNegativeOffsets() throws Exception {
174176
any()
175177
)).thenReturn(new ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> unassignedTopicPartitions.contains(e.getKey()))
176178
.collect(Collectors.toMap(Entry::getKey, Entry::getValue))));
179+
when(admin.describeTopics(ArgumentMatchers.anySet())).thenReturn(describeTopicsResult());
177180

178181
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
179182
Optional<GroupState> state = statesAndAssignments.getKey();
@@ -289,6 +292,18 @@ private Map<TopicPartition, OffsetSpec> offsetsArgMatcher() {
289292
);
290293
}
291294

295+
private DescribeTopicsResult describeTopicsResult() {
296+
Map<String, TopicDescription> topicDescriptionMap = TOPICS.stream().collect(Collectors.toMap(
297+
Function.identity(),
298+
topic -> new TopicDescription(
299+
topic,
300+
false,
301+
IntStream.range(0, NUM_PARTITIONS)
302+
.mapToObj(i -> new TopicPartitionInfo(i, Node.noNode(), List.of(), List.of()))
303+
.toList())));
304+
return AdminClientTestUtils.describeTopicsResult(topicDescriptionMap);
305+
}
306+
292307
private ListOffsetsResult listOffsetsResult() {
293308
ListOffsetsResultInfo resultInfo = new ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.of(1));
294309
Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures = TOPIC_PARTITIONS.stream().collect(Collectors.toMap(

tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
import org.junit.jupiter.api.Assertions;
4949
import org.junit.jupiter.api.Test;
50+
import org.junit.jupiter.api.Timeout;
5051

5152
import java.util.ArrayList;
5253
import java.util.Arrays;
@@ -59,6 +60,7 @@
5960
import java.util.Objects;
6061
import java.util.Optional;
6162
import java.util.Set;
63+
import java.util.UUID;
6264
import java.util.concurrent.ExecutionException;
6365
import java.util.concurrent.atomic.AtomicInteger;
6466
import java.util.concurrent.atomic.AtomicReference;
@@ -1053,6 +1055,64 @@ public void testDescribeNonOffsetCommitGroup(ClusterInstance clusterInstance) th
10531055
}
10541056
}
10551057

1058+
/**
1059+
* The config `OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG` needs to be set to a value greater than 1 to ensure the
1060+
* normal invocation of APIs such as `FIND_COORDINATOR` when a broker has shutdown
1061+
*/
1062+
@Timeout(60)
1063+
@ClusterTest(brokers = 3, serverProperties = {@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")})
1064+
public void testDescribeConsumerGroupWithoutLeaders(ClusterInstance clusterInstance) throws Exception {
1065+
int brokerNum = 3;
1066+
this.clusterInstance = clusterInstance;
1067+
1068+
// define topic and group, then send 5 records to each partition
1069+
String topic = TOPIC_PREFIX + UUID.randomUUID();
1070+
String group = GROUP_PREFIX + UUID.randomUUID();
1071+
clusterInstance.createTopic(topic, brokerNum, (short) 1);
1072+
for (int i = 0; i < brokerNum; i++) {
1073+
sendRecords(topic, i, 5);
1074+
}
1075+
1076+
// append the command
1077+
List<String> cgcArgs = List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics");
1078+
1079+
try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, group, topic, Map.of());
1080+
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0]));
1081+
Admin admin = clusterInstance.admin()
1082+
) {
1083+
// shutdown the target broker
1084+
int noneLeaderPartition = 2;
1085+
int shutdownBrokerId = clusterInstance.getLeaderBrokerId(new TopicPartition(topic, noneLeaderPartition));
1086+
clusterInstance.shutdownBroker(shutdownBrokerId);
1087+
1088+
TestUtils.waitForCondition(() -> {
1089+
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
1090+
String[] lines = res.getKey().trim().split("\n");
1091+
if (lines.length != 4 || !res.getValue().isEmpty()) {
1092+
return false;
1093+
}
1094+
1095+
// get the client data, such as `consumerId,host,clientId`, to append the expected output
1096+
ConsumerGroupDescription consumerGroupDescription = admin.describeConsumerGroups(Set.of(group)).describedGroups().get(group).get();
1097+
MemberDescription memberDescription = consumerGroupDescription.members().iterator().next();
1098+
String consumerId = memberDescription.consumerId();
1099+
String host = memberDescription.host();
1100+
String clientId = memberDescription.clientId();
1101+
1102+
// the expected output
1103+
List<String> partition0content = List.of(group, topic, "0", "5", "5", "0", consumerId, host, clientId);
1104+
List<String> partition1content = List.of(group, topic, "1", "5", "5", "0", consumerId, host, clientId);
1105+
List<String> partition2content = List.of(group, topic, "2", "-", "-", "-", consumerId, host, clientId);
1106+
1107+
return checkArgsHeaderOutput(cgcArgs, lines[0])
1108+
&& Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(partition0content)
1109+
&& Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(partition1content)
1110+
&& Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(partition2content);
1111+
}, "Expected 3 data rows excluding the header and no error in describe groups when a broker shutdown.");
1112+
}
1113+
}
1114+
1115+
10561116
@Test
10571117
public void testDescribeWithUnrecognizedNewConsumerOption() {
10581118
String group = GROUP_PREFIX + "unrecognized";

0 commit comments

Comments
 (0)