Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Indicates that {@code DeleteGroups} could not complete for the affected group. The
* accompanying error message describes the underlying cause; the caller may retry once
* the underlying condition is resolved.
*/
public class GroupDeletionFailedException extends ApiException {

private static final long serialVersionUID = 1L;

public GroupDeletionFailedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Indicates that the streams group topology description plugin failed to process
* a StreamsGroupTopologyDescriptionUpdate request. The accompanying error message
* describes the underlying cause; the broker tracks the transient-vs-permanent
* distinction internally and does not reflect it on the wire.
*/
public class StreamsTopologyDescriptionUpdateFailedException extends ApiException {

private static final long serialVersionUID = 1L;

public StreamsTopologyDescriptionUpdateFailedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public enum ApiKeys {
STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS),
STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE(ApiMessageType.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.FetchSessionTopicIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupDeletionFailedException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
Expand Down Expand Up @@ -122,6 +123,7 @@
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.StreamsTopologyDescriptionUpdateFailedException;
import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
Expand Down Expand Up @@ -418,7 +420,9 @@ public enum Errors {
STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new),
STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new),
STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new),
SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new);
SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new),
GROUP_DELETION_FAILED(134, "DeleteGroups could not complete; see the error message on the per-group result for details.", GroupDeletionFailedException::new),
STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The topology description plugin failed to process the request.", StreamsTopologyDescriptionUpdateFailedException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return AlterShareGroupOffsetsRequest.parse(readable, apiVersion);
case DELETE_SHARE_GROUP_OFFSETS:
return DeleteShareGroupOffsetsRequest.parse(readable, apiVersion);
case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE:
return StreamsGroupTopologyDescriptionUpdateRequest.parse(readable, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable,
return AlterShareGroupOffsetsResponse.parse(readable, version);
case DELETE_SHARE_GROUP_OFFSETS:
return DeleteShareGroupOffsetsResponse.parse(readable, version);
case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE:
return StreamsGroupTopologyDescriptionUpdateResponse.parse(readable, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;

public class StreamsGroupTopologyDescriptionUpdateRequest extends AbstractRequest {

public static class Builder extends AbstractRequest.Builder<StreamsGroupTopologyDescriptionUpdateRequest> {
private final StreamsGroupTopologyDescriptionUpdateRequestData data;

public Builder(StreamsGroupTopologyDescriptionUpdateRequestData data) {
super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);
this.data = data;
}

@Override
public StreamsGroupTopologyDescriptionUpdateRequest build(short version) {
return new StreamsGroupTopologyDescriptionUpdateRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final StreamsGroupTopologyDescriptionUpdateRequestData data;

public StreamsGroupTopologyDescriptionUpdateRequest(StreamsGroupTopologyDescriptionUpdateRequestData data, short version) {
super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE, version);
this.data = data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new StreamsGroupTopologyDescriptionUpdateResponse(
new StreamsGroupTopologyDescriptionUpdateResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message())
);
}

@Override
public StreamsGroupTopologyDescriptionUpdateRequestData data() {
return data;
}

public static StreamsGroupTopologyDescriptionUpdateRequest parse(Readable readable, short version) {
return new StreamsGroupTopologyDescriptionUpdateRequest(
new StreamsGroupTopologyDescriptionUpdateRequestData(readable, version), version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;

import java.util.Map;

public class StreamsGroupTopologyDescriptionUpdateResponse extends AbstractResponse {

private final StreamsGroupTopologyDescriptionUpdateResponseData data;

public StreamsGroupTopologyDescriptionUpdateResponse(StreamsGroupTopologyDescriptionUpdateResponseData data) {
super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);
this.data = data;
}

@Override
public StreamsGroupTopologyDescriptionUpdateResponseData data() {
return data;
}

@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(Errors.forCode(data.errorCode()));
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static StreamsGroupTopologyDescriptionUpdateResponse parse(Readable readable, short version) {
return new StreamsGroupTopologyDescriptionUpdateResponse(
new StreamsGroupTopologyDescriptionUpdateResponseData(readable, version));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
// Version 1 is the same as version 0.
//
// Version 2 is the first flexible version.
"validVersions": "0-2",
//
// Version 3 corresponds to the addition of an ErrorMessage field on each per-group
// result in the response (populated when the broker needs to surface a cause to the
// caller). The request body shape is unchanged at version 3.
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "GroupsNames", "type": "[]string", "versions": "0+", "entityType": "groupId",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
// Starting in version 1, on quota violation, brokers send out responses before throttling.
//
// Version 2 is the first flexible version.
"validVersions": "0-2",
//
// Version 3 adds the per-group ErrorMessage field so the broker can surface a cause
// string when GROUP_DELETION_FAILED (or any other non-NONE error) is returned for a group.
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
Expand All @@ -30,7 +33,9 @@
{ "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true, "entityType": "groupId",
"about": "The group id." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The deletion error, or 0 if the deletion succeeded." }
"about": "The deletion error, or 0 if the deletion succeeded." },
{ "name": "ErrorMessage", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "default": "null",
"about": "The error message, or null if there was no error." }
]}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
"about": "The ids of the groups to describe" },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
"about": "Whether to include authorized operations." }
"about": "Whether to include authorized operations." },
{ "name": "IncludeTopologyDescription", "type": "bool", "versions": "0+",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The KIP states that we use a request version bump instead of a tagged field

"tag": 0, "taggedVersions": "0+", "default": "false",
"about": "Whether to include the full topology description from the topology description plugin in the response." }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,13 @@
"about": "True for classic members that have not been upgraded yet." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }
"about": "32-bit bitfield to represent authorized operations for this group." },
{ "name": "TopologyDescription", "type": "TopologyDescription", "versions": "0+",
"nullableVersions": "0+", "tag": 0, "taggedVersions": "0+", "default": "null",
"about": "Full topology description retrieved from the topology description plugin. Null if the client did not request it, no plugin is configured, no description is stored for the group, or retrieval failed." },
{ "name": "TopologyDescriptionStatus", "type": "int8", "versions": "0+",
"tag": 1, "taggedVersions": "0+", "default": "0",
"about": "The status of the topology description for this group: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription), 1=NOT_STORED (no topology description has been recorded for this group), 2=ERROR (the broker failed to fetch the topology description; check broker logs), 3=AVAILABLE (a topology description is present in the TopologyDescription field). The broker MUST set this field to AVAILABLE whenever it attaches a TopologyDescription." }
]
}
],
Expand Down Expand Up @@ -155,6 +161,39 @@
{ "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
"about": "Topic-level configurations as key-value pairs."
}
]},
{ "name": "TopologyDescription", "versions": "0+", "fields": [
{ "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", "versions": "0+",
"about": "The subtopologies that make up this topology." },
{ "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", "versions": "0+",
"about": "Global state stores used by this topology." }
]},
{ "name": "TopologyDescriptionSubtopology", "versions": "0+", "fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier, unique within the topology." },
{ "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "0+",
"about": "The processing nodes in this subtopology." }
]},
{ "name": "TopologyDescriptionNode", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." },
{ "name": "NodeType", "type": "int8", "versions": "0+",
"about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+", "entityType": "topicName",
"about": "The source topics this node reads from. Defined only for source nodes, may be empty if source topics are dynamically determined." },
{ "name": "SinkTopic", "type": "string", "versions": "0+", "entityType": "topicName",
"nullableVersions": "0+", "default": "null",
"about": "The topic this node writes to. Defined only for sink nodes, may be null if sink topic is dynamically determined." },
{ "name": "Stores", "type": "[]string", "versions": "0+",
"about": "The state store names accessed by this node. Defined only for processor nodes." },
{ "name": "Successors", "type": "[]string", "versions": "0+",
"about": "The names of successor nodes in the processing graph. Predecessor relationships are reconstructed from this field." }
]},
{ "name": "TopologyDescriptionGlobalStore", "versions": "0+", "fields": [
{ "name": "Source", "type": "TopologyDescriptionNode", "versions": "0+",
"about": "The source node providing data to the global store." },
{ "name": "Processor", "type": "TopologyDescriptionNode", "versions": "0+",
"about": "The processor node that populates the global store." }
]}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
{ "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." },

{ "name": "TopologyDescriptionRequired", "type": "bool", "versions": "0+",
"tag": 0, "taggedVersions": "0+", "default": "false",
"about": "True if the broker's topology description plugin does not have an up-to-date topology description for this group. The client should send the topology description via StreamsGroupTopologyDescriptionUpdate." },

// IQ-related information
{ "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+",
"about": "The endpoint epoch set in the response"},
Expand Down
Loading