Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -879,20 +879,24 @@ default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> g

/**
* List the consumer groups available in the cluster.
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
*
* @param options The options to use when listing the consumer groups.
* @return The ListConsumerGroupsResult.
*/
@Deprecated(since = "4.1", forRemoval = true)
ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

/**
* List the consumer groups available in the cluster with the default options.
* <p>
* This is a convenience method for {@link #listConsumerGroups(ListConsumerGroupsOptions)} with default options.
* See the overload for more details.
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use {@link Admin#listGroups()} instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. If you want Admin.listGroups() to return just consumer groups, you need to apply a filter to group type and protocol type.

*
* @return The ListConsumerGroupsResult.
*/
@Deprecated(since = "4.1", forRemoval = true)
default ListConsumerGroupsResult listConsumerGroups() {
return listConsumerGroups(new ListConsumerGroupsOptions());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

/**
* A listing of a consumer group in the cluster.
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} and {@link GroupListing} instead.
*/
@Deprecated(since = "4.1")
public class ConsumerGroupListing {
private final String groupId;
private final boolean isSimpleConsumerGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> gr
}

@Override
@Deprecated
@SuppressWarnings("removal")
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
return delegate.listConsumerGroups(options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3614,6 +3614,7 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

@Deprecated
private static final class ListConsumerGroupsResults {
private final List<Throwable> errors;
private final HashMap<String, ConsumerGroupListing> listings;
Expand Down Expand Up @@ -3657,6 +3658,8 @@ private synchronized void tryComplete() {
}

@Override
@SuppressWarnings("removal")
@Deprecated(since = "4.1", forRemoval = true)
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
final long nowMetadata = time.milliseconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@

/**
* Options for {@link Admin#listConsumerGroups()}.
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
*/
@Deprecated(since = "4.1")
@SuppressWarnings("removal")
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {

private Set<GroupState> groupStates = Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@

/**
* The result of the {@link Admin#listConsumerGroups()} call.
* <p>
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
*/
@Deprecated(since = "4.1")
@SuppressWarnings("removal")
public class ListConsumerGroupsResult {
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> all;
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.apache.kafka.clients.admin;

import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collections;
import java.util.Set;

/**
Expand All @@ -32,16 +32,32 @@
@InterfaceStability.Evolving
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {

private Set<GroupState> groupStates = Collections.emptySet();
private Set<GroupType> types = Collections.emptySet();
private Set<GroupState> groupStates = Set.of();
private Set<GroupType> types = Set.of();
private Set<String> protocolTypes = Set.of();

/**
* Only consumer groups will be returned by listGroups().
* This operation sets filters on group type and protocol type which select consumer groups.
*/
public static ListGroupsOptions forConsumerGroups() {
return new ListGroupsOptions()
.withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER))
.withProtocolTypes(Set.of("", ConsumerProtocol.PROTOCOL_TYPE));
}

/**
* If groupStates is set, only groups in these states will be returned by listGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
*/
public ListGroupsOptions inGroupStates(Set<GroupState> groupStates) {
this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Collections.emptySet() : Set.copyOf(groupStates);
this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Set.of() : Set.copyOf(groupStates);
return this;
}

public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) {
this.protocolTypes = (protocolTypes == null || protocolTypes.isEmpty()) ? Set.of() : Set.copyOf(protocolTypes);
return this;
}

Expand All @@ -61,6 +77,13 @@ public Set<GroupState> groupStates() {
return groupStates;
}

/**
* Returns the list of protocol types that are requested or empty if no protocol types have been specified.
*/
public Set<String> protocolTypes() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there will be a follow-up to use this field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll add a few more tests. And update the KIP of course.

return protocolTypes;
}

/**
* Returns the list of group types that are requested or empty if no types have been specified.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3268,6 +3268,7 @@ public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers
}

@Test
@SuppressWarnings("removal")
public void testListConsumerGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
AdminClientConfig.RETRIES_CONFIG, "2")) {
Expand Down Expand Up @@ -3377,6 +3378,7 @@ public void testListConsumerGroups() throws Exception {
}

@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsMetadataFailure() throws Exception {
final Cluster cluster = mockCluster(3, 0);
final Time time = new MockTime();
Expand All @@ -3400,6 +3402,7 @@ public void testListConsumerGroupsMetadataFailure() throws Exception {
}

@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsWithStates() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
Expand Down Expand Up @@ -3433,6 +3436,7 @@ public void testListConsumerGroupsWithStates() throws Exception {
}

@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsWithTypes() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
Expand Down Expand Up @@ -3495,6 +3499,7 @@ public void testListConsumerGroupsWithTypes() throws Exception {
}

@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception {
ApiVersion listGroupV3 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
Expand Down Expand Up @@ -3533,6 +3538,7 @@ public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio
}

@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception {
ApiVersion listGroupV4 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ public synchronized DescribeConsumerGroupsResult describeConsumerGroups(Collecti
}

@Override
@SuppressWarnings("removal")
public synchronized ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
KafkaFutureImpl<Collection<Object>> future = new KafkaFutureImpl<>();
future.complete(groupConfigs.keySet().stream().map(g -> new ConsumerGroupListing(g, false)).collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.Config;
Expand Down Expand Up @@ -225,7 +226,7 @@ private void loadInitialConsumerGroups()
Set<String> findConsumerGroups()
throws InterruptedException, ExecutionException {
List<String> filteredGroups = listConsumerGroups().stream()
.map(ConsumerGroupListing::groupId)
.map(GroupListing::groupId)
.filter(this::shouldReplicateByGroupFilter)
.collect(Collectors.toList());

Expand All @@ -252,10 +253,10 @@ Set<String> findConsumerGroups()
return checkpointGroups;
}

Collection<ConsumerGroupListing> listConsumerGroups()
Collection<GroupListing> listConsumerGroups()
throws InterruptedException, ExecutionException {
return adminCall(
() -> sourceAdminClient.listConsumerGroups().valid().get(),
() -> sourceAdminClient.listGroups(ListGroupsOptions.forConsumerGroups()).valid().get(),
() -> "list consumer groups on " + config.sourceClusterAlias() + " cluster"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
Expand All @@ -31,6 +33,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -144,9 +147,9 @@ public void testFindConsumerGroups() throws Exception {
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
connector = spy(connector);

Collection<ConsumerGroupListing> groups = Arrays.asList(
new ConsumerGroupListing("g1", true),
new ConsumerGroupListing("g2", false));
Collection<GroupListing> groups = Arrays.asList(
new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()),
new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
doReturn(groups).when(connector).listConsumerGroups();
Expand All @@ -159,7 +162,7 @@ public void testFindConsumerGroups() throws Exception {
doReturn(groupToOffsets).when(connector).listConsumerGroupOffsets(anyList());
Set<String> groupFound = connector.findConsumerGroups();

Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
Set<String> expectedGroups = groups.stream().map(GroupListing::groupId).collect(Collectors.toSet());
assertEquals(expectedGroups, groupFound,
"Expected groups are not the same as findConsumerGroups");

Expand All @@ -174,11 +177,11 @@ public void testFindConsumerGroupsInCommonScenarios() throws Exception {
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
connector = spy(connector);

Collection<ConsumerGroupListing> groups = Arrays.asList(
new ConsumerGroupListing("g1", true),
new ConsumerGroupListing("g2", false),
new ConsumerGroupListing("g3", false),
new ConsumerGroupListing("g4", false));
Collection<GroupListing> groups = Arrays.asList(
new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()),
new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()),
new GroupListing("g3", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()),
new GroupListing("g4", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()));
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup1 = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup2 = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup3 = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
Expand Down Expand Up @@ -184,7 +185,7 @@ public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Except

// Ensure that the overridden consumer group ID was the one actually used
try (Admin admin = connect.kafka().createAdminClient()) {
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
Collection<GroupListing> consumerGroups = admin.listGroups(ListGroupsOptions.forConsumerGroups()).all().get();
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
}
Expand Down Expand Up @@ -343,7 +344,7 @@ public void testAlterSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exce
alterAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
// Ensure that the overridden consumer group ID was the one actually used
try (Admin admin = connect.kafka().createAdminClient()) {
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
Collection<GroupListing> consumerGroups = admin.listGroups(ListGroupsOptions.forConsumerGroups()).all().get();
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
}
Expand Down Expand Up @@ -724,7 +725,7 @@ public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exce
resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
// Ensure that the overridden consumer group ID was the one actually used
try (Admin admin = connect.kafka().createAdminClient()) {
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
Collection<GroupListing> consumerGroups = admin.listGroups(ListGroupsOptions.forConsumerGroups()).all().get();
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import joptsimple._
import kafka.server.DynamicConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListGroupsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
Expand Down Expand Up @@ -350,7 +350,7 @@ object ConfigCommand extends Logging {
case ClientMetricsType =>
adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq
case GroupType =>
adminClient.listConsumerGroups().all.get.asScala.map(_.groupId).toSeq
adminClient.listGroups(ListGroupsOptions.forConsumerGroups()).all.get.asScala.map(_.groupId).toSeq
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
})

Expand Down
Loading