From 69803ca8f0fcdd4f64e91c38ba4e452199b7d430 Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Thu, 28 May 2026 10:47:16 +0200 Subject: [PATCH 1/4] Add StreamsGroupTopologyDescriptionUpdate RPC schema and extend StreamsGroupDescribe/Heartbeat --- .../errors/GroupDeletionFailedException.java | 31 ++++++++ ...ologyDescriptionUpdateFailedException.java | 32 +++++++++ .../apache/kafka/common/protocol/ApiKeys.java | 3 +- .../apache/kafka/common/protocol/Errors.java | 6 +- .../common/requests/AbstractRequest.java | 2 + .../common/requests/AbstractResponse.java | 2 + ...GroupTopologyDescriptionUpdateRequest.java | 71 +++++++++++++++++++ ...roupTopologyDescriptionUpdateResponse.java | 59 +++++++++++++++ .../common/message/DeleteGroupsRequest.json | 6 +- .../common/message/DeleteGroupsResponse.json | 9 ++- .../message/StreamsGroupDescribeRequest.json | 5 +- .../message/StreamsGroupDescribeResponse.json | 41 ++++++++++- .../StreamsGroupHeartbeatResponse.json | 4 ++ ...GroupTopologyDescriptionUpdateRequest.json | 68 ++++++++++++++++++ ...roupTopologyDescriptionUpdateResponse.json | 31 ++++++++ .../common/requests/RequestResponseTest.java | 23 ++++++ .../unit/kafka/server/RequestQuotaTest.scala | 3 + .../kafka/network/RequestConvertToJson.java | 8 +++ 18 files changed, 397 insertions(+), 7 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java create mode 100644 clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json create mode 100644 clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java new file mode 100644 index 0000000000000..eaa8316052618 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Indicates that {@code DeleteGroups} could not complete for the affected group. The + * accompanying error message describes the underlying cause; the caller may retry once + * the underlying condition is resolved. + */ +public class GroupDeletionFailedException extends ApiException { + + private static final long serialVersionUID = 1L; + + public GroupDeletionFailedException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java new file mode 100644 index 0000000000000..df981b8b5b936 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Indicates that the streams group topology description plugin failed to process + * a StreamsGroupTopologyDescriptionUpdate request. The accompanying error message + * describes the underlying cause; the broker tracks the transient-vs-permanent + * distinction internally and does not reflect it on the wire. + */ +public class StreamsTopologyDescriptionUpdateFailedException extends ApiException { + + private static final long serialVersionUID = 1L; + + public StreamsTopologyDescriptionUpdateFailedException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 79b283b4f8d89..22dc37e67d146 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -137,7 +137,8 @@ public enum ApiKeys { STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE), DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS), ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS), - DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS); + DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS), + STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE(ApiMessageType.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index a27a7fcf23c77..b1eb842181599 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.errors.FetchSessionIdNotFoundException; import org.apache.kafka.common.errors.FetchSessionTopicIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.GroupDeletionFailedException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupMaxSizeReachedException; import org.apache.kafka.common.errors.GroupNotEmptyException; @@ -122,6 +123,7 @@ import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException; import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.errors.StreamsTopologyDescriptionUpdateFailedException; import org.apache.kafka.common.errors.StreamsTopologyFencedException; import org.apache.kafka.common.errors.TelemetryTooLargeException; import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; @@ -418,7 +420,9 @@ public enum Errors { STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new), STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new), STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new), - SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new); + SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new), + GROUP_DELETION_FAILED(134, "DeleteGroups could not complete; see the error message on the per-group result for details.", GroupDeletionFailedException::new), + STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The topology description plugin failed to process the request.", StreamsTopologyDescriptionUpdateFailedException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 750de2050f432..8630c379ed6d3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -354,6 +354,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return AlterShareGroupOffsetsRequest.parse(readable, apiVersion); case DELETE_SHARE_GROUP_OFFSETS: return DeleteShareGroupOffsetsRequest.parse(readable, apiVersion); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: + return StreamsGroupTopologyDescriptionUpdateRequest.parse(readable, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 7d96d1a17317c..961559d9e8fc1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -290,6 +290,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable, return AlterShareGroupOffsetsResponse.parse(readable, version); case DELETE_SHARE_GROUP_OFFSETS: return DeleteShareGroupOffsetsResponse.parse(readable, version); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: + return StreamsGroupTopologyDescriptionUpdateResponse.parse(readable, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java new file mode 100644 index 0000000000000..954463c8b6f95 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Readable; + +public class StreamsGroupTopologyDescriptionUpdateRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + private final StreamsGroupTopologyDescriptionUpdateRequestData data; + + public Builder(StreamsGroupTopologyDescriptionUpdateRequestData data) { + super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE); + this.data = data; + } + + @Override + public StreamsGroupTopologyDescriptionUpdateRequest build(short version) { + return new StreamsGroupTopologyDescriptionUpdateRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final StreamsGroupTopologyDescriptionUpdateRequestData data; + + public StreamsGroupTopologyDescriptionUpdateRequest(StreamsGroupTopologyDescriptionUpdateRequestData data, short version) { + super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE, version); + this.data = data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new StreamsGroupTopologyDescriptionUpdateResponse( + new StreamsGroupTopologyDescriptionUpdateResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(Errors.forException(e).code()) + ); + } + + @Override + public StreamsGroupTopologyDescriptionUpdateRequestData data() { + return data; + } + + public static StreamsGroupTopologyDescriptionUpdateRequest parse(Readable readable, short version) { + return new StreamsGroupTopologyDescriptionUpdateRequest( + new StreamsGroupTopologyDescriptionUpdateRequestData(readable, version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java new file mode 100644 index 0000000000000..3ca0ac3ebfc50 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Readable; + +import java.util.Map; + +public class StreamsGroupTopologyDescriptionUpdateResponse extends AbstractResponse { + + private final StreamsGroupTopologyDescriptionUpdateResponseData data; + + public StreamsGroupTopologyDescriptionUpdateResponse(StreamsGroupTopologyDescriptionUpdateResponseData data) { + super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE); + this.data = data; + } + + @Override + public StreamsGroupTopologyDescriptionUpdateResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + return errorCounts(Errors.forCode(data.errorCode())); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static StreamsGroupTopologyDescriptionUpdateResponse parse(Readable readable, short version) { + return new StreamsGroupTopologyDescriptionUpdateResponse( + new StreamsGroupTopologyDescriptionUpdateResponseData(readable, version)); + } +} diff --git a/clients/src/main/resources/common/message/DeleteGroupsRequest.json b/clients/src/main/resources/common/message/DeleteGroupsRequest.json index 7d7c437178997..5c55a9939b76c 100644 --- a/clients/src/main/resources/common/message/DeleteGroupsRequest.json +++ b/clients/src/main/resources/common/message/DeleteGroupsRequest.json @@ -21,7 +21,11 @@ // Version 1 is the same as version 0. // // Version 2 is the first flexible version. - "validVersions": "0-2", + // + // Version 3 corresponds to the addition of an ErrorMessage field on each per-group + // result in the response (populated when the broker needs to surface a cause to the + // caller). The request body shape is unchanged at version 3. + "validVersions": "0-3", "flexibleVersions": "2+", "fields": [ { "name": "GroupsNames", "type": "[]string", "versions": "0+", "entityType": "groupId", diff --git a/clients/src/main/resources/common/message/DeleteGroupsResponse.json b/clients/src/main/resources/common/message/DeleteGroupsResponse.json index 168cde03ba341..4f5ad712a63ad 100644 --- a/clients/src/main/resources/common/message/DeleteGroupsResponse.json +++ b/clients/src/main/resources/common/message/DeleteGroupsResponse.json @@ -20,7 +20,10 @@ // Starting in version 1, on quota violation, brokers send out responses before throttling. // // Version 2 is the first flexible version. - "validVersions": "0-2", + // + // Version 3 adds the per-group ErrorMessage field so the broker can surface a cause + // string when GROUP_DELETION_FAILED (or any other non-NONE error) is returned for a group. + "validVersions": "0-3", "flexibleVersions": "2+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", @@ -30,7 +33,9 @@ { "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true, "entityType": "groupId", "about": "The group id." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The deletion error, or 0 if the deletion succeeded." } + "about": "The deletion error, or 0 if the deletion succeeded." }, + { "name": "ErrorMessage", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "default": "null", + "about": "The error message, or null if there was no error." } ]} ] } diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json index 6e36479043aa0..b84fd1464fcf4 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json @@ -24,6 +24,9 @@ { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The ids of the groups to describe" }, { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", - "about": "Whether to include authorized operations." } + "about": "Whether to include authorized operations." }, + { "name": "IncludeTopologyDescription", "type": "bool", "versions": "0+", + "tag": 0, "taggedVersions": "0+", "default": "false", + "about": "Whether to include the full topology description from the topology description plugin in the response." } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json index b99f9c00b0840..9cb8f60443173 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -106,7 +106,13 @@ "about": "True for classic members that have not been upgraded yet." } ]}, { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", - "about": "32-bit bitfield to represent authorized operations for this group." } + "about": "32-bit bitfield to represent authorized operations for this group." }, + { "name": "TopologyDescription", "type": "TopologyDescription", "versions": "0+", + "nullableVersions": "0+", "tag": 0, "taggedVersions": "0+", "default": "null", + "about": "Full topology description retrieved from the topology description plugin. Null if the client did not request it, no plugin is configured, no description is stored for the group, or retrieval failed." }, + { "name": "TopologyDescriptionStatus", "type": "int8", "versions": "0+", + "tag": 1, "taggedVersions": "0+", "default": "0", + "about": "The status of the topology description for this group: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription), 1=NOT_STORED (no topology description has been recorded for this group), 2=ERROR (the broker failed to fetch the topology description; check broker logs), 3=AVAILABLE (a topology description is present in the TopologyDescription field). The broker MUST set this field to AVAILABLE whenever it attaches a TopologyDescription." } ] } ], @@ -155,6 +161,39 @@ { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", "about": "Topic-level configurations as key-value pairs." } + ]}, + { "name": "TopologyDescription", "versions": "0+", "fields": [ + { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", "versions": "0+", + "about": "The subtopologies that make up this topology." }, + { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", "versions": "0+", + "about": "Global state stores used by this topology." } + ]}, + { "name": "TopologyDescriptionSubtopology", "versions": "0+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier, unique within the topology." }, + { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "0+", + "about": "The processing nodes in this subtopology." } + ]}, + { "name": "TopologyDescriptionNode", "versions": "0+", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", + "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." }, + { "name": "NodeType", "type": "int8", "versions": "0+", + "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." }, + { "name": "SourceTopics", "type": "[]string", "versions": "0+", "entityType": "topicName", + "about": "The source topics this node reads from. Defined only for source nodes, may be empty if source topics are dynamically determined." }, + { "name": "SinkTopic", "type": "string", "versions": "0+", "entityType": "topicName", + "nullableVersions": "0+", "default": "null", + "about": "The topic this node writes to. Defined only for sink nodes, may be null if sink topic is dynamically determined." }, + { "name": "Stores", "type": "[]string", "versions": "0+", + "about": "The state store names accessed by this node. Defined only for processor nodes." }, + { "name": "Successors", "type": "[]string", "versions": "0+", + "about": "The names of successor nodes in the processing graph. Predecessor relationships are reconstructed from this field." } + ]}, + { "name": "TopologyDescriptionGlobalStore", "versions": "0+", "fields": [ + { "name": "Source", "type": "TopologyDescriptionNode", "versions": "0+", + "about": "The source node providing data to the global store." }, + { "name": "Processor", "type": "TopologyDescriptionNode", "versions": "0+", + "about": "The processor node that populates the global store." } ]} ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json index 27cf47bb1a4c7..6980fe7cc5d7a 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -64,6 +64,10 @@ { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." }, + { "name": "TopologyDescriptionRequired", "type": "bool", "versions": "0+", + "tag": 0, "taggedVersions": "0+", "default": "false", + "about": "True if the broker's topology description plugin does not have an up-to-date topology description for this group. The client should send the topology description via StreamsGroupTopologyDescriptionUpdate." }, + // IQ-related information { "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+", "about": "The endpoint epoch set in the response"}, diff --git a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json new file mode 100644 index 0000000000000..a032c3c1d82c8 --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 93, + "type": "request", + "listeners": ["broker"], + "name": "StreamsGroupTopologyDescriptionUpdateRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The streams group identifier." }, + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The ID of the streams group member sending the push. The broker validates that this member is still in the group; mismatches (including when the group itself has been deleted) are rejected with UNKNOWN_MEMBER_ID so the client treats itself as fenced and rejoins." }, + { "name": "TopologyEpoch", "type": "int32", "versions": "0+", + "about": "The epoch of the topology being described." }, + { "name": "TopologyDescription", "type": "TopologyDescription", "versions": "0+", + "about": "The topology description." } + ], + "commonStructs": [ + { "name": "TopologyDescription", "versions": "0+", "fields": [ + { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+", + "about": "The subtopologies that make up this topology." }, + { "name": "GlobalStores", "type": "[]GlobalStore", "versions": "0+", + "about": "Global state stores used by this topology." } + ]}, + { "name": "Subtopology", "versions": "0+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier, unique within the topology." }, + { "name": "Nodes", "type": "[]TopologyNode", "versions": "0+", + "about": "The processing nodes in this subtopology." } + ]}, + { "name": "TopologyNode", "versions": "0+", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", + "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." }, + { "name": "NodeType", "type": "int8", "versions": "0+", + "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." }, + { "name": "SourceTopics", "type": "[]string", "versions": "0+", "entityType": "topicName", + "about": "The source topics this node reads from. Defined only for source nodes, may be empty if source topics are dynamically determined." }, + { "name": "SinkTopic", "type": "string", "versions": "0+", "entityType": "topicName", + "nullableVersions": "0+", "default": "null", + "about": "The topic this node writes to. Defined only for sink nodes, may be null if sink topic is dynamically determined." }, + { "name": "Stores", "type": "[]string", "versions": "0+", + "about": "The state store names accessed by this node. Defined only for processor nodes." }, + { "name": "Successors", "type": "[]string", "versions": "0+", + "about": "The names of successor nodes in the processing graph. Predecessor relationships are reconstructed from this field." } + ]}, + { "name": "GlobalStore", "versions": "0+", "fields": [ + { "name": "Source", "type": "TopologyNode", "versions": "0+", + "about": "The source node providing data to the global store." }, + { "name": "Processor", "type": "TopologyNode", "versions": "0+", + "about": "The processor node that populates the global store." } + ]} + ] +} diff --git a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json new file mode 100644 index 0000000000000..9edd99f1e0b3a --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 93, + "type": "response", + "name": "StreamsGroupTopologyDescriptionUpdateResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", + "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." } + ] +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 29bc1213c63a4..f61abacc17a4e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -234,6 +234,8 @@ import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -1076,6 +1078,7 @@ private AbstractRequest getRequest(ApiKeys apikey, short version) { case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsRequest(version); case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsRequest(version); case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsRequest(version); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: return createStreamsGroupTopologyDescriptionUpdateRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1171,6 +1174,7 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) { case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsResponse(); case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsResponse(); case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsResponse(); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: return createStreamsGroupTopologyDescriptionUpdateResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -2286,6 +2290,10 @@ private DeleteGroupsResponse createDeleteGroupsResponse() { result.add(new DeletableGroupResult() .setGroupId("test-group") .setErrorCode(Errors.NONE.code())); + result.add(new DeletableGroupResult() + .setGroupId("failed-group") + .setErrorCode(Errors.GROUP_DELETION_FAILED.code()) + .setErrorMessage("plugin offline")); return new DeleteGroupsResponse( new DeleteGroupsResponseData() .setResults(result) @@ -3887,6 +3895,21 @@ private AbstractResponse createStreamsGroupHeartbeatResponse() { return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData()); } + private AbstractRequest createStreamsGroupTopologyDescriptionUpdateRequest(final short version) { + return new StreamsGroupTopologyDescriptionUpdateRequest.Builder( + new StreamsGroupTopologyDescriptionUpdateRequestData() + .setGroupId("test-group") + .setTopologyEpoch(1) + .setTopologyDescription(new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription()) + ).build(version); + } + + private AbstractResponse createStreamsGroupTopologyDescriptionUpdateResponse() { + return new StreamsGroupTopologyDescriptionUpdateResponse( + new StreamsGroupTopologyDescriptionUpdateResponseData() + ); + } + @Test public void testInvalidSaslHandShakeRequest() { AbstractRequest request = new SaslHandshakeRequest.Builder( diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 639328798a0e5..02c523f0d3f4b 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -758,6 +758,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.STREAMS_GROUP_DESCRIBE => new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData()) + case ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE => + new StreamsGroupTopologyDescriptionUpdateRequest.Builder(new StreamsGroupTopologyDescriptionUpdateRequestData()) + case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => new DescribeShareGroupOffsetsRequest.Builder(new DescribeShareGroupOffsetsRequestData()) diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index 09602ea498d5e..023ef02a5f693 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -179,6 +179,8 @@ import org.apache.kafka.common.message.StreamsGroupDescribeResponseDataJsonConverter; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestDataJsonConverter; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseDataJsonConverter; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestDataJsonConverter; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseDataJsonConverter; import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter; import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter; import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter; @@ -362,6 +364,8 @@ import org.apache.kafka.common.requests.StreamsGroupDescribeResponse; import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; +import org.apache.kafka.common.requests.StreamsGroupTopologyDescriptionUpdateRequest; +import org.apache.kafka.common.requests.StreamsGroupTopologyDescriptionUpdateResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.requests.TxnOffsetCommitRequest; @@ -564,6 +568,8 @@ public static JsonNode request(AbstractRequest request) { UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), request.version()); case UPDATE_RAFT_VOTER -> UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version()); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE -> + StreamsGroupTopologyDescriptionUpdateRequestDataJsonConverter.write(((StreamsGroupTopologyDescriptionUpdateRequest) request).data(), request.version()); case VOTE -> VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), request.version()); case WRITE_SHARE_GROUP_STATE -> WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version()); @@ -741,6 +747,8 @@ public static JsonNode response(AbstractResponse response, short version) { UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) response).data(), version); case UPDATE_RAFT_VOTER -> UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE -> + StreamsGroupTopologyDescriptionUpdateResponseDataJsonConverter.write(((StreamsGroupTopologyDescriptionUpdateResponse) response).data(), version); case VOTE -> VoteResponseDataJsonConverter.write(((VoteResponse) response).data(), version); case WRITE_SHARE_GROUP_STATE -> WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version); From ece569d071abe8f316d28fec68e3541d99366eea Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Thu, 28 May 2026 11:10:01 +0200 Subject: [PATCH 2/4] KAFKA-20620: Address review findings on topology description schema Align nested struct names in StreamsGroupTopologyDescriptionUpdateRequest with StreamsGroupDescribeResponse (TopologyDescriptionSubtopology / TopologyDescriptionNode / TopologyDescriptionGlobalStore) so the broker can pass the same generated type through both RPCs without a converter. Keeping the prefixed form rather than the simple Subtopology name avoids colliding with the existing v0 inline Subtopology struct used by the already-shipped Topology field. Also populate ErrorMessage in getErrorResponse so the response field defined by the schema is set on error, and exercise the nested topology struct in the RequestResponseTest round-trip with a realistic source/processor/sink subtopology plus a global store. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...GroupTopologyDescriptionUpdateRequest.java | 1 + ...GroupTopologyDescriptionUpdateRequest.json | 16 +++---- .../common/requests/RequestResponseTest.java | 42 ++++++++++++++++++- 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java index 954463c8b6f95..c9960eae3413c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java @@ -56,6 +56,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { new StreamsGroupTopologyDescriptionUpdateResponseData() .setThrottleTimeMs(throttleTimeMs) .setErrorCode(Errors.forException(e).code()) + .setErrorMessage(Errors.forException(e).message()) ); } diff --git a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json index a032c3c1d82c8..56570e25f272a 100644 --- a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json @@ -32,18 +32,18 @@ ], "commonStructs": [ { "name": "TopologyDescription", "versions": "0+", "fields": [ - { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+", + { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", "versions": "0+", "about": "The subtopologies that make up this topology." }, - { "name": "GlobalStores", "type": "[]GlobalStore", "versions": "0+", + { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", "versions": "0+", "about": "Global state stores used by this topology." } ]}, - { "name": "Subtopology", "versions": "0+", "fields": [ + { "name": "TopologyDescriptionSubtopology", "versions": "0+", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier, unique within the topology." }, - { "name": "Nodes", "type": "[]TopologyNode", "versions": "0+", + { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "0+", "about": "The processing nodes in this subtopology." } ]}, - { "name": "TopologyNode", "versions": "0+", "fields": [ + { "name": "TopologyDescriptionNode", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." }, { "name": "NodeType", "type": "int8", "versions": "0+", @@ -58,10 +58,10 @@ { "name": "Successors", "type": "[]string", "versions": "0+", "about": "The names of successor nodes in the processing graph. Predecessor relationships are reconstructed from this field." } ]}, - { "name": "GlobalStore", "versions": "0+", "fields": [ - { "name": "Source", "type": "TopologyNode", "versions": "0+", + { "name": "TopologyDescriptionGlobalStore", "versions": "0+", "fields": [ + { "name": "Source", "type": "TopologyDescriptionNode", "versions": "0+", "about": "The source node providing data to the global store." }, - { "name": "Processor", "type": "TopologyNode", "versions": "0+", + { "name": "Processor", "type": "TopologyDescriptionNode", "versions": "0+", "about": "The processor node that populates the global store." } ]} ] diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index f61abacc17a4e..4daa944fc61e2 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -3896,11 +3896,51 @@ private AbstractResponse createStreamsGroupHeartbeatResponse() { } private AbstractRequest createStreamsGroupTopologyDescriptionUpdateRequest(final short version) { + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode sourceNode = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-SOURCE-0000000000") + .setNodeType((byte) 1) + .setSourceTopics(List.of("input-topic")) + .setSuccessors(List.of("KSTREAM-PROCESSOR-0000000001")); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode processorNode = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-PROCESSOR-0000000001") + .setNodeType((byte) 2) + .setStores(List.of("store-1")) + .setSuccessors(List.of("KSTREAM-SINK-0000000002")); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode sinkNode = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-SINK-0000000002") + .setNodeType((byte) 3) + .setSinkTopic("output-topic"); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology subtopology = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology() + .setSubtopologyId("0") + .setNodes(List.of(sourceNode, processorNode, sinkNode)); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode globalSource = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-GLOBAL-SOURCE-0000000003") + .setNodeType((byte) 1) + .setSourceTopics(List.of("global-topic")); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode globalProcessor = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KTABLE-SOURCE-0000000004") + .setNodeType((byte) 2) + .setStores(List.of("global-store")); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore globalStore = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore() + .setSource(globalSource) + .setProcessor(globalProcessor); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription topology = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription() + .setSubtopologies(List.of(subtopology)) + .setGlobalStores(List.of(globalStore)); return new StreamsGroupTopologyDescriptionUpdateRequest.Builder( new StreamsGroupTopologyDescriptionUpdateRequestData() .setGroupId("test-group") + .setMemberId("test-member") .setTopologyEpoch(1) - .setTopologyDescription(new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription()) + .setTopologyDescription(topology) ).build(version); } From 29875afbe9cff4b4e9f2527b8406f1439d02b4a7 Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Thu, 28 May 2026 14:36:56 +0200 Subject: [PATCH 3/4] address reviews --- .../message/StreamsGroupDescribeRequest.json | 6 +-- .../message/StreamsGroupDescribeResponse.json | 44 ++++++++-------- .../StreamsGroupHeartbeatResponse.json | 6 +-- .../common/requests/RequestResponseTest.java | 50 ++++++++++++------- 4 files changed, 60 insertions(+), 46 deletions(-) diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json index b84fd1464fcf4..b2ac1a9be79b0 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json @@ -18,15 +18,15 @@ "type": "request", "listeners": ["broker"], "name": "StreamsGroupDescribeRequest", - "validVersions": "0", + // Version 1 adds IncludeTopologyDescription (KIP-1331). + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The ids of the groups to describe" }, { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", "about": "Whether to include authorized operations." }, - { "name": "IncludeTopologyDescription", "type": "bool", "versions": "0+", - "tag": 0, "taggedVersions": "0+", "default": "false", + { "name": "IncludeTopologyDescription", "type": "bool", "versions": "1+", "default": "false", "about": "Whether to include the full topology description from the topology description plugin in the response." } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json index 9cb8f60443173..53db33c2034f6 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -17,7 +17,8 @@ "apiKey": 89, "type": "response", "name": "StreamsGroupDescribeResponse", - "validVersions": "0", + // Version 1 adds TopologyDescription and TopologyDescriptionStatus (KIP-1331). + "validVersions": "0-1", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -107,11 +108,10 @@ ]}, { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this group." }, - { "name": "TopologyDescription", "type": "TopologyDescription", "versions": "0+", - "nullableVersions": "0+", "tag": 0, "taggedVersions": "0+", "default": "null", + { "name": "TopologyDescription", "type": "TopologyDescription", "versions": "1+", + "nullableVersions": "1+", "default": "null", "about": "Full topology description retrieved from the topology description plugin. Null if the client did not request it, no plugin is configured, no description is stored for the group, or retrieval failed." }, - { "name": "TopologyDescriptionStatus", "type": "int8", "versions": "0+", - "tag": 1, "taggedVersions": "0+", "default": "0", + { "name": "TopologyDescriptionStatus", "type": "int8", "versions": "1+", "default": "0", "about": "The status of the topology description for this group: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription), 1=NOT_STORED (no topology description has been recorded for this group), 2=ERROR (the broker failed to fetch the topology description; check broker logs), 3=AVAILABLE (a topology description is present in the TopologyDescription field). The broker MUST set this field to AVAILABLE whenever it attaches a TopologyDescription." } ] } @@ -162,37 +162,37 @@ "about": "Topic-level configurations as key-value pairs." } ]}, - { "name": "TopologyDescription", "versions": "0+", "fields": [ - { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", "versions": "0+", + { "name": "TopologyDescription", "versions": "1+", "fields": [ + { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", "versions": "1+", "about": "The subtopologies that make up this topology." }, - { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", "versions": "0+", + { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", "versions": "1+", "about": "Global state stores used by this topology." } ]}, - { "name": "TopologyDescriptionSubtopology", "versions": "0+", "fields": [ - { "name": "SubtopologyId", "type": "string", "versions": "0+", + { "name": "TopologyDescriptionSubtopology", "versions": "1+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "1+", "about": "The subtopology identifier, unique within the topology." }, - { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "0+", + { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "1+", "about": "The processing nodes in this subtopology." } ]}, - { "name": "TopologyDescriptionNode", "versions": "0+", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", + { "name": "TopologyDescriptionNode", "versions": "1+", "fields": [ + { "name": "Name", "type": "string", "versions": "1+", "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." }, - { "name": "NodeType", "type": "int8", "versions": "0+", + { "name": "NodeType", "type": "int8", "versions": "1+", "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." }, - { "name": "SourceTopics", "type": "[]string", "versions": "0+", "entityType": "topicName", + { "name": "SourceTopics", "type": "[]string", "versions": "1+", "entityType": "topicName", "about": "The source topics this node reads from. Defined only for source nodes, may be empty if source topics are dynamically determined." }, - { "name": "SinkTopic", "type": "string", "versions": "0+", "entityType": "topicName", - "nullableVersions": "0+", "default": "null", + { "name": "SinkTopic", "type": "string", "versions": "1+", "entityType": "topicName", + "nullableVersions": "1+", "default": "null", "about": "The topic this node writes to. Defined only for sink nodes, may be null if sink topic is dynamically determined." }, - { "name": "Stores", "type": "[]string", "versions": "0+", + { "name": "Stores", "type": "[]string", "versions": "1+", "about": "The state store names accessed by this node. Defined only for processor nodes." }, - { "name": "Successors", "type": "[]string", "versions": "0+", + { "name": "Successors", "type": "[]string", "versions": "1+", "about": "The names of successor nodes in the processing graph. Predecessor relationships are reconstructed from this field." } ]}, - { "name": "TopologyDescriptionGlobalStore", "versions": "0+", "fields": [ - { "name": "Source", "type": "TopologyDescriptionNode", "versions": "0+", + { "name": "TopologyDescriptionGlobalStore", "versions": "1+", "fields": [ + { "name": "Source", "type": "TopologyDescriptionNode", "versions": "1+", "about": "The source node providing data to the global store." }, - { "name": "Processor", "type": "TopologyDescriptionNode", "versions": "0+", + { "name": "Processor", "type": "TopologyDescriptionNode", "versions": "1+", "about": "The processor node that populates the global store." } ]} ] diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json index 6980fe7cc5d7a..cd3f6091401e2 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -17,7 +17,8 @@ "apiKey": 88, "type": "response", "name": "StreamsGroupHeartbeatResponse", - "validVersions": "0", + // Version 1 adds TopologyDescriptionRequired (KIP-1331). + "validVersions": "0-1", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -64,8 +65,7 @@ { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." }, - { "name": "TopologyDescriptionRequired", "type": "bool", "versions": "0+", - "tag": 0, "taggedVersions": "0+", "default": "false", + { "name": "TopologyDescriptionRequired", "type": "bool", "versions": "1+", "default": "false", "about": "True if the broker's topology description plugin does not have an up-to-date topology description for this group. The client should send the topology description via StreamsGroupTopologyDescriptionUpdate." }, // IQ-related information diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 4daa944fc61e2..812e0cb2f4750 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1169,8 +1169,8 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) { case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateResponse(); case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse(); case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse(); - case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse(); - case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse(); + case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse(version); + case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse(version); case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsResponse(); case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsResponse(); case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsResponse(); @@ -3865,34 +3865,48 @@ private DeleteShareGroupOffsetsResponse createDeleteShareGroupOffsetsResponse() } private AbstractRequest createStreamsGroupDescribeRequest(final short version) { - return new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData() + StreamsGroupDescribeRequestData data = new StreamsGroupDescribeRequestData() .setGroupIds(Collections.singletonList("group")) - .setIncludeAuthorizedOperations(false)).build(version); + .setIncludeAuthorizedOperations(false); + if (version >= 1) { + data.setIncludeTopologyDescription(true); + } + return new StreamsGroupDescribeRequest.Builder(data).build(version); } private AbstractRequest createStreamsGroupHeartbeatRequest(final short version) { return new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData()).build(version); } - private AbstractResponse createStreamsGroupDescribeResponse() { + private AbstractResponse createStreamsGroupDescribeResponse(final short version) { + StreamsGroupDescribeResponseData.DescribedGroup group = + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId("group") + .setErrorCode((short) 0) + .setErrorMessage(Errors.forCode((short) 0).message()) + .setGroupState("EMPTY") + .setGroupEpoch(0) + .setAssignmentEpoch(0) + .setMembers(new ArrayList<>(0)) + .setTopology(null); + if (version >= 1) { + group.setTopologyDescription(new StreamsGroupDescribeResponseData.TopologyDescription() + .setSubtopologies(new ArrayList<>(0)) + .setGlobalStores(new ArrayList<>(0))); + group.setTopologyDescriptionStatus((byte) 3); + } StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData() - .setGroups(Collections.singletonList( - new StreamsGroupDescribeResponseData.DescribedGroup() - .setGroupId("group") - .setErrorCode((short) 0) - .setErrorMessage(Errors.forCode((short) 0).message()) - .setGroupState("EMPTY") - .setGroupEpoch(0) - .setAssignmentEpoch(0) - .setMembers(new ArrayList<>(0)) - .setTopology(null) - )) + .setGroups(Collections.singletonList(group)) .setThrottleTimeMs(1000); return new StreamsGroupDescribeResponse(data); } - private AbstractResponse createStreamsGroupHeartbeatResponse() { - return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData()); + private AbstractResponse createStreamsGroupHeartbeatResponse(final short version) { + StreamsGroupHeartbeatResponseData data = new StreamsGroupHeartbeatResponseData(); + if (version >= 1) { + data.setTopologyDescriptionRequired(true); + } + return new StreamsGroupHeartbeatResponse(data); } private AbstractRequest createStreamsGroupTopologyDescriptionUpdateRequest(final short version) { From 6ca1370277acd1fa1a2dd40a3512ce822e40664a Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Fri, 29 May 2026 11:56:05 +0200 Subject: [PATCH 4/4] KAFKA-20620: Address review feedback on KIP-1331 RPC scaffolding - Bump StreamsGroupHeartbeat request and response to v1 so the new TopologyDescriptionRequired field is actually negotiated. - Mark StreamsGroupTopologyDescriptionUpdate as latestVersionUnstable until the broker handler lands; exclude from RequestQuotaTest via a new UnimplementedApis set. - Fix StreamsGroupDescribeRequest.getErrorResponse to set TopologyDescriptionStatus=ERROR when the client requested topology; previously defaulted to NOT_REQUESTED, contradicting KIP-1331. - Switch the Update RPC getErrorResponse to ApiError.fromThrowable so cause messages survive while UNKNOWN_SERVER_ERROR is sanitized. - Reword "plugin" out of client-visible strings (Errors.java default message, JSON about fields). - Add lifecycle Javadoc to the new Request class (drops duplicated error code list) and error-code Javadoc to the new Response class; document the TopologyDescription/Status biconditional invariant and the errorCounts() vs Status=ERROR caveat on DescribeResponse. - Add "Supported errors" comment block to the new UpdateResponse JSON. - Strengthen tests: nullable SinkTopic round-trip, TopologyDescription null/NOT_STORED coverage, explicit assertion on DeleteGroups v3 per-group ErrorMessage. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../apache/kafka/common/protocol/Errors.java | 2 +- .../requests/StreamsGroupDescribeRequest.java | 18 ++++++--- .../StreamsGroupDescribeResponse.java | 9 +++++ ...GroupTopologyDescriptionUpdateRequest.java | 15 ++++++-- ...roupTopologyDescriptionUpdateResponse.java | 13 +++++++ .../message/StreamsGroupDescribeResponse.json | 4 +- .../message/StreamsGroupHeartbeatRequest.json | 4 +- .../StreamsGroupHeartbeatResponse.json | 2 +- ...GroupTopologyDescriptionUpdateRequest.json | 3 ++ ...roupTopologyDescriptionUpdateResponse.json | 10 +++++ .../common/requests/RequestResponseTest.java | 37 ++++++++++++++++++- .../unit/kafka/server/RequestQuotaTest.scala | 9 ++--- 12 files changed, 105 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index b1eb842181599..15ac7765a2906 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -422,7 +422,7 @@ public enum Errors { STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new), SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new), GROUP_DELETION_FAILED(134, "DeleteGroups could not complete; see the error message on the per-group result for details.", GroupDeletionFailedException::new), - STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The topology description plugin failed to process the request.", StreamsTopologyDescriptionUpdateFailedException::new); + STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The broker could not process the topology description update; see the error message for details.", StreamsTopologyDescriptionUpdateFailedException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java index d4ea35bb64c1c..4aa2c9cd68228 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java @@ -58,13 +58,19 @@ public StreamsGroupDescribeRequest(StreamsGroupDescribeRequestData data, short v public StreamsGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) { StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData() .setThrottleTimeMs(throttleTimeMs); - // Set error for each group + short errorCode = Errors.forException(e).code(); + boolean topologyRequested = this.data.includeTopologyDescription(); this.data.groupIds().forEach( - groupId -> data.groups().add( - new StreamsGroupDescribeResponseData.DescribedGroup() - .setGroupId(groupId) - .setErrorCode(Errors.forException(e).code()) - ) + groupId -> { + StreamsGroupDescribeResponseData.DescribedGroup group = + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(errorCode); + if (topologyRequested) { + group.setTopologyDescriptionStatus((byte) 2); // ERROR + } + data.groups().add(group); + } ); return new StreamsGroupDescribeResponse(data); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java index efee6e521f4fc..7b4ca85a34d17 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java @@ -35,6 +35,15 @@ * - {@link Errors#INVALID_GROUP_ID} * - {@link Errors#GROUP_ID_NOT_FOUND} * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} + * + *

