Skip to content

KAFKA-19057: Stabilize KIP-932 RPCs for AK 4.1 #19378

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1")
},
types = {Type.KRAFT}
)
Expand Down Expand Up @@ -1881,8 +1880,7 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
}
)
@Timeout(90)
Expand Down Expand Up @@ -2033,8 +2031,7 @@ public void testShareConsumerAfterCoordinatorMovement() throws Exception {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
}
)
@Timeout(150)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public ShareGroupDescribeRequest.Builder buildBatchedRequest(int coordinatorId,
ShareGroupDescribeRequestData data = new ShareGroupDescribeRequestData()
.setGroupIds(groupIds)
.setIncludeAuthorizedOperations(includeAuthorizedOperations);
return new ShareGroupDescribeRequest.Builder(data, true);
return new ShareGroupDescribeRequest.Builder(data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordina
});
DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()
.setGroups(groups);
return new DescribeShareGroupOffsetsRequest.Builder(data, true);
return new DescribeShareGroupOffsetsRequest.Builder(data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static class Builder extends AbstractRequest.Builder<AlterShareGroupOffse
private final AlterShareGroupOffsetsRequestData data;

public Builder(AlterShareGroupOffsetsRequestData data) {
this(data, true);
this(data, false);
}

public Builder(AlterShareGroupOffsetsRequestData data, boolean enableUnstableLastVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static class Builder extends AbstractRequest.Builder<DeleteShareGroupStat
private final DeleteShareGroupStateRequestData data;

public Builder(DeleteShareGroupStateRequestData data) {
this(data, true);
this(data, false);
}

public Builder(DeleteShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static class Builder extends AbstractRequest.Builder<InitializeShareGroup
private final InitializeShareGroupStateRequestData data;

public Builder(InitializeShareGroupStateRequestData data) {
this(data, true);
this(data, false);
}

public Builder(InitializeShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static class Builder extends AbstractRequest.Builder<ReadShareGroupStateR
private final ReadShareGroupStateRequestData data;

public Builder(ReadShareGroupStateRequestData data) {
this(data, true);
this(data, false);
}

public Builder(ReadShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static ShareAcknowledgeRequest.Builder forConsumer(String groupId, ShareR
partMap.forEach((index, ackPartition) -> ackTopic.partitions().add(ackPartition));
});

return new ShareAcknowledgeRequest.Builder(data, true);
return new ShareAcknowledgeRequest.Builder(data);
}

public ShareAcknowledgeRequestData data() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static class Builder extends AbstractRequest.Builder<ShareGroupHeartbeatR
private final ShareGroupHeartbeatRequestData data;

public Builder(ShareGroupHeartbeatRequestData data) {
this(data, true);
this(data, false);
}

public Builder(ShareGroupHeartbeatRequestData data, boolean enableUnstableLastVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static class Builder extends AbstractRequest.Builder<WriteShareGroupState
private final WriteShareGroupStateRequestData data;

public Builder(WriteShareGroupStateRequestData data) {
this(data, true);
this(data, false);
}

public Builder(WriteShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"name": "AlterShareGroupOffsetsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"name": "DeleteShareGroupOffsetsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
"name": "DeleteShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
"about": "The group identifier." },
{ "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." }
]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"name": "DescribeShareGroupOffsetsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "Groups", "type": "[]DescribeShareGroupOffsetsRequestGroup", "versions": "0+",
"about": "The groups to describe offsets for.", "fields": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"name": "InitializeShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about": "The group identifier." },
Expand All @@ -29,7 +28,7 @@
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
"name": "ReadShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
"about": "The group identifier." },
{ "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - FENCED_LEADER_EPOCH (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
Expand All @@ -39,17 +40,17 @@
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch for this share-partition." },
"about": "The state epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
{ "name": "StateBatches", "type": "[]StateBatch", "versions": "0+",
"about": "The state batches for this share-partition.", "fields":[
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "The base offset of this state batch." },
"about": "The first offset of this state batch." },
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "The last offset of this state batch." },
{ "name": "DeliveryState", "type": "int8", "versions": "0+",
"about": "The state - 0:Available,2:Acked,4:Archived." },
"about": "The delivery state - 0:Available,2:Acked,4:Archived." },
{ "name": "DeliveryCount", "type": "int16", "versions": "0+",
"about": "The delivery count." }
]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
"name": "ReadShareGroupStateSummaryRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
"about": "The group identifier." },
{ "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,33 @@
"type": "request",
"listeners": ["broker"],
"name": "ShareAcknowledgeRequest",
"validVersions": "0",
"flexibleVersions": "0+",
// The ShareAcknowledgeRequest API is added as part of KIP-932 and is still under
// development. Hence, the API is not exposed by default by brokers unless
// explicitly enabled.
"latestVersionUnstable": true,
// Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apacke Kafka 4.1.
//
// Version 1 is the initial stable version (KIP-932).
"validVersions": "1",
"flexibleVersions": "1+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
{ "name": "GroupId", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
{ "name": "MemberId", "type": "string", "versions": "1+", "nullableVersions": "1+",
"about": "The member ID." },
{ "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
{ "name": "ShareSessionEpoch", "type": "int32", "versions": "1+",
"about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
{ "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
{ "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "1+",
"about": "The topics containing records to acknowledge.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
{ "name": "TopicId", "type": "uuid", "versions": "1+", "about": "The unique topic ID." },
{ "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "1+",
"about": "The partitions containing records to acknowledge.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
{ "name": "PartitionIndex", "type": "int32", "versions": "1+",
"about": "The partition index." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "1+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "First offset of batch of records to acknowledge."},
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "Last offset (inclusive) of batch of records to acknowledge."},
{ "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
"about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
{ "name": "FirstOffset", "type": "int64", "versions": "1+",
"about": "First offset of batch of records to acknowledge." },
{ "name": "LastOffset", "type": "int64", "versions": "1+",
"about": "Last offset (inclusive) of batch of records to acknowledge." },
{ "name": "AcknowledgeTypes", "type": "[]int8", "versions": "1+",
"about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject." }
]}
]}
]}
Expand Down
Loading