Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -879,20 +879,24 @@ default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> g

/**
* List the consumer groups available in the cluster.
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
*
* @param options The options to use when listing the consumer groups.
* @return The ListConsumerGroupsResult.
*/
@Deprecated
ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

/**
* List the consumer groups available in the cluster with the default options.
* <p>
* This is a convenience method for {@link #listConsumerGroups(ListConsumerGroupsOptions)} with default options.
* See the overload for more details.
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use {@link Admin#listGroups()} instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. If you want Admin.listGroups() to return just consumer groups, you need to apply a filter to group type and protocol type.

*
* @return The ListConsumerGroupsResult.
*/
@Deprecated
default ListConsumerGroupsResult listConsumerGroups() {
return listConsumerGroups(new ListConsumerGroupsOptions());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

/**
* A listing of a consumer group in the cluster.
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} and {@link GroupListing} instead.
*/
@Deprecated
public class ConsumerGroupListing {
private final String groupId;
private final boolean isSimpleConsumerGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> gr
}

@Override
@Deprecated
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
return delegate.listConsumerGroups(options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3614,6 +3614,7 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

@Deprecated
private static final class ListConsumerGroupsResults {
private final List<Throwable> errors;
private final HashMap<String, ConsumerGroupListing> listings;
Expand Down Expand Up @@ -3657,6 +3658,7 @@ private synchronized void tryComplete() {
}

@Override
@Deprecated
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
final long nowMetadata = time.milliseconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@

/**
* Options for {@link Admin#listConsumerGroups()}.
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
*/
@Deprecated
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {

private Set<GroupState> groupStates = Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

/**
* The result of the {@link Admin#listConsumerGroups()} call.
* <p>
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
*/
@Deprecated
public class ListConsumerGroupsResult {
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> all;
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.clients.admin;

import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.annotation.InterfaceStability;
Expand All @@ -32,8 +33,19 @@
@InterfaceStability.Evolving
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {

private Set<GroupState> groupStates = Collections.emptySet();
private Set<GroupType> types = Collections.emptySet();
private Set<GroupState> groupStates = Set.of();
private Set<GroupType> types = Set.of();
private Set<String> protocolTypes = Set.of();

/**
* Only consumer groups will be returned by listGroups().
* This operation sets filters on group type and protocol type which select consumer groups.
*/
public ListGroupsOptions forConsumerGroups() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new API is not in KIP-1043, therefore its code style is probably open for discussion 😃

Given that this API appears to be a helper function for creating a specific query, perhaps we could refactor it as a static method.

    public static ListGroupsOptions forConsumerGroups() {
        return new ListGroupsOptions()
                .withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER))
                .withProtocolTypes(Set.of("", ConsumerProtocol.PROTOCOL_TYPE));
    }

and then the code new ListGroupsOptions().forConsumerGroups() can be replaced by ListGroupsOptions.forConsumerGroups()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree. I was trying to see what the best way to express this would be and I like the static method suggestion.

this.types = Set.of(GroupType.CLASSIC, GroupType.CONSUMER);
this.protocolTypes = Set.of("", ConsumerProtocol.PROTOCOL_TYPE);
return this;
}

/**
* If groupStates is set, only groups in these states will be returned by listGroups().
Expand All @@ -45,6 +57,11 @@ public ListGroupsOptions inGroupStates(Set<GroupState> groupStates) {
return this;
}

public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) {
this.protocolTypes = (protocolTypes == null || protocolTypes.isEmpty()) ? Set.of() : Set.copyOf(protocolTypes);
return this;
}

/**
* If types is set, only groups of these types will be returned by listGroups().
* Otherwise, all groups are returned.
Expand All @@ -61,6 +78,13 @@ public Set<GroupState> groupStates() {
return groupStates;
}

/**
* Returns the list of protocol types that are requested or empty if no protocol types have been specified.
*/
public Set<String> protocolTypes() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there will be a follow-up to use this field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll add a few more tests. And update the KIP of course.

return protocolTypes;
}

/**
* Returns the list of group types that are requested or empty if no types have been specified.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.Config;
Expand Down Expand Up @@ -225,7 +226,7 @@ private void loadInitialConsumerGroups()
Set<String> findConsumerGroups()
throws InterruptedException, ExecutionException {
List<String> filteredGroups = listConsumerGroups().stream()
.map(ConsumerGroupListing::groupId)
.map(GroupListing::groupId)
.filter(this::shouldReplicateByGroupFilter)
.collect(Collectors.toList());

Expand All @@ -252,10 +253,10 @@ Set<String> findConsumerGroups()
return checkpointGroups;
}

Collection<ConsumerGroupListing> listConsumerGroups()
Collection<GroupListing> listConsumerGroups()
throws InterruptedException, ExecutionException {
return adminCall(
() -> sourceAdminClient.listConsumerGroups().valid().get(),
() -> sourceAdminClient.listGroups(new ListGroupsOptions().forConsumerGroups()).valid().get(),
() -> "list consumer groups on " + config.sourceClusterAlias() + " cluster"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
Expand All @@ -31,6 +33,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -144,9 +147,9 @@ public void testFindConsumerGroups() throws Exception {
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
connector = spy(connector);

Collection<ConsumerGroupListing> groups = Arrays.asList(
new ConsumerGroupListing("g1", true),
new ConsumerGroupListing("g2", false));
Collection<GroupListing> groups = Arrays.asList(
new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()),
new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
doReturn(groups).when(connector).listConsumerGroups();
Expand All @@ -159,7 +162,7 @@ public void testFindConsumerGroups() throws Exception {
doReturn(groupToOffsets).when(connector).listConsumerGroupOffsets(anyList());
Set<String> groupFound = connector.findConsumerGroups();

Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
Set<String> expectedGroups = groups.stream().map(GroupListing::groupId).collect(Collectors.toSet());
assertEquals(expectedGroups, groupFound,
"Expected groups are not the same as findConsumerGroups");

Expand All @@ -174,11 +177,11 @@ public void testFindConsumerGroupsInCommonScenarios() throws Exception {
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
connector = spy(connector);

Collection<ConsumerGroupListing> groups = Arrays.asList(
new ConsumerGroupListing("g1", true),
new ConsumerGroupListing("g2", false),
new ConsumerGroupListing("g3", false),
new ConsumerGroupListing("g4", false));
Collection<GroupListing> groups = Arrays.asList(
new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()),
new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()),
new GroupListing("g3", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()),
new GroupListing("g4", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()));
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup1 = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup2 = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup3 = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MemberDescription;
Expand Down Expand Up @@ -241,7 +241,7 @@ void listGroups() throws ExecutionException, InterruptedException {
if (includeType || includeState) {
Set<GroupType> types = typeValues();
Set<GroupState> states = stateValues();
List<ConsumerGroupListing> listings = listConsumerGroupsWithFilters(types, states);
List<GroupListing> listings = listConsumerGroupsWithFilters(types, states);

printGroupInfo(listings, includeType, includeState);
} else {
Expand All @@ -263,17 +263,17 @@ private Set<GroupType> typeValues() {
: consumerGroupTypesFromString(typeValue);
}

private void printGroupInfo(List<ConsumerGroupListing> groups, boolean includeType, boolean includeState) {
Function<ConsumerGroupListing, String> groupId = ConsumerGroupListing::groupId;
Function<ConsumerGroupListing, String> groupType = groupListing -> groupListing.type().orElse(GroupType.UNKNOWN).toString();
Function<ConsumerGroupListing, String> groupState = groupListing -> groupListing.groupState().orElse(GroupState.UNKNOWN).toString();
private void printGroupInfo(List<GroupListing> groups, boolean includeType, boolean includeState) {
Function<GroupListing, String> groupId = GroupListing::groupId;
Function<GroupListing, String> groupType = groupListing -> groupListing.type().orElse(GroupType.UNKNOWN).toString();
Function<GroupListing, String> groupState = groupListing -> groupListing.groupState().orElse(GroupState.UNKNOWN).toString();

OptionalInt maybeMax = groups.stream().mapToInt(groupListing -> Math.max(15, groupId.apply(groupListing).length())).max();
int maxGroupLen = maybeMax.orElse(15) + 10;
String format = "%-" + maxGroupLen + "s";
List<String> header = new ArrayList<>();
header.add("GROUP");
List<Function<ConsumerGroupListing, String>> extractors = new ArrayList<>();
List<Function<GroupListing, String>> extractors = new ArrayList<>();
extractors.add(groupId);

if (includeType) {
Expand All @@ -290,28 +290,29 @@ private void printGroupInfo(List<ConsumerGroupListing> groups, boolean includeTy

System.out.printf(format + "%n", header.toArray(new Object[0]));

for (ConsumerGroupListing groupListing : groups) {
for (GroupListing groupListing : groups) {
Object[] info = extractors.stream().map(extractor -> extractor.apply(groupListing)).toArray(Object[]::new);
System.out.printf(format + "%n", info);
}
}

List<String> listConsumerGroups() {
try {
ListConsumerGroupsResult result = adminClient.listConsumerGroups(withTimeoutMs(new ListConsumerGroupsOptions()));
Collection<ConsumerGroupListing> listings = result.all().get();
return listings.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList());
ListGroupsResult result = adminClient.listGroups(withTimeoutMs(new ListGroupsOptions().forConsumerGroups()));
Collection<GroupListing> listings = result.all().get();
return listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

List<ConsumerGroupListing> listConsumerGroupsWithFilters(Set<GroupType> types, Set<GroupState> states) throws ExecutionException, InterruptedException {
ListConsumerGroupsOptions listConsumerGroupsOptions = withTimeoutMs(new ListConsumerGroupsOptions());
listConsumerGroupsOptions
List<GroupListing> listConsumerGroupsWithFilters(Set<GroupType> types, Set<GroupState> states) throws ExecutionException, InterruptedException {
ListGroupsOptions listGroupsOptions = withTimeoutMs(new ListGroupsOptions());
listGroupsOptions
.forConsumerGroups()
.inGroupStates(states)
.withTypes(types);
ListConsumerGroupsResult result = adminClient.listConsumerGroups(listConsumerGroupsOptions);
ListGroupsResult result = adminClient.listGroups(listGroupsOptions);
return new ArrayList<>(result.all().get());
}

Expand Down
Loading