Skip to content

Commit dbe0fda

Browse files
authored
MINOR: Consistently apply timeout in group commands (apache#20764)
Group commands sometimes apply the timeout from --timeout, and sometimes don't. This change applies the timeout in every call to adminClient. Reviewers: Shivsundar R <shr@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Alieh Saeedi <asaeedi@confluent.io>
1 parent bb0b62d commit dbe0fda

8 files changed

Lines changed: 119 additions & 114 deletions

File tree

tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ public List<TopicPartition> filterNoneLeaderPartitions(Collection<TopicPartition
460460
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
461461

462462
try {
463-
return adminClient.describeTopics(topics).allTopicNames().get().entrySet()
463+
return adminClient.describeTopics(topics, withTimeoutMs(new DescribeTopicsOptions())).allTopicNames().get().entrySet()
464464
.stream()
465465
.flatMap(entry -> entry.getValue().partitions().stream()
466466
.filter(partitionInfo -> partitionInfo.leader() == null)
@@ -476,7 +476,7 @@ public List<TopicPartition> filterNonExistentPartitions(Collection<TopicPartitio
476476
// collect all topics
477477
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
478478
try {
479-
List<TopicPartition> existPartitions = adminClient.describeTopics(topics).allTopicNames().get().entrySet()
479+
List<TopicPartition> existPartitions = adminClient.describeTopics(topics, withTimeoutMs(new DescribeTopicsOptions())).allTopicNames().get().entrySet()
480480
.stream()
481481
.flatMap(entry -> entry.getValue().partitions().stream()
482482
.map(partitionInfo -> new TopicPartition(entry.getKey(), partitionInfo.partition())))

tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919
import org.apache.kafka.server.util.CommandDefaultOptions;
2020
import org.apache.kafka.server.util.CommandLineUtils;
2121

22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
24-
2522
import java.util.List;
2623
import java.util.Set;
2724
import java.util.stream.Collectors;
@@ -31,7 +28,6 @@
3128
import static org.apache.kafka.tools.ToolsUtils.minus;
3229

3330
public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
34-
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
3531

3632
private static final String BOOTSTRAP_SERVER_DOC = "The server(s) to connect to. REQUIRED for all options except for --validate-regex.";
3733
private static final String GROUP_DOC = "The consumer group we wish to act on.";
@@ -230,9 +226,6 @@ void checkArgs() {
230226
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
231227
CommandLineUtils.printUsageAndExit(parser,
232228
"Option " + describeOpt + " does not take a value for " + stateOpt);
233-
} else {
234-
if (options.has(timeoutMsOpt))
235-
LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used.");
236229
}
237230

238231
if (options.has(deleteOpt)) {

tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import org.apache.kafka.clients.CommonClientConfigs;
2020
import org.apache.kafka.clients.admin.AbstractOptions;
2121
import org.apache.kafka.clients.admin.Admin;
22+
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
2223
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
2324
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
2425
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
2526
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
2627
import org.apache.kafka.clients.admin.GroupListing;
2728
import org.apache.kafka.clients.admin.ListGroupsOptions;
2829
import org.apache.kafka.clients.admin.ListGroupsResult;
30+
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
2931
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
3032
import org.apache.kafka.clients.admin.ShareGroupDescription;
3133
import org.apache.kafka.clients.admin.ShareMemberAssignment;
@@ -411,7 +413,8 @@ private void resetOffsetsForInactiveGroup(String groupId) {
411413
offsetsToReset.entrySet().stream()
412414
.collect(Collectors.toMap(
413415
Entry::getKey, entry -> entry.getValue().offset()
414-
))
416+
)),
417+
withTimeoutMs(new AlterShareGroupOffsetsOptions())
415418
).all().get();
416419
}
417420
OffsetsUtils.printOffsetsToReset(Map.of(groupId, offsetsToReset));
@@ -434,7 +437,10 @@ private Collection<TopicPartition> getPartitionsToReset(String groupId) throws E
434437
partitionsToReset = offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
435438
} else {
436439
Map<String, ListShareGroupOffsetsSpec> groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec());
437-
Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
440+
Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions = adminClient.listShareGroupOffsets(
441+
groupSpecs,
442+
withTimeoutMs(new ListShareGroupOffsetsOptions())
443+
).all().get().get(groupId);
438444
partitionsToReset = offsetsByTopicPartitions.keySet();
439445
}
440446

@@ -488,7 +494,10 @@ TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInfo
488494
Map<String, ListShareGroupOffsetsSpec> groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec());
489495

490496
try {
491-
Map<TopicPartition, OffsetAndMetadata> startOffsets = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
497+
Map<TopicPartition, OffsetAndMetadata> startOffsets = adminClient.listShareGroupOffsets(
498+
groupSpecs,
499+
withTimeoutMs(new ListShareGroupOffsetsOptions())
500+
).all().get().get(groupId);
492501
Set<SharePartitionOffsetInformation> partitionOffsets = mapOffsetsToSharePartitionInformation(groupId, startOffsets);
493502

494503
groupOffsets.put(groupId, new SimpleImmutableEntry<>(shareGroup, partitionOffsets));

tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import org.apache.kafka.clients.CommonClientConfigs;
2020
import org.apache.kafka.clients.admin.AbstractOptions;
2121
import org.apache.kafka.clients.admin.Admin;
22+
import org.apache.kafka.clients.admin.AlterStreamsGroupOffsetsOptions;
2223
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
2324
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsResult;
2425
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
26+
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
2527
import org.apache.kafka.clients.admin.DeleteTopicsResult;
2628
import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
2729
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
@@ -30,8 +32,11 @@
3032
import org.apache.kafka.clients.admin.GroupListing;
3133
import org.apache.kafka.clients.admin.ListGroupsOptions;
3234
import org.apache.kafka.clients.admin.ListGroupsResult;
35+
import org.apache.kafka.clients.admin.ListOffsetsOptions;
3336
import org.apache.kafka.clients.admin.ListOffsetsResult;
37+
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsOptions;
3438
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsSpec;
39+
import org.apache.kafka.clients.admin.ListTopicsOptions;
3540
import org.apache.kafka.clients.admin.OffsetSpec;
3641
import org.apache.kafka.clients.admin.StreamsGroupDescription;
3742
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
@@ -279,7 +284,7 @@ public void describeGroups() throws ExecutionException, InterruptedException {
279284
StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException {
280285
DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups(
281286
List.of(group),
282-
new DescribeStreamsGroupsOptions().timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()));
287+
withTimeoutMs(new DescribeStreamsGroupsOptions()));
283288
Map<String, StreamsGroupDescription> descriptionMap = result.all().get();
284289
return descriptionMap.get(group);
285290
}
@@ -428,8 +433,14 @@ Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description)
428433
earliest.put(tp, OffsetSpec.earliest());
429434
latest.put(tp, OffsetSpec.latest());
430435
}
431-
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(earliest).all().get();
432-
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(latest).all().get();
436+
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(
437+
earliest,
438+
withTimeoutMs(new ListOffsetsOptions())
439+
).all().get();
440+
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(
441+
latest,
442+
withTimeoutMs(new ListOffsetsOptions())
443+
).all().get();
433444
Map<TopicPartition, OffsetAndMetadata> committedOffsets = getCommittedOffsets(description.groupId());
434445

