Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Indicates that {@code DeleteGroups} could not complete for the affected group. The
* accompanying error message describes the underlying cause; the caller may retry once
* the underlying condition is resolved.
*/
public class GroupDeletionFailedException extends ApiException {

private static final long serialVersionUID = 1L;

public GroupDeletionFailedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Indicates that the streams group topology description plugin failed to process
* a StreamsGroupTopologyDescriptionUpdate request. The accompanying error message
* describes the underlying cause; the broker tracks the transient-vs-permanent
* distinction internally and does not reflect it on the wire.
*/
public class StreamsTopologyDescriptionUpdateFailedException extends ApiException {

private static final long serialVersionUID = 1L;

public StreamsTopologyDescriptionUpdateFailedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public enum ApiKeys {
STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS),
STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE(ApiMessageType.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.FetchSessionTopicIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupDeletionFailedException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
Expand Down Expand Up @@ -122,6 +123,7 @@
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.StreamsTopologyDescriptionUpdateFailedException;
import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
Expand Down Expand Up @@ -418,7 +420,9 @@ public enum Errors {
STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new),
STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new),
STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new),
SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new);
SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new),
GROUP_DELETION_FAILED(134, "DeleteGroups could not complete; see the error message on the per-group result for details.", GroupDeletionFailedException::new),
STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The broker could not process the topology description update; see the error message for details.", StreamsTopologyDescriptionUpdateFailedException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return AlterShareGroupOffsetsRequest.parse(readable, apiVersion);
case DELETE_SHARE_GROUP_OFFSETS:
return DeleteShareGroupOffsetsRequest.parse(readable, apiVersion);
case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE:
return StreamsGroupTopologyDescriptionUpdateRequest.parse(readable, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable,
return AlterShareGroupOffsetsResponse.parse(readable, version);
case DELETE_SHARE_GROUP_OFFSETS:
return DeleteShareGroupOffsetsResponse.parse(readable, version);
case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE:
return StreamsGroupTopologyDescriptionUpdateResponse.parse(readable, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,19 @@ public StreamsGroupDescribeRequest(StreamsGroupDescribeRequestData data, short v
public StreamsGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) {
StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData()
.setThrottleTimeMs(throttleTimeMs);
// Set error for each group
short errorCode = Errors.forException(e).code();
boolean topologyRequested = this.data.includeTopologyDescription();
this.data.groupIds().forEach(
groupId -> data.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.forException(e).code())
)
groupId -> {
StreamsGroupDescribeResponseData.DescribedGroup group =
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(errorCode);
if (topologyRequested) {
group.setTopologyDescriptionStatus((byte) 2); // ERROR
}
data.groups().add(group);
}
);
return new StreamsGroupDescribeResponse(data);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@
* - {@link Errors#INVALID_GROUP_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
*
* <p>TopologyDescription invariant (v1+): the {@code TopologyDescription} field is non-null
* if and only if {@code TopologyDescriptionStatus} is {@code AVAILABLE} (3). The broker MUST
* set the status to {@code AVAILABLE} whenever it attaches a {@code TopologyDescription},
* and leave {@code TopologyDescription} null for any other status value.
*
* <p>Note: {@code TopologyDescriptionStatus == ERROR} (2) is a data-level signal, not a
* protocol error, and is intentionally not reflected in {@link #errorCounts()}. Operators
* tracking topology-description failures should monitor the status field directly.
*/
public class StreamsGroupDescribeResponse extends AbstractResponse {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Readable;

/**
* Sent by a Streams client to push its topology description to the broker, in response
* to {@code TopologyDescriptionRequired=true} on a {@code StreamsGroupHeartbeatResponse}.
* The broker validates that {@code MemberId} still belongs to the group, checks the
* {@code TopologyEpoch} against the group's current epoch, and persists the description.
* See KIP-1331.
*
* <p>Legal error codes are documented on {@link StreamsGroupTopologyDescriptionUpdateResponse}.
*/
public class StreamsGroupTopologyDescriptionUpdateRequest extends AbstractRequest {

public static class Builder extends AbstractRequest.Builder<StreamsGroupTopologyDescriptionUpdateRequest> {
private final StreamsGroupTopologyDescriptionUpdateRequestData data;

public Builder(StreamsGroupTopologyDescriptionUpdateRequestData data) {
super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);
this.data = data;
}

@Override
public StreamsGroupTopologyDescriptionUpdateRequest build(short version) {
return new StreamsGroupTopologyDescriptionUpdateRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final StreamsGroupTopologyDescriptionUpdateRequestData data;

public StreamsGroupTopologyDescriptionUpdateRequest(StreamsGroupTopologyDescriptionUpdateRequestData data, short version) {
super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE, version);
this.data = data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
return new StreamsGroupTopologyDescriptionUpdateResponse(
new StreamsGroupTopologyDescriptionUpdateResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message())
);
}

@Override
public StreamsGroupTopologyDescriptionUpdateRequestData data() {
return data;
}

public static StreamsGroupTopologyDescriptionUpdateRequest parse(Readable readable, short version) {
return new StreamsGroupTopologyDescriptionUpdateRequest(
new StreamsGroupTopologyDescriptionUpdateRequestData(readable, version), version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;

import java.util.Map;

/**
* Possible error codes.
*
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#UNSUPPORTED_VERSION}
* - {@link Errors#UNKNOWN_MEMBER_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
* - {@link Errors#STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED}
*/
public class StreamsGroupTopologyDescriptionUpdateResponse extends AbstractResponse {

private final StreamsGroupTopologyDescriptionUpdateResponseData data;

public StreamsGroupTopologyDescriptionUpdateResponse(StreamsGroupTopologyDescriptionUpdateResponseData data) {
super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);
this.data = data;
}

@Override
public StreamsGroupTopologyDescriptionUpdateResponseData data() {
return data;
}

@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(Errors.forCode(data.errorCode()));
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static StreamsGroupTopologyDescriptionUpdateResponse parse(Readable readable, short version) {
return new StreamsGroupTopologyDescriptionUpdateResponse(
new StreamsGroupTopologyDescriptionUpdateResponseData(readable, version));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
// Version 1 is the same as version 0.
//
// Version 2 is the first flexible version.
"validVersions": "0-2",
//
// Version 3 corresponds to the addition of an ErrorMessage field on each per-group
// result in the response (populated when the broker needs to surface a cause to the
// caller). The request body shape is unchanged at version 3.
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "GroupsNames", "type": "[]string", "versions": "0+", "entityType": "groupId",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
// Starting in version 1, on quota violation, brokers send out responses before throttling.
//
// Version 2 is the first flexible version.
"validVersions": "0-2",
//
// Version 3 adds the per-group ErrorMessage field so the broker can surface a cause
// string when GROUP_DELETION_FAILED (or any other non-NONE error) is returned for a group.
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
Expand All @@ -30,7 +33,9 @@
{ "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true, "entityType": "groupId",
"about": "The group id." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The deletion error, or 0 if the deletion succeeded." }
"about": "The deletion error, or 0 if the deletion succeeded." },
{ "name": "ErrorMessage", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "default": "null",
"about": "The error message, or null if there was no error." }
]}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
"type": "request",
"listeners": ["broker"],
"name": "StreamsGroupDescribeRequest",
"validVersions": "0",
// Version 1 adds IncludeTopologyDescription (KIP-1331).
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
"about": "The ids of the groups to describe" },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
"about": "Whether to include authorized operations." }
"about": "Whether to include authorized operations." },
{ "name": "IncludeTopologyDescription", "type": "bool", "versions": "1+", "default": "false",
"about": "Whether to include the full topology description from the topology description plugin in the response." }
]
}
Loading