Skip to content

Commit 6ca1370

Browse files
aliehsaeediiclaude
andcommitted
KAFKA-20620: Address review feedback on KIP-1331 RPC scaffolding
- Bump StreamsGroupHeartbeat request and response to v1 so the new TopologyDescriptionRequired field is actually negotiated. - Mark StreamsGroupTopologyDescriptionUpdate as latestVersionUnstable until the broker handler lands; exclude from RequestQuotaTest via a new UnimplementedApis set. - Fix StreamsGroupDescribeRequest.getErrorResponse to set TopologyDescriptionStatus=ERROR when the client requested topology; previously defaulted to NOT_REQUESTED, contradicting KIP-1331. - Switch the Update RPC getErrorResponse to ApiError.fromThrowable so cause messages survive while UNKNOWN_SERVER_ERROR is sanitized. - Reword "plugin" out of client-visible strings (Errors.java default message, JSON about fields). - Add lifecycle Javadoc to the new Request class (drops duplicated error code list) and error-code Javadoc to the new Response class; document the TopologyDescription/Status biconditional invariant and the errorCounts() vs Status=ERROR caveat on DescribeResponse. - Add "Supported errors" comment block to the new UpdateResponse JSON. - Strengthen tests: nullable SinkTopic round-trip, TopologyDescription null/NOT_STORED coverage, explicit assertion on DeleteGroups v3 per-group ErrorMessage. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 29875af commit 6ca1370

12 files changed

Lines changed: 105 additions & 21 deletions

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ public enum Errors {
422422
STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new),
423423
SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new),
424424
GROUP_DELETION_FAILED(134, "DeleteGroups could not complete; see the error message on the per-group result for details.", GroupDeletionFailedException::new),
425-
STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The topology description plugin failed to process the request.", StreamsTopologyDescriptionUpdateFailedException::new);
425+
STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The broker could not process the topology description update; see the error message for details.", StreamsTopologyDescriptionUpdateFailedException::new);
426426

427427
private static final Logger log = LoggerFactory.getLogger(Errors.class);
428428

clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,19 @@ public StreamsGroupDescribeRequest(StreamsGroupDescribeRequestData data, short v
5858
public StreamsGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) {
5959
StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData()
6060
.setThrottleTimeMs(throttleTimeMs);
61-
// Set error for each group
61+
short errorCode = Errors.forException(e).code();
62+
boolean topologyRequested = this.data.includeTopologyDescription();
6263
this.data.groupIds().forEach(
63-
groupId -> data.groups().add(
64-
new StreamsGroupDescribeResponseData.DescribedGroup()
65-
.setGroupId(groupId)
66-
.setErrorCode(Errors.forException(e).code())
67-
)
64+
groupId -> {
65+
StreamsGroupDescribeResponseData.DescribedGroup group =
66+
new StreamsGroupDescribeResponseData.DescribedGroup()
67+
.setGroupId(groupId)
68+
.setErrorCode(errorCode);
69+
if (topologyRequested) {
70+
group.setTopologyDescriptionStatus((byte) 2); // ERROR
71+
}
72+
data.groups().add(group);
73+
}
6874
);
6975
return new StreamsGroupDescribeResponse(data);
7076
}

clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,15 @@
3535
* - {@link Errors#INVALID_GROUP_ID}
3636
* - {@link Errors#GROUP_ID_NOT_FOUND}
3737
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
38+
*
39+
* <p>TopologyDescription invariant (v1+): the {@code TopologyDescription} field is non-null
40+
* if and only if {@code TopologyDescriptionStatus} is {@code AVAILABLE} (3). The broker MUST
41+
* set the status to {@code AVAILABLE} whenever it attaches a {@code TopologyDescription},
42+
* and leave {@code TopologyDescription} null for any other status value.
43+
*
44+
* <p>Note: {@code TopologyDescriptionStatus == ERROR} (2) is a data-level signal, not a
45+
* protocol error, and is intentionally not reflected in {@link #errorCounts()}. Operators
46+
* tracking topology-description failures should monitor the status field directly.
3847
*/
3948
public class StreamsGroupDescribeResponse extends AbstractResponse {
4049

clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,17 @@
1919
import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
2020
import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
2121
import org.apache.kafka.common.protocol.ApiKeys;
22-
import org.apache.kafka.common.protocol.Errors;
2322
import org.apache.kafka.common.protocol.Readable;
2423

24+
/**
25+
* Sent by a Streams client to push its topology description to the broker, in response
26+
* to {@code TopologyDescriptionRequired=true} on a {@code StreamsGroupHeartbeatResponse}.
27+
* The broker validates that {@code MemberId} still belongs to the group, checks the
28+
* {@code TopologyEpoch} against the group's current epoch, and persists the description.
29+
* See KIP-1331.
30+
*
31+
* <p>Legal error codes are documented on {@link StreamsGroupTopologyDescriptionUpdateResponse}.
32+
*/
2533
public class StreamsGroupTopologyDescriptionUpdateRequest extends AbstractRequest {
2634

2735
public static class Builder extends AbstractRequest.Builder<StreamsGroupTopologyDescriptionUpdateRequest> {
@@ -52,11 +60,12 @@ public StreamsGroupTopologyDescriptionUpdateRequest(StreamsGroupTopologyDescript
5260

5361
@Override
5462
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
63+
ApiError apiError = ApiError.fromThrowable(e);
5564
return new StreamsGroupTopologyDescriptionUpdateResponse(
5665
new StreamsGroupTopologyDescriptionUpdateResponseData()
5766
.setThrottleTimeMs(throttleTimeMs)
58-
.setErrorCode(Errors.forException(e).code())
59-
.setErrorMessage(Errors.forException(e).message())
67+
.setErrorCode(apiError.error().code())
68+
.setErrorMessage(apiError.message())
6069
);
6170
}
6271

clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,19 @@
2323

2424
import java.util.Map;
2525

26+
/**
27+
* Possible error codes.
28+
*
29+
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
30+
* - {@link Errors#NOT_COORDINATOR}
31+
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
32+
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
33+
* - {@link Errors#INVALID_REQUEST}
34+
* - {@link Errors#UNSUPPORTED_VERSION}
35+
* - {@link Errors#UNKNOWN_MEMBER_ID}
36+
* - {@link Errors#GROUP_ID_NOT_FOUND}
37+
* - {@link Errors#STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED}
38+
*/
2639
public class StreamsGroupTopologyDescriptionUpdateResponse extends AbstractResponse {
2740

2841
private final StreamsGroupTopologyDescriptionUpdateResponseData data;

clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,9 @@
110110
"about": "32-bit bitfield to represent authorized operations for this group." },
111111
{ "name": "TopologyDescription", "type": "TopologyDescription", "versions": "1+",
112112
"nullableVersions": "1+", "default": "null",
113-
"about": "Full topology description retrieved from the topology description plugin. Null if the client did not request it, no plugin is configured, no description is stored for the group, or retrieval failed." },
113+
"about": "The full topology description for this group. Non-null if and only if TopologyDescriptionStatus is AVAILABLE (3); null otherwise." },
114114
{ "name": "TopologyDescriptionStatus", "type": "int8", "versions": "1+", "default": "0",
115-
"about": "The status of the topology description for this group: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription), 1=NOT_STORED (no topology description has been recorded for this group), 2=ERROR (the broker failed to fetch the topology description; check broker logs), 3=AVAILABLE (a topology description is present in the TopologyDescription field). The broker MUST set this field to AVAILABLE whenever it attaches a TopologyDescription." }
115+
"about": "The status of the topology description for this group, paired with TopologyDescription: 0=NOT_REQUESTED (client did not set IncludeTopologyDescription; TopologyDescription is null); 1=NOT_STORED (no description recorded for this group; TopologyDescription is null); 2=ERROR (broker failed to fetch the description, see broker logs; TopologyDescription is null); 3=AVAILABLE (TopologyDescription is non-null and carries the description)." }
116116
]
117117
}
118118
],

clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"type": "request",
1919
"listeners": ["broker"],
2020
"name": "StreamsGroupHeartbeatRequest",
21-
"validVersions": "0",
21+
// Version 1 is the same as version 0; bumped together with StreamsGroupHeartbeatResponse v1,
22+
// which adds TopologyDescriptionRequired (KIP-1331). Required so the response v1 is negotiated.
23+
"validVersions": "0-1",
2224
"flexibleVersions": "0+",
2325
"fields": [
2426
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",

clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
"about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." },
6767

6868
{ "name": "TopologyDescriptionRequired", "type": "bool", "versions": "1+", "default": "false",
69-
"about": "True if the broker's topology description plugin does not have an up-to-date topology description for this group. The client should send the topology description via StreamsGroupTopologyDescriptionUpdate." },
69+
"about": "True if the broker does not have an up-to-date topology description for this group. The client should send the topology description via StreamsGroupTopologyDescriptionUpdate." },
7070

7171
// IQ-related information
7272
{ "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+",

clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
"type": "request",
1919
"listeners": ["broker"],
2020
"name": "StreamsGroupTopologyDescriptionUpdateRequest",
21+
// The broker handler is not yet implemented (deferred to a later sub-task of KIP-1331),
22+
// so the latest version is marked unstable to suppress ApiVersions advertisement.
23+
"latestVersionUnstable": true,
2124
"validVersions": "0",
2225
"flexibleVersions": "0+",
2326
"fields": [

clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,16 @@
1919
"name": "StreamsGroupTopologyDescriptionUpdateResponse",
2020
"validVersions": "0",
2121
"flexibleVersions": "0+",
22+
// Supported errors:
23+
// - GROUP_AUTHORIZATION_FAILED (version 0+)
24+
// - NOT_COORDINATOR (version 0+)
25+
// - COORDINATOR_NOT_AVAILABLE (version 0+)
26+
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
27+
// - INVALID_REQUEST (version 0+)
28+
// - UNSUPPORTED_VERSION (version 0+)
29+
// - UNKNOWN_MEMBER_ID (version 0+)
30+
// - GROUP_ID_NOT_FOUND (version 0+)
31+
// - STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED (version 0+)
2232
"fields": [
2333
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
2434
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

0 commit comments

Comments
 (0)