435446
Map<TopicPartition, OffsetsInfo> output = new HashMap<>();
@@ -449,14 +460,18 @@ Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description)
449460

450461
Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
451462
try {
452-
var sourceTopics = adminClient.describeStreamsGroups(List.of(groupId))
453-
.all().get().get(groupId)
463+
var sourceTopics = adminClient.describeStreamsGroups(
464+
List.of(groupId),
465+
withTimeoutMs(new DescribeStreamsGroupsOptions())
466+
).all().get().get(groupId)
454467
.subtopologies().stream()
455468
.flatMap(subtopology -> subtopology.sourceTopics().stream())
456469
.collect(Collectors.toSet());
457470

458-
var allTopicPartitions = adminClient.listStreamsGroupOffsets(Map.of(groupId, new ListStreamsGroupOffsetsSpec()))
459-
.partitionsToOffsetAndMetadata(groupId).get();
471+
var allTopicPartitions = adminClient.listStreamsGroupOffsets(
472+
Map.of(groupId, new ListStreamsGroupOffsetsSpec()),
473+
withTimeoutMs(new ListStreamsGroupOffsetsOptions())
474+
).partitionsToOffsetAndMetadata(groupId).get();
460475

461476
allTopicPartitions.keySet().removeIf(tp -> !sourceTopics.contains(tp.topic()));
462477
return allTopicPartitions;
@@ -467,8 +482,10 @@ Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
467482

468483
private List<TopicPartition> filterExistingGroupTopics(String groupId, List<TopicPartition> topicPartitions) {
469484
try {
470-
var allTopicPartitions = adminClient.listStreamsGroupOffsets(Map.of(groupId, new ListStreamsGroupOffsetsSpec()))
471-
.partitionsToOffsetAndMetadata(groupId).get();
485+
var allTopicPartitions = adminClient.listStreamsGroupOffsets(
486+
Map.of(groupId, new ListStreamsGroupOffsetsSpec()),
487+
withTimeoutMs(new ListStreamsGroupOffsetsOptions())
488+
).partitionsToOffsetAndMetadata(groupId).get();
472489
boolean allPresent = topicPartitions.stream().allMatch(allTopicPartitions::containsKey);
473490
if (!allPresent) {
474491
printError("One or more topics are not part of the group '" + groupId + "'.", Optional.empty());
@@ -490,7 +507,8 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
490507
: opts.options.valuesOf(opts.groupOpt);
491508
if (!groupIds.isEmpty()) {
492509
Map<String, KafkaFuture<StreamsGroupDescription>> streamsGroups = adminClient.describeStreamsGroups(
493-
groupIds
510+
groupIds,
511+
withTimeoutMs(new DescribeStreamsGroupsOptions())
494512
).describedGroups();
495513

496514
streamsGroups.forEach((groupId, groupDescription) -> {
@@ -506,7 +524,10 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
506524
List<String> internalTopics = getInternalTopicsToBeDeleted(groupId);
507525
if (!internalTopics.isEmpty()) {
508526
try {
509-
adminClient.deleteTopics(internalTopics).all().get();
527+
adminClient.deleteTopics(
528+
internalTopics,
529+
withTimeoutMs(new DeleteTopicsOptions())
530+
).all().get();
510531
} catch (InterruptedException | ExecutionException e) {
511532
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
512533
printError("Deleting internal topics for group '" + groupId + "' failed because the topics do not exist.", Optional.empty());
@@ -742,7 +763,10 @@ private Map<String, Throwable> maybeDeleteInternalTopics(Map<String, Throwable>
742763
if (internalTopicsToDelete != null && !internalTopicsToDelete.isEmpty()) {
743764
DeleteTopicsResult deleteTopicsResult = null;
744765
try {
745-
deleteTopicsResult = adminClient.deleteTopics(internalTopicsToDelete);
766+
deleteTopicsResult = adminClient.deleteTopics(
767+
internalTopicsToDelete,
768+
withTimeoutMs(new DeleteTopicsOptions())
769+
);
746770
deleteTopicsResult.all().get();
747771
} catch (InterruptedException | ExecutionException e) {
748772
if (deleteTopicsResult != null) {
@@ -820,7 +844,10 @@ private void printInternalTopicErrors(Map<String, Throwable> internalTopicsDelet
820844
Map<String, List<String>> retrieveInternalTopics(List<String> groupIds) {
821845
Map<String, List<String>> groupToInternalTopics = new HashMap<>();
822846
try {
823-
Map<String, StreamsGroupDescription> descriptionMap = adminClient.describeStreamsGroups(groupIds).all().get();
847+
Map<String, StreamsGroupDescription> descriptionMap = adminClient.describeStreamsGroups(
848+
groupIds,
849+
withTimeoutMs(new DescribeStreamsGroupsOptions())
850+
).all().get();
824851
for (StreamsGroupDescription description : descriptionMap.values()) {
825852

826853
List<String> sourceTopics = description.subtopologies().stream()
@@ -848,7 +875,7 @@ Map<String, List<String>> retrieveInternalTopics(List<String> groupIds) {
848875
if (e.getCause() instanceof UnsupportedVersionException) {
849876
try {
850877
// Retrieve internal topic list if possible, and add the list of topic names to error message
851-
Set<String> allTopics = adminClient.listTopics().names().get();
878+
Set<String> allTopics = adminClient.listTopics(withTimeoutMs(new ListTopicsOptions())).names().get();
852879
List<String> internalTopics = allTopics.stream()
853880
.filter(topic -> groupIds.stream().anyMatch(groupId -> isInferredInternalTopic(topic, groupId)))
854881
.collect(Collectors.toList());
@@ -873,7 +900,8 @@ private Map<TopicPartition, OffsetAndMetadata> resetOffsetsForInactiveGroup(Stri
873900
if (!dryRun) {
874901
adminClient.alterStreamsGroupOffsets(
875902
groupId,
876-
preparedOffsets
903+
preparedOffsets,
904+
withTimeoutMs(new AlterStreamsGroupOffsetsOptions())
877905
).all().get();
878906
}
879907

@@ -947,22 +975,6 @@ public static boolean matchesInternalTopicFormat(final String topicName) {
947975
|| topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
948976
}
949977

950-
List<String> collectAllTopics(String groupId) {
951-
try {
952-
return adminClient.describeStreamsGroups(List.of(groupId))
953-
.all().get().get(groupId)
954-
.subtopologies().stream()
955-
.flatMap(subtopology -> Stream.of(
956-
subtopology.sourceTopics().stream(),
957-
subtopology.repartitionSinkTopics().stream(),
958-
subtopology.repartitionSourceTopics().keySet().stream(),
959-
subtopology.stateChangelogTopics().keySet().stream()
960-
).flatMap(s -> s)).distinct().collect(Collectors.toList());
961-
} catch (InterruptedException | ExecutionException e) {
962-
throw new RuntimeException(e);
963-
}
964-
}
965-
966978
Collection<StreamsGroupMemberDescription> collectGroupMembers(String groupId) throws Exception {
967979
return getDescribeGroup(groupId).members();
968980
}

tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919
import org.apache.kafka.server.util.CommandDefaultOptions;
2020
import org.apache.kafka.server.util.CommandLineUtils;
2121

22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
24-
2522
import java.util.List;
2623
import java.util.Set;
2724
import java.util.stream.Collectors;
@@ -32,7 +29,6 @@
3229

3330
public class StreamsGroupCommandOptions extends CommandDefaultOptions {
3431
private static final String NL = System.lineSeparator();
35-
static final Logger LOGGER = LoggerFactory.getLogger(StreamsGroupCommandOptions.class);
3632

3733
private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
3834
private static final String GROUP_DOC = "The streams group we wish to act on.";
@@ -144,11 +140,10 @@ public StreamsGroupCommandOptions(String[] args) {
144140
deleteOpt = parser.accepts("delete", DELETE_DOC);
145141
deleteOffsetsOpt = parser.accepts("delete-offsets", DELETE_OFFSETS_DOC);
146142
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
147-
.availableIf(describeOpt)
148143
.withRequiredArg()
149144
.describedAs("timeout (ms)")
150145
.ofType(Long.class)
151-
.defaultsTo(5000L);
146+
.defaultsTo(30000L);
152147
commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC)
153148
.withRequiredArg()
154149
.describedAs("command config property file")
@@ -214,9 +209,6 @@ void checkArgs() {
214209

215210
if (options.has(describeOpt)) {
216211
checkDescribeArgs();
217-
} else {
218-
if (options.has(timeoutMsOpt))
219-
LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used.");
220212
}
221213

222214
if (options.has(deleteOpt)) {

0 commit comments

Comments
 (0)