TopologyDescription invariant (v1+): the {@code TopologyDescription} field is non-null + * if and only if {@code TopologyDescriptionStatus} is {@code AVAILABLE} (3). The broker MUST + * set the status to {@code AVAILABLE} whenever it attaches a {@code TopologyDescription}, + * and leave {@code TopologyDescription} null for any other status value. + * + *

Note: {@code TopologyDescriptionStatus == ERROR} (2) is a data-level signal, not a + * protocol error, and is intentionally not reflected in {@link #errorCounts()}. Operators + * tracking topology-description failures should monitor the status field directly. */ public class StreamsGroupDescribeResponse extends AbstractResponse { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java index c9960eae3413c..962e212462f4d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java @@ -19,9 +19,17 @@ import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData; import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; +/** + * Sent by a Streams client to push its topology description to the broker, in response + * to {@code TopologyDescriptionRequired=true} on a {@code StreamsGroupHeartbeatResponse}. + * The broker validates that {@code MemberId} still belongs to the group, checks the + * {@code TopologyEpoch} against the group's current epoch, and persists the description. + * See KIP-1331. + * + *

Legal error codes are documented on {@link StreamsGroupTopologyDescriptionUpdateResponse}. + */ public class StreamsGroupTopologyDescriptionUpdateRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder { @@ -52,11 +60,12 @@ public StreamsGroupTopologyDescriptionUpdateRequest(StreamsGroupTopologyDescript @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ApiError apiError = ApiError.fromThrowable(e); return new StreamsGroupTopologyDescriptionUpdateResponse( new StreamsGroupTopologyDescriptionUpdateResponseData() .setThrottleTimeMs(throttleTimeMs) - .setErrorCode(Errors.forException(e).code()) - .setErrorMessage(Errors.forException(e).message()) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message()) ); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java index 3ca0ac3ebfc50..fb93bf49ccd73 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java @@ -23,6 +23,19 @@ import java.util.Map; +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#UNSUPPORTED_VERSION} + * - {@link Errors#UNKNOWN_MEMBER_ID} + * - {@link Errors#GROUP_ID_NOT_FOUND} + * - {@link Errors#STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED} + */ public class StreamsGroupTopologyDescriptionUpdateResponse extends AbstractResponse { private final StreamsGroupTopologyDescriptionUpdateResponseData data; diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json index 53db33c2034f6..2c7e93e735f7d 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -110,9 +110,9 @@ "about": "32-bit bitfield to represent authorized operations for this group." }, { "name": "TopologyDescription", "type": "TopologyDescription", "versions": "1+", "nullableVersions": "1+", "default": "null", - "about": "Full topology description retrieved from the topology description plugin. Null if the client did not request it, no plugin is configured, no description is stored for the group, or retrieval failed." }, + "about": "The full topology description for this group. Non-null if and only if TopologyDescriptionStatus is AVAILABLE (3); null otherwise." }, { "name": "TopologyDescriptionStatus", "type": "int8", "versions": "1+", "default": "0", - "about": "The status of the topology description for this group: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription), 1=NOT_STORED (no topology description has been recorded for this group), 2=ERROR (the broker failed to fetch the topology description; check broker logs), 3=AVAILABLE (a topology description is present in the TopologyDescription field). The broker MUST set this field to AVAILABLE whenever it attaches a TopologyDescription." } + "about": "The status of the topology description for this group, paired with TopologyDescription: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription; TopologyDescription is null); 1=NOT_STORED (no description recorded for this group; TopologyDescription is null); 2=ERROR (broker failed to fetch the description, see broker logs; TopologyDescription is null); 3=AVAILABLE (TopologyDescription is non-null and carries the description)." } ] } ], diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json index a2cba46e76322..e901ad5fa5689 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json @@ -18,7 +18,9 @@ "type": "request", "listeners": ["broker"], "name": "StreamsGroupHeartbeatRequest", - "validVersions": "0", + // Version 1 is the same as version 0; bumped together with StreamsGroupHeartbeatResponse v1, + // which adds TopologyDescriptionRequired (KIP-1331). Required so the response v1 is negotiated. + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json index cd3f6091401e2..a70fbb7258c4b 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -66,7 +66,7 @@ "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." }, { "name": "TopologyDescriptionRequired", "type": "bool", "versions": "1+", "default": "false", - "about": "True if the broker's topology description plugin does not have an up-to-date topology description for this group. The client should send the topology description via StreamsGroupTopologyDescriptionUpdate." }, + "about": "True if the broker does not have an up-to-date topology description for this group. The client should send the topology description via StreamsGroupTopologyDescriptionUpdate." }, // IQ-related information { "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+", diff --git a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json index 56570e25f272a..78b244d4b5e29 100644 --- a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json @@ -18,6 +18,9 @@ "type": "request", "listeners": ["broker"], "name": "StreamsGroupTopologyDescriptionUpdateRequest", + // The broker handler is not yet implemented (deferred to a later sub-task of KIP-1331), + // so the latest version is marked unstable to suppress ApiVersions advertisement. + "latestVersionUnstable": true, "validVersions": "0", "flexibleVersions": "0+", "fields": [ diff --git a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json index 9edd99f1e0b3a..e03b4215cd300 100644 --- a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json @@ -19,6 +19,16 @@ "name": "StreamsGroupTopologyDescriptionUpdateResponse", "validVersions": "0", "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - UNSUPPORTED_VERSION (version 0+) + // - UNKNOWN_MEMBER_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + // - STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 812e0cb2f4750..8baa1c279151d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -705,6 +705,17 @@ public void testCreateTopicRequestV3FailsIfNoPartitionsOrReplicas() { assertTrue(exception.getMessage().contains("[foo, bar]")); } + @Test + public void testDeleteGroupsResponseV3PreservesErrorMessage() { + DeleteGroupsResponse response = createDeleteGroupsResponse(); + short version = ApiKeys.DELETE_GROUPS.latestVersion(); + DeleteGroupsResponse parsed = DeleteGroupsResponse.parse(response.serialize(version), version); + DeletableGroupResult failed = parsed.data().results().find("failed-group"); + assertNotNull(failed); + assertEquals(Errors.GROUP_DELETION_FAILED.code(), failed.errorCode()); + assertEquals("plugin offline", failed.errorMessage()); + } + @Test public void testFetchRequestIsolationLevel() { FetchRequest request = createFetchRequest((short) 4, IsolationLevel.READ_COMMITTED); @@ -3895,8 +3906,25 @@ private AbstractResponse createStreamsGroupDescribeResponse(final short version) .setGlobalStores(new ArrayList<>(0))); group.setTopologyDescriptionStatus((byte) 3); } + List groups = new ArrayList<>(); + groups.add(group); + if (version >= 1) { + StreamsGroupDescribeResponseData.DescribedGroup notStoredGroup = + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-without-description") + .setErrorCode((short) 0) + .setErrorMessage(Errors.forCode((short) 0).message()) + .setGroupState("EMPTY") + .setGroupEpoch(0) + .setAssignmentEpoch(0) + .setMembers(new ArrayList<>(0)) + .setTopology(null) + .setTopologyDescription(null) + .setTopologyDescriptionStatus((byte) 1); + groups.add(notStoredGroup); + } StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData() - .setGroups(Collections.singletonList(group)) + .setGroups(groups) .setThrottleTimeMs(1000); return new StreamsGroupDescribeResponse(data); } @@ -3927,10 +3955,15 @@ private AbstractRequest createStreamsGroupTopologyDescriptionUpdateRequest(final .setName("KSTREAM-SINK-0000000002") .setNodeType((byte) 3) .setSinkTopic("output-topic"); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode dynamicSinkNode = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-SINK-0000000005") + .setNodeType((byte) 3) + .setSinkTopic(null); StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology subtopology = new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology() .setSubtopologyId("0") - .setNodes(List.of(sourceNode, processorNode, sinkNode)); + .setNodes(List.of(sourceNode, processorNode, sinkNode, dynamicSinkNode)); StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode globalSource = new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() .setName("KSTREAM-GLOBAL-SOURCE-0000000003") diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 02c523f0d3f4b..463d1e32ecc78 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -223,7 +223,7 @@ class RequestQuotaTest extends BaseRequestTest { RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal val apiKeys = ApiKeys.brokerApis - for (apiKey <- apiKeys.asScala.toSet -- RequestQuotaTest.Envelope) { + for (apiKey <- apiKeys.asScala.toSet -- RequestQuotaTest.Envelope -- RequestQuotaTest.UnimplementedApis) { submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey)) } @@ -231,7 +231,7 @@ class RequestQuotaTest extends BaseRequestTest { } private def clientActions: Set[ApiKeys] = { - ApiKeys.brokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope + ApiKeys.brokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope -- RequestQuotaTest.UnimplementedApis } private def clusterActions: Set[ApiKeys] = { @@ -758,9 +758,6 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.STREAMS_GROUP_DESCRIBE => new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData()) - case ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE => - new StreamsGroupTopologyDescriptionUpdateRequest.Builder(new StreamsGroupTopologyDescriptionUpdateRequestData()) - case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => new DescribeShareGroupOffsetsRequest.Builder(new DescribeShareGroupOffsetsRequestData()) @@ -908,6 +905,8 @@ object RequestQuotaTest { val Envelope = Set(ApiKeys.ENVELOPE) val ShareGroupState = Set(ApiKeys.INITIALIZE_SHARE_GROUP_STATE, ApiKeys.READ_SHARE_GROUP_STATE, ApiKeys.WRITE_SHARE_GROUP_STATE, ApiKeys.DELETE_SHARE_GROUP_STATE, ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY) + // APIs whose broker handler has not been wired yet; exclude from quota tests until they land. + val UnimplementedApis: Set[ApiKeys] = Set(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE) val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized") // Principal used for all client connections. This is modified by tests which