Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
@ClusterTestDefaults(
types = {Type.KRAFT},
brokers = 3,
controllers = 3,
serverProperties = {
@ClusterConfigProperty(key = "log.retention.ms", value = "-1"),
}
Expand All @@ -74,14 +75,21 @@ public class ListOffsetsIntegrationTest {
public void setup() throws InterruptedException {
clusterInstance.waitForReadyBrokers();
clusterInstance.createTopic(TOPIC, PARTITION, REPLICAS);
adminClient = clusterInstance.admin();
adminClient = clusterInstance.admin(Map.of(), true);
}

@AfterEach
public void teardown() {
Utils.closeQuietly(adminClient, "ListOffsetsAdminClient");
}

@ClusterTest
public void testFeature() throws ExecutionException, InterruptedException {
var opts = new DescribeFeaturesOptions().nodeId(2);
// var opts = new DescribeFeaturesOptions();
System.err.println(adminClient.describeFeatures(opts).featureMetadata().get());
}

@ClusterTest
public void testListMaxTimestampWithEmptyLog() throws InterruptedException, ExecutionException {
ListOffsetsResultInfo maxTimestampOffset = runFetchOffsets(OffsetSpec.maxTimestamp(), TOPIC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,27 @@
*/
package org.apache.kafka.clients.admin;

import java.util.OptionalInt;

/**
* Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
*/
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
private OptionalInt nodeId = OptionalInt.empty();

/**
* Set the node id to which the request should be sent.
*/
public DescribeFeaturesOptions nodeId(int nodeId) {
this.nodeId = OptionalInt.of(nodeId);
return this;
}

/**
* The node id to which the request should be sent. If the node id is empty, the request will be sent to the
* arbitrary controller/broker.
*/
public OptionalInt nodeId() {
return nodeId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,7 @@ private class ConstantNodeIdProvider implements NodeProvider {
public Node provide() {
if (metadataManager.isReady() &&
(metadataManager.nodeById(nodeId) != null)) {
System.err.println("ConstantNodeIdProvider nodeId" + nodeId);
return metadataManager.nodeById(nodeId);
}
// If we can't find the node with the given constant ID, we schedule a
Expand Down Expand Up @@ -4518,9 +4519,10 @@ private static byte[] getSaltedPassword(ScramMechanism publicScramMechanism, byt
public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
NodeProvider nodeProvider = options.nodeId().isEmpty() ?
new LeastLoadedBrokerOrActiveKController() : new ConstantNodeIdProvider(options.nodeId().getAsInt(), true);
final Call call = new Call(
"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedBrokerOrActiveKController()) {

"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {
private FeatureMetadata createFeatureMetadata(final ApiVersionsResponse response) {
final Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>();
for (final FinalizedFeatureKey key : response.data().finalizedFeatures().valuesSet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public Node controller() {
}

public Node nodeById(int nodeId) {
System.err.println("in adminMetadataManager cluster " + cluster);
return cluster.nodeById(nodeId);
}

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ class ControllerApis(
// If this is considered to leak information about the controller version a workaround is to use SSL
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
System.err.println("controllerApis handleApiVersionRequest node.id=" + config.nodeId)
val apiVersionRequest = request.body[ApiVersionsRequest]
if (apiVersionRequest.hasUnsupportedRequestVersion) {
requestHelper.sendResponseMaybeThrottle(request,
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// If this is considered to leak information about the broker version a workaround is to use SSL
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
System.err.println("kafkaApis handleApiVersionRequest node.id=" + config.nodeId)
def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
val apiVersionRequest = request.body[ApiVersionsRequest]
if (apiVersionRequest.hasUnsupportedRequestVersion) {
Expand Down
18 changes: 13 additions & 5 deletions tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.tools;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.SupportedVersionRange;
Expand Down Expand Up @@ -104,7 +105,7 @@ static void execute(String... args) throws Exception {
try (Admin adminClient = Admin.create(properties)) {
switch (command) {
case "describe":
handleDescribe(adminClient);
handleDescribe(namespace, adminClient);
break;
case "upgrade":
handleUpgrade(namespace, adminClient);
Expand All @@ -128,8 +129,12 @@ static void execute(String... args) throws Exception {
}

private static void addDescribeParser(Subparsers subparsers) {
subparsers.addParser("describe")
Subparser describeParser = subparsers.addParser("describe")
.help("Describes the current active feature flags.");
describeParser.addArgument("--node-id")
.type(String.class)
.help("Send request to specific node.")
.action(store());
}

private static void addUpgradeParser(Subparsers subparsers) {
Expand Down Expand Up @@ -223,8 +228,11 @@ static String levelToString(String feature, short level) {
return String.valueOf(level);
}

static void handleDescribe(Admin adminClient) throws ExecutionException, InterruptedException {
FeatureMetadata featureMetadata = adminClient.describeFeatures().featureMetadata().get();
static void handleDescribe(Namespace namespace, Admin adminClient) throws ExecutionException, InterruptedException {

String target = namespace.getString("node_id");
DescribeFeaturesOptions options = target == null ? new DescribeFeaturesOptions() : new DescribeFeaturesOptions().nodeId(Integer.parseInt(target));
FeatureMetadata featureMetadata = adminClient.describeFeatures(options).featureMetadata().get();
featureMetadata.supportedFeatures().keySet().stream().sorted().forEach(feature -> {
short finalizedLevel = (featureMetadata.finalizedFeatures().get(feature) == null) ? 0 : featureMetadata.finalizedFeatures().get(feature).maxVersionLevel();
SupportedVersionRange range = featureMetadata.supportedFeatures().get(feature);
Expand Down Expand Up @@ -449,4 +457,4 @@ private static void update(String op, Admin admin, Map<String, FeatureUpdate> up
throw new TerseException(numFailures + " out of " + updates.size() + " operation(s) failed.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ private static MockAdminClient buildAdminClient() {
public void testHandleDescribe() {
String describeResult = ToolsTestUtils.captureStandardOut(() -> {
try {
FeatureCommand.handleDescribe(buildAdminClient());
Map<String, Object> namespace = new HashMap<>();
namespace.put("--node-id", "10");
FeatureCommand.handleDescribe(new Namespace(namespace), buildAdminClient());
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Loading