diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java new file mode 100644 index 0000000000000..eaa8316052618 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Indicates that {@code DeleteGroups} could not complete for the affected group. The + * accompanying error message describes the underlying cause; the caller may retry once + * the underlying condition is resolved. + */ +public class GroupDeletionFailedException extends ApiException { + + private static final long serialVersionUID = 1L; + + public GroupDeletionFailedException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java new file mode 100644 index 0000000000000..df981b8b5b936 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Indicates that the streams group topology description plugin failed to process + * a StreamsGroupTopologyDescriptionUpdate request. The accompanying error message + * describes the underlying cause; the broker tracks the transient-vs-permanent + * distinction internally and does not reflect it on the wire. + */ +public class StreamsTopologyDescriptionUpdateFailedException extends ApiException { + + private static final long serialVersionUID = 1L; + + public StreamsTopologyDescriptionUpdateFailedException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 79b283b4f8d89..22dc37e67d146 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -137,7 +137,8 @@ public enum ApiKeys { STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE), DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS), ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS), - DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS); + DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS), + STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE(ApiMessageType.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index a27a7fcf23c77..15ac7765a2906 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.errors.FetchSessionIdNotFoundException; import org.apache.kafka.common.errors.FetchSessionTopicIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.GroupDeletionFailedException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupMaxSizeReachedException; import org.apache.kafka.common.errors.GroupNotEmptyException; @@ -122,6 +123,7 @@ import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException; import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.errors.StreamsTopologyDescriptionUpdateFailedException; import org.apache.kafka.common.errors.StreamsTopologyFencedException; import org.apache.kafka.common.errors.TelemetryTooLargeException; import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; @@ -418,7 +420,9 @@ public enum Errors { STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new), STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new), STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new), - SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new); + SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new), + GROUP_DELETION_FAILED(134, "DeleteGroups could not complete; see the error message on the per-group result for details.", GroupDeletionFailedException::new), + STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The broker could not process the topology description update; see the error message for details.", StreamsTopologyDescriptionUpdateFailedException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 750de2050f432..8630c379ed6d3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -354,6 +354,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return AlterShareGroupOffsetsRequest.parse(readable, apiVersion); case DELETE_SHARE_GROUP_OFFSETS: return DeleteShareGroupOffsetsRequest.parse(readable, apiVersion); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: + return StreamsGroupTopologyDescriptionUpdateRequest.parse(readable, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 7d96d1a17317c..961559d9e8fc1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -290,6 +290,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable, return AlterShareGroupOffsetsResponse.parse(readable, version); case DELETE_SHARE_GROUP_OFFSETS: return DeleteShareGroupOffsetsResponse.parse(readable, version); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: + return StreamsGroupTopologyDescriptionUpdateResponse.parse(readable, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java index d4ea35bb64c1c..4aa2c9cd68228 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java @@ -58,13 +58,19 @@ public StreamsGroupDescribeRequest(StreamsGroupDescribeRequestData data, short v public StreamsGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) { StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData() .setThrottleTimeMs(throttleTimeMs); - // Set error for each group + short errorCode = Errors.forException(e).code(); + boolean topologyRequested = this.data.includeTopologyDescription(); this.data.groupIds().forEach( - groupId -> data.groups().add( - new StreamsGroupDescribeResponseData.DescribedGroup() - .setGroupId(groupId) - .setErrorCode(Errors.forException(e).code()) - ) + groupId -> { + StreamsGroupDescribeResponseData.DescribedGroup group = + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(errorCode); + if (topologyRequested) { + group.setTopologyDescriptionStatus((byte) 2); // ERROR + } + data.groups().add(group); + } ); return new StreamsGroupDescribeResponse(data); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java index efee6e521f4fc..7b4ca85a34d17 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java @@ -35,6 +35,15 @@ * - {@link Errors#INVALID_GROUP_ID} * - {@link Errors#GROUP_ID_NOT_FOUND} * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} + * + *

