Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class ListOffsetsIntegrationTest {
public void setup() throws InterruptedException {
clusterInstance.waitForReadyBrokers();
clusterInstance.createTopic(TOPIC, PARTITION, REPLICAS);
adminClient = clusterInstance.admin();
adminClient = clusterInstance.admin(Map.of(), true);
}

@AfterEach
Expand Down Expand Up @@ -288,4 +288,10 @@ private void setUpForLogAppendTimeCase() throws InterruptedException {
private void createTopicWithConfig(String topic, Map<String, String> props) throws InterruptedException {
clusterInstance.createTopic(topic, PARTITION, REPLICAS, props);
}

@ClusterTest
public void testDescribeCluster(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
System.out.println(adminClient.describeCluster().nodes().get());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
public enum EndpointType {
UNKNOWN((byte) 0),
BROKER((byte) 1),
CONTROLLER((byte) 2);
CONTROLLER((byte) 2),
ALL((byte) 3);

private final byte id;

Expand All @@ -40,6 +41,8 @@ public static EndpointType fromId(byte id) {
return BROKER;
} else if (id == CONTROLLER.id) {
return CONTROLLER;
} else if (id == ALL.id) {
return ALL;
} else {
return UNKNOWN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2475,7 +2475,7 @@ AbstractRequest.Builder<?> createRequest(int timeoutMs) {
return new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations())
.setEndpointType(metadataManager.usingBootstrapControllers() ?
EndpointType.CONTROLLER.id() : EndpointType.BROKER.id())
EndpointType.ALL.id() : EndpointType.BROKER.id())
.setIncludeFencedBrokers(options.includeFencedBrokers()));
} else {
// Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
//
// Version 1 adds EndpointType for KIP-919 support.
// Version 2 adds IncludeFencedBrokers for KIP-1073 support.
// Version 3 support return brokers and controllers
//
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+",
"about": "Whether to include cluster authorized operations." },
{ "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
"about": "The endpoint type to describe. 1=brokers, 2=controllers." },
"about": "The endpoint type to describe. 1=brokers, 2=controllers. 3=all." },
{ "name": "IncludeFencedBrokers", "type": "bool", "versions": "2+",
"about": "Whether to include fenced brokers when listing brokers." }
]
Expand Down
43 changes: 40 additions & 3 deletions core/src/main/scala/kafka/server/AuthHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,23 @@ class AuthHelper(authorizer: Option[Plugin[Authorizer]]) {
request: RequestChannel.Request,
expectedEndpointType: EndpointType,
clusterId: String,
getNodes: () => DescribeClusterBrokerCollection,
getControllers: () => DescribeClusterBrokerCollection,
getBrokers: () => DescribeClusterBrokerCollection,
getControllerId: () => Int
): DescribeClusterResponseData = {
val describeClusterRequest = request.body[DescribeClusterRequest]
val requestEndpointType = EndpointType.fromId(describeClusterRequest.data().endpointType())
System.err.println("server receive request and requestEndpointType: " + requestEndpointType)
if (requestEndpointType.equals(EndpointType.UNKNOWN)) {
System.err.println("zZZZZ")
return new DescribeClusterResponseData().
setErrorCode(if (request.header.data().requestApiVersion() == 0) {
Errors.INVALID_REQUEST.code()
} else {
Errors.UNSUPPORTED_ENDPOINT_TYPE.code()
}).
setErrorMessage("Unsupported endpoint type " + describeClusterRequest.data().endpointType().toInt)
} else if (!expectedEndpointType.equals(requestEndpointType)) {
} else if (!expectedEndpointType.equals(requestEndpointType) && !EndpointType.ALL.equals(requestEndpointType)) {
return new DescribeClusterResponseData().
setErrorCode(if (request.header.data().requestApiVersion() == 0) {
Errors.INVALID_REQUEST.code()
Expand All @@ -155,8 +158,18 @@ class AuthHelper(authorizer: Option[Plugin[Authorizer]]) {
clusterAuthorizedOperations = 0
}
// Get the node list and the controller ID.
val nodes = getNodes()
val controllers = getControllers()
System.err.println("getControllers" + controllers)
var nodes = controllers
if (requestEndpointType == EndpointType.ALL) {
val brokers = getBrokers()
nodes = mergeDescribeClusterBrokerCollection(controllers, brokers)
System.err.println("getBrokers " + brokers)
}

val controllerId = getControllerId()

System.err.println("nodes: " + nodes)
// If the provided controller ID is not in the node list, return -1 instead
// to avoid confusing the client. This could happen in a case where we know
// the controller ID, but we don't yet have KIP-919 information about that
Expand All @@ -173,4 +186,28 @@ class AuthHelper(authorizer: Option[Plugin[Authorizer]]) {
setBrokers(nodes).
setEndpointType(expectedEndpointType.id())
}

private def mergeDescribeClusterBrokerCollection(
controllers: DescribeClusterBrokerCollection,
brokers : DescribeClusterBrokerCollection ): DescribeClusterBrokerCollection = {

val newDescribeClusterBrokerCollection = new DescribeClusterBrokerCollection(controllers.size() + brokers.size())
System.err.println("merge function for broker " + brokers)
val brokersIter = brokers.iterator()
val controllerIter = controllers.iterator()

while(controllerIter.hasNext) {
val controller = controllerIter.next()
newDescribeClusterBrokerCollection.add(controller.duplicate())
}

while(brokersIter.hasNext) {
val broker = brokersIter.next()
newDescribeClusterBrokerCollection.add(broker.duplicate())
}

System.err.println("newDescribeClusterBrokerCollection " + newDescribeClusterBrokerCollection)

newDescribeClusterBrokerCollection
}
}
8 changes: 8 additions & 0 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, USER}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
import org.apache.kafka.controller.{Controller, ControllerRequestContext}
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
Expand Down Expand Up @@ -1071,6 +1072,13 @@ class ControllerApis(
EndpointType.CONTROLLER,
clusterId,
() => registrationsPublisher.describeClusterControllers(request.context.listenerName()),
() => {
// FIXME:
val a = controller.brokerNodes(ListenerName.normalised("EXTERNAL"))
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't distinguish what listener is needed.

System.err.println("listenerName " + request.context.listenerName)
System.err.println("controller.brokerNodes " + a)
a
},
() => raftManager.client.leaderAndEpoch.leaderId().orElse(-1)
)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2442,13 +2442,18 @@ class KafkaApis(val requestChannel: RequestChannel,
}

def handleDescribeCluster(request: RequestChannel.Request): Unit = {
System.err.print("receive DescribeCluster")
val response = authHelper.computeDescribeClusterResponse(
request,
EndpointType.BROKER,
clusterId,
() => {
new DescribeClusterResponseData.DescribeClusterBrokerCollection()
},
() => {
val brokers = new DescribeClusterResponseData.DescribeClusterBrokerCollection()
val describeClusterRequest = request.body[DescribeClusterRequest]
System.err.println("request.context.listenerName " + request.context.listenerName)
metadataCache.getBrokerNodes(request.context.listenerName).forEach { node =>
if (!node.isFenced || describeClusterRequest.data().includeFencedBrokers()) {
brokers.add(new DescribeClusterResponseData.DescribeClusterBroker().
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class AuthHelperTest {
EndpointType.BROKER,
"ltCWoi9wRhmHSQCIgAznEg",
() => new DescribeClusterBrokerCollection(),
() => new DescribeClusterBrokerCollection(),
() => 1)
assertEquals(new DescribeClusterResponseData().
setErrorCode(Errors.UNSUPPORTED_ENDPOINT_TYPE.code()).
Expand All @@ -178,6 +179,7 @@ class AuthHelperTest {
EndpointType.BROKER,
"ltCWoi9wRhmHSQCIgAznEg",
() => new DescribeClusterBrokerCollection(),
() => new DescribeClusterBrokerCollection(),
() => 1)
assertEquals(new DescribeClusterResponseData().
setErrorCode(Errors.INVALID_REQUEST.code()).
Expand All @@ -195,6 +197,7 @@ class AuthHelperTest {
EndpointType.CONTROLLER,
"ltCWoi9wRhmHSQCIgAznEg",
() => new DescribeClusterBrokerCollection(),
() => new DescribeClusterBrokerCollection(),
() => 1)
assertEquals(new DescribeClusterResponseData().
setErrorCode(Errors.MISMATCHED_ENDPOINT_TYPE.code()).
Expand All @@ -212,6 +215,7 @@ class AuthHelperTest {
EndpointType.CONTROLLER,
"ltCWoi9wRhmHSQCIgAznEg",
() => new DescribeClusterBrokerCollection(),
() => new DescribeClusterBrokerCollection(),
() => 1)
assertEquals(new DescribeClusterResponseData().
setErrorCode(Errors.INVALID_REQUEST.code()).
Expand All @@ -229,6 +233,7 @@ class AuthHelperTest {
EndpointType.CONTROLLER,
"ltCWoi9wRhmHSQCIgAznEg",
() => new DescribeClusterBrokerCollection(),
() => new DescribeClusterBrokerCollection(),
() => 1)
assertEquals(new DescribeClusterResponseData().
setClusterId("ltCWoi9wRhmHSQCIgAznEg").
Expand All @@ -251,6 +256,7 @@ class AuthHelperTest {
EndpointType.CONTROLLER,
"ltCWoi9wRhmHSQCIgAznEg",
() => nodes,
() => nodes,
() => 1)
assertEquals(new DescribeClusterResponseData().
setClusterId("ltCWoi9wRhmHSQCIgAznEg").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
Expand All @@ -49,6 +50,7 @@
import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
Expand Down Expand Up @@ -328,6 +330,8 @@ CompletableFuture<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request
);

DescribeClusterResponseData.DescribeClusterBrokerCollection brokerNodes(ListenerName listener);

/**
* Wait for the given number of brokers to be registered and unfenced.
* This is for testing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
Expand Down Expand Up @@ -47,6 +48,7 @@
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
Expand Down Expand Up @@ -83,6 +85,7 @@
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
Expand All @@ -97,6 +100,7 @@
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
Expand Down Expand Up @@ -1986,6 +1990,7 @@ public CompletableFuture<BrokerRegistrationReply> registerBroker(
Map<String, Short> controllerFeatures = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
// Populate finalized features map with latest known kraft version for validation.
controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel());
this.log.error("kkkk register broker");
return clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(),
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE),
Expand All @@ -1994,6 +1999,25 @@ public CompletableFuture<BrokerRegistrationReply> registerBroker(
EnumSet.noneOf(ControllerOperationFlag.class));
}

@Override
public DescribeClusterResponseData.DescribeClusterBrokerCollection brokerNodes(ListenerName listerName) {
DescribeClusterResponseData.DescribeClusterBrokerCollection results =
new DescribeClusterResponseData.DescribeClusterBrokerCollection();
Map<Integer, BrokerRegistration> brokerRegistrations = clusterControl.brokerRegistrations();
for (BrokerRegistration broker : brokerRegistrations.values()) {
Endpoint endpoint = broker.listeners().get(listerName.value());
if (endpoint != null) {
results.add(new DescribeClusterResponseData.DescribeClusterBroker()
.setBrokerId(broker.id())
.setHost(endpoint.host())
.setRack(broker.rack().isPresent() ? broker.rack().get() : null)
.setIsFenced(broker.fenced()));
}
}

return results;
}

@Override
public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
ControllerRequestContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
Expand All @@ -55,6 +56,7 @@
import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
Expand Down Expand Up @@ -522,6 +524,11 @@ public CompletableFuture<Void> registerController(
throw new UnsupportedOperationException();
}

@Override
public DescribeClusterResponseData.DescribeClusterBrokerCollection brokerNodes(ListenerName listener) {
throw new UnsupportedOperationException();
}

@Override
public void beginShutdown() {
this.active = false;
Expand Down
Loading