TopologyDescription invariant (v1+): the {@code TopologyDescription} field is non-null + * if and only if {@code TopologyDescriptionStatus} is {@code AVAILABLE} (3). The broker MUST + * set the status to {@code AVAILABLE} whenever it attaches a {@code TopologyDescription}, + * and leave {@code TopologyDescription} null for any other status value. + * + *

Note: {@code TopologyDescriptionStatus == ERROR} (2) is a data-level signal, not a + * protocol error, and is intentionally not reflected in {@link #errorCounts()}. Operators + * tracking topology-description failures should monitor the status field directly. */ public class StreamsGroupDescribeResponse extends AbstractResponse { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java new file mode 100644 index 0000000000000..962e212462f4d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Readable; + +/** + * Sent by a Streams client to push its topology description to the broker, in response + * to {@code TopologyDescriptionRequired=true} on a {@code StreamsGroupHeartbeatResponse}. + * The broker validates that {@code MemberId} still belongs to the group, checks the + * {@code TopologyEpoch} against the group's current epoch, and persists the description. + * See KIP-1331. + * + *

Legal error codes are documented on {@link StreamsGroupTopologyDescriptionUpdateResponse}. + */ +public class StreamsGroupTopologyDescriptionUpdateRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + private final StreamsGroupTopologyDescriptionUpdateRequestData data; + + public Builder(StreamsGroupTopologyDescriptionUpdateRequestData data) { + super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE); + this.data = data; + } + + @Override + public StreamsGroupTopologyDescriptionUpdateRequest build(short version) { + return new StreamsGroupTopologyDescriptionUpdateRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final StreamsGroupTopologyDescriptionUpdateRequestData data; + + public StreamsGroupTopologyDescriptionUpdateRequest(StreamsGroupTopologyDescriptionUpdateRequestData data, short version) { + super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE, version); + this.data = data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ApiError apiError = ApiError.fromThrowable(e); + return new StreamsGroupTopologyDescriptionUpdateResponse( + new StreamsGroupTopologyDescriptionUpdateResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message()) + ); + } + + @Override + public StreamsGroupTopologyDescriptionUpdateRequestData data() { + return data; + } + + public static StreamsGroupTopologyDescriptionUpdateRequest parse(Readable readable, short version) { + return new StreamsGroupTopologyDescriptionUpdateRequest( + new StreamsGroupTopologyDescriptionUpdateRequestData(readable, version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java new file mode 100644 index 0000000000000..fb93bf49ccd73 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Readable; + +import java.util.Map; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#UNSUPPORTED_VERSION} + * - {@link Errors#UNKNOWN_MEMBER_ID} + * - {@link Errors#GROUP_ID_NOT_FOUND} + * - {@link Errors#STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED} + */ +public class StreamsGroupTopologyDescriptionUpdateResponse extends AbstractResponse { + + private final StreamsGroupTopologyDescriptionUpdateResponseData data; + + public StreamsGroupTopologyDescriptionUpdateResponse(StreamsGroupTopologyDescriptionUpdateResponseData data) { + super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE); + this.data = data; + } + + @Override + public StreamsGroupTopologyDescriptionUpdateResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + return errorCounts(Errors.forCode(data.errorCode())); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static StreamsGroupTopologyDescriptionUpdateResponse parse(Readable readable, short version) { + return new StreamsGroupTopologyDescriptionUpdateResponse( + new StreamsGroupTopologyDescriptionUpdateResponseData(readable, version)); + } +} diff --git a/clients/src/main/resources/common/message/DeleteGroupsRequest.json b/clients/src/main/resources/common/message/DeleteGroupsRequest.json index 7d7c437178997..5c55a9939b76c 100644 --- a/clients/src/main/resources/common/message/DeleteGroupsRequest.json +++ b/clients/src/main/resources/common/message/DeleteGroupsRequest.json @@ -21,7 +21,11 @@ // Version 1 is the same as version 0. // // Version 2 is the first flexible version. - "validVersions": "0-2", + // + // Version 3 corresponds to the addition of an ErrorMessage field on each per-group + // result in the response (populated when the broker needs to surface a cause to the + // caller). The request body shape is unchanged at version 3. + "validVersions": "0-3", "flexibleVersions": "2+", "fields": [ { "name": "GroupsNames", "type": "[]string", "versions": "0+", "entityType": "groupId", diff --git a/clients/src/main/resources/common/message/DeleteGroupsResponse.json b/clients/src/main/resources/common/message/DeleteGroupsResponse.json index 168cde03ba341..4f5ad712a63ad 100644 --- a/clients/src/main/resources/common/message/DeleteGroupsResponse.json +++ b/clients/src/main/resources/common/message/DeleteGroupsResponse.json @@ -20,7 +20,10 @@ // Starting in version 1, on quota violation, brokers send out responses before throttling. // // Version 2 is the first flexible version. - "validVersions": "0-2", + // + // Version 3 adds the per-group ErrorMessage field so the broker can surface a cause + // string when GROUP_DELETION_FAILED (or any other non-NONE error) is returned for a group. + "validVersions": "0-3", "flexibleVersions": "2+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", @@ -30,7 +33,9 @@ { "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true, "entityType": "groupId", "about": "The group id." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The deletion error, or 0 if the deletion succeeded." } + "about": "The deletion error, or 0 if the deletion succeeded." }, + { "name": "ErrorMessage", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "default": "null", + "about": "The error message, or null if there was no error." } ]} ] } diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json index 6e36479043aa0..b2ac1a9be79b0 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json @@ -18,12 +18,15 @@ "type": "request", "listeners": ["broker"], "name": "StreamsGroupDescribeRequest", - "validVersions": "0", + // Version 1 adds IncludeTopologyDescription (KIP-1331). + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The ids of the groups to describe" }, { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", - "about": "Whether to include authorized operations." } + "about": "Whether to include authorized operations." }, + { "name": "IncludeTopologyDescription", "type": "bool", "versions": "1+", "default": "false", + "about": "Whether to include the full topology description from the topology description plugin in the response." } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json index b99f9c00b0840..2c7e93e735f7d 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -17,7 +17,8 @@ "apiKey": 89, "type": "response", "name": "StreamsGroupDescribeResponse", - "validVersions": "0", + // Version 1 adds TopologyDescription and TopologyDescriptionStatus (KIP-1331). + "validVersions": "0-1", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -106,7 +107,12 @@ "about": "True for classic members that have not been upgraded yet." } ]}, { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", - "about": "32-bit bitfield to represent authorized operations for this group." } + "about": "32-bit bitfield to represent authorized operations for this group." }, + { "name": "TopologyDescription", "type": "TopologyDescription", "versions": "1+", + "nullableVersions": "1+", "default": "null", + "about": "The full topology description for this group. Non-null if and only if TopologyDescriptionStatus is AVAILABLE (3); null otherwise." }, + { "name": "TopologyDescriptionStatus", "type": "int8", "versions": "1+", "default": "0", + "about": "The status of the topology description for this group, paired with TopologyDescription: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription; TopologyDescription is null); 1=NOT_STORED (no description recorded for this group; TopologyDescription is null); 2=ERROR (broker failed to fetch the description, see broker logs; TopologyDescription is null); 3=AVAILABLE (TopologyDescription is non-null and carries the description)." } ] } ], @@ -155,6 +161,39 @@ { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", "about": "Topic-level configurations as key-value pairs." } + ]}, + { "name": "TopologyDescription", "versions": "1+", "fields": [ + { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", "versions": "1+", + "about": "The subtopologies that make up this topology." }, + { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", "versions": "1+", + "about": "Global state stores used by this topology." } + ]}, + { "name": "TopologyDescriptionSubtopology", "versions": "1+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "1+", + "about": "The subtopology identifier, unique within the topology." }, + { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "1+", + "about": "The processing nodes in this subtopology." } + ]}, + { "name": "TopologyDescriptionNode", "versions": "1+", "fields": [ + { "name": "Name", "type": "string", "versions": "1+", + "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." }, + { "name": "NodeType", "type": "int8", "versions": "1+", + "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." }, + { "name": "SourceTopics", "type": "[]string", "versions": "1+", "entityType": "topicName", + "about": "The source topics this node reads from. Defined only for source nodes, may be empty if source topics are dynamically determined." }, + { "name": "SinkTopic", "type": "string", "versions": "1+", "entityType": "topicName", + "nullableVersions": "1+", "default": "null", + "about": "The topic this node writes to. Defined only for sink nodes, may be null if sink topic is dynamically determined." }, + { "name": "Stores", "type": "[]string", "versions": "1+", + "about": "The state store names accessed by this node. Defined only for processor nodes." }, + { "name": "Successors", "type": "[]string", "versions": "1+", + "about": "The names of successor nodes in the processing graph. Predecessor relationships are reconstructed from this field." } + ]}, + { "name": "TopologyDescriptionGlobalStore", "versions": "1+", "fields": [ + { "name": "Source", "type": "TopologyDescriptionNode", "versions": "1+", + "about": "The source node providing data to the global store." }, + { "name": "Processor", "type": "TopologyDescriptionNode", "versions": "1+", + "about": "The processor node that populates the global store." } ]} ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json index a2cba46e76322..e901ad5fa5689 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json @@ -18,7 +18,9 @@ "type": "request", "listeners": ["broker"], "name": "StreamsGroupHeartbeatRequest", - "validVersions": "0", + // Version 1 is the same as version 0; bumped together with StreamsGroupHeartbeatResponse v1, + // which adds TopologyDescriptionRequired (KIP-1331). Required so the response v1 is negotiated. + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json index 27cf47bb1a4c7..a70fbb7258c4b 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -17,7 +17,8 @@ "apiKey": 88, "type": "response", "name": "StreamsGroupHeartbeatResponse", - "validVersions": "0", + // Version 1 adds TopologyDescriptionRequired (KIP-1331). + "validVersions": "0-1", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -64,6 +65,9 @@ { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." }, + { "name": "TopologyDescriptionRequired", "type": "bool", "versions": "1+", "default": "false", + "about": "True if the broker does not have an up-to-date topology description for this group. The client should send the topology description via StreamsGroupTopologyDescriptionUpdate." }, + // IQ-related information { "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+", "about": "The endpoint epoch set in the response"}, diff --git a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json new file mode 100644 index 0000000000000..78b244d4b5e29 --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 93, + "type": "request", + "listeners": ["broker"], + "name": "StreamsGroupTopologyDescriptionUpdateRequest", + // The broker handler is not yet implemented (deferred to a later sub-task of KIP-1331), + // so the latest version is marked unstable to suppress ApiVersions advertisement. + "latestVersionUnstable": true, + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The streams group identifier." }, + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The ID of the streams group member sending the push. The broker validates that this member is still in the group; mismatches (including when the group itself has been deleted) are rejected with UNKNOWN_MEMBER_ID so the client treats itself as fenced and rejoins." }, + { "name": "TopologyEpoch", "type": "int32", "versions": "0+", + "about": "The epoch of the topology being described." }, + { "name": "TopologyDescription", "type": "TopologyDescription", "versions": "0+", + "about": "The topology description." } + ], + "commonStructs": [ + { "name": "TopologyDescription", "versions": "0+", "fields": [ + { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", "versions": "0+", + "about": "The subtopologies that make up this topology." }, + { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", "versions": "0+", + "about": "Global state stores used by this topology." } + ]}, + { "name": "TopologyDescriptionSubtopology", "versions": "0+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier, unique within the topology." }, + { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "0+", + "about": "The processing nodes in this subtopology." } + ]}, + { "name": "TopologyDescriptionNode", "versions": "0+", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", + "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." }, + { "name": "NodeType", "type": "int8", "versions": "0+", + "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." }, + { "name": "SourceTopics", "type": "[]string", "versions": "0+", "entityType": "topicName", + "about": "The source topics this node reads from. Defined only for source nodes, may be empty if source topics are dynamically determined." }, + { "name": "SinkTopic", "type": "string", "versions": "0+", "entityType": "topicName", + "nullableVersions": "0+", "default": "null", + "about": "The topic this node writes to. Defined only for sink nodes, may be null if sink topic is dynamically determined." }, + { "name": "Stores", "type": "[]string", "versions": "0+", + "about": "The state store names accessed by this node. Defined only for processor nodes." }, + { "name": "Successors", "type": "[]string", "versions": "0+", + "about": "The names of successor nodes in the processing graph. Predecessor relationships are reconstructed from this field." } + ]}, + { "name": "TopologyDescriptionGlobalStore", "versions": "0+", "fields": [ + { "name": "Source", "type": "TopologyDescriptionNode", "versions": "0+", + "about": "The source node providing data to the global store." }, + { "name": "Processor", "type": "TopologyDescriptionNode", "versions": "0+", + "about": "The processor node that populates the global store." } + ]} + ] +} diff --git a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json new file mode 100644 index 0000000000000..e03b4215cd300 --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 93, + "type": "response", + "name": "StreamsGroupTopologyDescriptionUpdateResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - UNSUPPORTED_VERSION (version 0+) + // - UNKNOWN_MEMBER_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + // - STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", + "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." } + ] +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 29bc1213c63a4..8baa1c279151d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -234,6 +234,8 @@ import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -703,6 +705,17 @@ public void testCreateTopicRequestV3FailsIfNoPartitionsOrReplicas() { assertTrue(exception.getMessage().contains("[foo, bar]")); } + @Test + public void testDeleteGroupsResponseV3PreservesErrorMessage() { + DeleteGroupsResponse response = createDeleteGroupsResponse(); + short version = ApiKeys.DELETE_GROUPS.latestVersion(); + DeleteGroupsResponse parsed = DeleteGroupsResponse.parse(response.serialize(version), version); + DeletableGroupResult failed = parsed.data().results().find("failed-group"); + assertNotNull(failed); + assertEquals(Errors.GROUP_DELETION_FAILED.code(), failed.errorCode()); + assertEquals("plugin offline", failed.errorMessage()); + } + @Test public void testFetchRequestIsolationLevel() { FetchRequest request = createFetchRequest((short) 4, IsolationLevel.READ_COMMITTED); @@ -1076,6 +1089,7 @@ private AbstractRequest getRequest(ApiKeys apikey, short version) { case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsRequest(version); case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsRequest(version); case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsRequest(version); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: return createStreamsGroupTopologyDescriptionUpdateRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1166,11 +1180,12 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) { case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateResponse(); case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse(); case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse(); - case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse(); - case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse(); + case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse(version); + case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse(version); case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsResponse(); case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsResponse(); case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsResponse(); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: return createStreamsGroupTopologyDescriptionUpdateResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -2286,6 +2301,10 @@ private DeleteGroupsResponse createDeleteGroupsResponse() { result.add(new DeletableGroupResult() .setGroupId("test-group") .setErrorCode(Errors.NONE.code())); + result.add(new DeletableGroupResult() + .setGroupId("failed-group") + .setErrorCode(Errors.GROUP_DELETION_FAILED.code()) + .setErrorMessage("plugin offline")); return new DeleteGroupsResponse( new DeleteGroupsResponseData() .setResults(result) @@ -3857,20 +3876,42 @@ private DeleteShareGroupOffsetsResponse createDeleteShareGroupOffsetsResponse() } private AbstractRequest createStreamsGroupDescribeRequest(final short version) { - return new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData() + StreamsGroupDescribeRequestData data = new StreamsGroupDescribeRequestData() .setGroupIds(Collections.singletonList("group")) - .setIncludeAuthorizedOperations(false)).build(version); + .setIncludeAuthorizedOperations(false); + if (version >= 1) { + data.setIncludeTopologyDescription(true); + } + return new StreamsGroupDescribeRequest.Builder(data).build(version); } private AbstractRequest createStreamsGroupHeartbeatRequest(final short version) { return new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData()).build(version); } - private AbstractResponse createStreamsGroupDescribeResponse() { - StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData() - .setGroups(Collections.singletonList( + private AbstractResponse createStreamsGroupDescribeResponse(final short version) { + StreamsGroupDescribeResponseData.DescribedGroup group = + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId("group") + .setErrorCode((short) 0) + .setErrorMessage(Errors.forCode((short) 0).message()) + .setGroupState("EMPTY") + .setGroupEpoch(0) + .setAssignmentEpoch(0) + .setMembers(new ArrayList<>(0)) + .setTopology(null); + if (version >= 1) { + group.setTopologyDescription(new StreamsGroupDescribeResponseData.TopologyDescription() + .setSubtopologies(new ArrayList<>(0)) + .setGlobalStores(new ArrayList<>(0))); + group.setTopologyDescriptionStatus((byte) 3); + } + List groups = new ArrayList<>(); + groups.add(group); + if (version >= 1) { + StreamsGroupDescribeResponseData.DescribedGroup notStoredGroup = new StreamsGroupDescribeResponseData.DescribedGroup() - .setGroupId("group") + .setGroupId("group-without-description") .setErrorCode((short) 0) .setErrorMessage(Errors.forCode((short) 0).message()) .setGroupState("EMPTY") @@ -3878,13 +3919,82 @@ private AbstractResponse createStreamsGroupDescribeResponse() { .setAssignmentEpoch(0) .setMembers(new ArrayList<>(0)) .setTopology(null) - )) + .setTopologyDescription(null) + .setTopologyDescriptionStatus((byte) 1); + groups.add(notStoredGroup); + } + StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData() + .setGroups(groups) .setThrottleTimeMs(1000); return new StreamsGroupDescribeResponse(data); } - private AbstractResponse createStreamsGroupHeartbeatResponse() { - return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData()); + private AbstractResponse createStreamsGroupHeartbeatResponse(final short version) { + StreamsGroupHeartbeatResponseData data = new StreamsGroupHeartbeatResponseData(); + if (version >= 1) { + data.setTopologyDescriptionRequired(true); + } + return new StreamsGroupHeartbeatResponse(data); + } + + private AbstractRequest createStreamsGroupTopologyDescriptionUpdateRequest(final short version) { + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode sourceNode = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-SOURCE-0000000000") + .setNodeType((byte) 1) + .setSourceTopics(List.of("input-topic")) + .setSuccessors(List.of("KSTREAM-PROCESSOR-0000000001")); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode processorNode = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-PROCESSOR-0000000001") + .setNodeType((byte) 2) + .setStores(List.of("store-1")) + .setSuccessors(List.of("KSTREAM-SINK-0000000002")); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode sinkNode = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-SINK-0000000002") + .setNodeType((byte) 3) + .setSinkTopic("output-topic"); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode dynamicSinkNode = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-SINK-0000000005") + .setNodeType((byte) 3) + .setSinkTopic(null); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology subtopology = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology() + .setSubtopologyId("0") + .setNodes(List.of(sourceNode, processorNode, sinkNode, dynamicSinkNode)); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode globalSource = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KSTREAM-GLOBAL-SOURCE-0000000003") + .setNodeType((byte) 1) + .setSourceTopics(List.of("global-topic")); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode globalProcessor = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("KTABLE-SOURCE-0000000004") + .setNodeType((byte) 2) + .setStores(List.of("global-store")); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore globalStore = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore() + .setSource(globalSource) + .setProcessor(globalProcessor); + StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription topology = + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription() + .setSubtopologies(List.of(subtopology)) + .setGlobalStores(List.of(globalStore)); + return new StreamsGroupTopologyDescriptionUpdateRequest.Builder( + new StreamsGroupTopologyDescriptionUpdateRequestData() + .setGroupId("test-group") + .setMemberId("test-member") + .setTopologyEpoch(1) + .setTopologyDescription(topology) + ).build(version); + } + + private AbstractResponse createStreamsGroupTopologyDescriptionUpdateResponse() { + return new StreamsGroupTopologyDescriptionUpdateResponse( + new StreamsGroupTopologyDescriptionUpdateResponseData() + ); } @Test diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 639328798a0e5..463d1e32ecc78 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -223,7 +223,7 @@ class RequestQuotaTest extends BaseRequestTest { RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal val apiKeys = ApiKeys.brokerApis - for (apiKey <- apiKeys.asScala.toSet -- RequestQuotaTest.Envelope) { + for (apiKey <- apiKeys.asScala.toSet -- RequestQuotaTest.Envelope -- RequestQuotaTest.UnimplementedApis) { submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey)) } @@ -231,7 +231,7 @@ class RequestQuotaTest extends BaseRequestTest { } private def clientActions: Set[ApiKeys] = { - ApiKeys.brokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope + ApiKeys.brokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope -- RequestQuotaTest.UnimplementedApis } private def clusterActions: Set[ApiKeys] = { @@ -905,6 +905,8 @@ object RequestQuotaTest { val Envelope = Set(ApiKeys.ENVELOPE) val ShareGroupState = Set(ApiKeys.INITIALIZE_SHARE_GROUP_STATE, ApiKeys.READ_SHARE_GROUP_STATE, ApiKeys.WRITE_SHARE_GROUP_STATE, ApiKeys.DELETE_SHARE_GROUP_STATE, ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY) + // APIs whose broker handler has not been wired yet; exclude from quota tests until they land. + val UnimplementedApis: Set[ApiKeys] = Set(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE) val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized") // Principal used for all client connections. This is modified by tests which diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index 09602ea498d5e..023ef02a5f693 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -179,6 +179,8 @@ import org.apache.kafka.common.message.StreamsGroupDescribeResponseDataJsonConverter; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestDataJsonConverter; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseDataJsonConverter; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestDataJsonConverter; +import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseDataJsonConverter; import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter; import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter; import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter; @@ -362,6 +364,8 @@ import org.apache.kafka.common.requests.StreamsGroupDescribeResponse; import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; +import org.apache.kafka.common.requests.StreamsGroupTopologyDescriptionUpdateRequest; +import org.apache.kafka.common.requests.StreamsGroupTopologyDescriptionUpdateResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.requests.TxnOffsetCommitRequest; @@ -564,6 +568,8 @@ public static JsonNode request(AbstractRequest request) { UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), request.version()); case UPDATE_RAFT_VOTER -> UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version()); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE -> + StreamsGroupTopologyDescriptionUpdateRequestDataJsonConverter.write(((StreamsGroupTopologyDescriptionUpdateRequest) request).data(), request.version()); case VOTE -> VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), request.version()); case WRITE_SHARE_GROUP_STATE -> WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version()); @@ -741,6 +747,8 @@ public static JsonNode response(AbstractResponse response, short version) { UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) response).data(), version); case UPDATE_RAFT_VOTER -> UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version); + case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE -> + StreamsGroupTopologyDescriptionUpdateResponseDataJsonConverter.write(((StreamsGroupTopologyDescriptionUpdateResponse) response).data(), version); case VOTE -> VoteResponseDataJsonConverter.write(((VoteResponse) response).data(), version); case WRITE_SHARE_GROUP_STATE -> WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version);