Skip to content

KAFKA-19057: Stabilize KIP-932 RPCs for AK 4.1 #19378

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1")
},
types = {Type.KRAFT}
)
Expand Down Expand Up @@ -1885,8 +1884,7 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
}
)
@Timeout(90)
Expand Down Expand Up @@ -2037,8 +2035,7 @@ public void testShareConsumerAfterCoordinatorMovement() throws Exception {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
}
)
@Timeout(150)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ DeleteShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<Coo
return new DeleteShareGroupOffsetsRequest.Builder(
new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId.idValue)
.setTopics(topics),
true
.setTopics(topics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public ShareGroupDescribeRequest.Builder buildBatchedRequest(int coordinatorId,
ShareGroupDescribeRequestData data = new ShareGroupDescribeRequestData()
.setGroupIds(groupIds)
.setIncludeAuthorizedOperations(includeAuthorizedOperations);
return new ShareGroupDescribeRequest.Builder(data, true);
return new ShareGroupDescribeRequest.Builder(data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordina
});
DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()
.setGroups(groups);
return new DescribeShareGroupOffsetsRequest.Builder(data, true);
return new DescribeShareGroupOffsetsRequest.Builder(data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ public static class Builder extends AbstractRequest.Builder<AlterShareGroupOffse
private final AlterShareGroupOffsetsRequestData data;

public Builder(AlterShareGroupOffsetsRequestData data) {
this(data, true);
}

public Builder(AlterShareGroupOffsetsRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.ALTER_SHARE_GROUP_OFFSETS, enableUnstableLastVersion);
super(ApiKeys.ALTER_SHARE_GROUP_OFFSETS);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ public static class Builder extends AbstractRequest.Builder<DeleteShareGroupOffs
private final DeleteShareGroupOffsetsRequestData data;

public Builder(DeleteShareGroupOffsetsRequestData data) {
this(data, false);
}

public Builder(DeleteShareGroupOffsetsRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.DELETE_SHARE_GROUP_OFFSETS, enableUnstableLastVersion);
super(ApiKeys.DELETE_SHARE_GROUP_OFFSETS);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ public static class Builder extends AbstractRequest.Builder<DeleteShareGroupStat
private final DeleteShareGroupStateRequestData data;

public Builder(DeleteShareGroupStateRequestData data) {
this(data, true);
}

public Builder(DeleteShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.DELETE_SHARE_GROUP_STATE, enableUnstableLastVersion);
super(ApiKeys.DELETE_SHARE_GROUP_STATE);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ public static class Builder extends AbstractRequest.Builder<DescribeShareGroupOf
private final DescribeShareGroupOffsetsRequestData data;

public Builder(DescribeShareGroupOffsetsRequestData data) {
this(data, false);
}

public Builder(DescribeShareGroupOffsetsRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS, enableUnstableLastVersion);
super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ public static class Builder extends AbstractRequest.Builder<InitializeShareGroup
private final InitializeShareGroupStateRequestData data;

public Builder(InitializeShareGroupStateRequestData data) {
this(data, true);
}

public Builder(InitializeShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.INITIALIZE_SHARE_GROUP_STATE, enableUnstableLastVersion);
super(ApiKeys.INITIALIZE_SHARE_GROUP_STATE);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ public static class Builder extends AbstractRequest.Builder<ReadShareGroupStateR
private final ReadShareGroupStateRequestData data;

public Builder(ReadShareGroupStateRequestData data) {
this(data, true);
}

public Builder(ReadShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.READ_SHARE_GROUP_STATE, enableUnstableLastVersion);
super(ApiKeys.READ_SHARE_GROUP_STATE);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ public static class Builder extends AbstractRequest.Builder<ReadShareGroupStateS
private final ReadShareGroupStateSummaryRequestData data;

public Builder(ReadShareGroupStateSummaryRequestData data) {
this(data, false);
}

public Builder(ReadShareGroupStateSummaryRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY, enableUnstableLastVersion);
super(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ public static class Builder extends AbstractRequest.Builder<ShareAcknowledgeRequ
private final ShareAcknowledgeRequestData data;

public Builder(ShareAcknowledgeRequestData data) {
this(data, false);
}

public Builder(ShareAcknowledgeRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.SHARE_ACKNOWLEDGE, enableUnstableLastVersion);
super(ApiKeys.SHARE_ACKNOWLEDGE);
this.data = data;
}

Expand Down Expand Up @@ -79,7 +75,7 @@ public static ShareAcknowledgeRequest.Builder forConsumer(String groupId, ShareR
partMap.forEach((index, ackPartition) -> ackTopic.partitions().add(ackPartition));
});

return new ShareAcknowledgeRequest.Builder(data, true);
return new ShareAcknowledgeRequest.Builder(data);
}

public ShareAcknowledgeRequestData data() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ public static class Builder extends AbstractRequest.Builder<ShareFetchRequest> {
private final ShareFetchRequestData data;

public Builder(ShareFetchRequestData data) {
this(data, false);
}

public Builder(ShareFetchRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.SHARE_FETCH, enableUnstableLastVersion);
super(ApiKeys.SHARE_FETCH);
this.data = data;
}

Expand Down Expand Up @@ -104,7 +100,7 @@ public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
});
}

Builder builder = new Builder(data, true);
Builder builder = new Builder(data);
// And finally, forget the topic-partitions that are no longer in the session
if (!forget.isEmpty()) {
data.setForgottenTopicsData(new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ public static class Builder extends AbstractRequest.Builder<ShareGroupDescribeRe
private final ShareGroupDescribeRequestData data;

public Builder(ShareGroupDescribeRequestData data) {
this(data, false);
}

public Builder(ShareGroupDescribeRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.SHARE_GROUP_DESCRIBE, enableUnstableLastVersion);
super(ApiKeys.SHARE_GROUP_DESCRIBE);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ public static class Builder extends AbstractRequest.Builder<ShareGroupHeartbeatR
private final ShareGroupHeartbeatRequestData data;

public Builder(ShareGroupHeartbeatRequestData data) {
this(data, true);
}

public Builder(ShareGroupHeartbeatRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.SHARE_GROUP_HEARTBEAT, enableUnstableLastVersion);
super(ApiKeys.SHARE_GROUP_HEARTBEAT);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ public static class Builder extends AbstractRequest.Builder<WriteShareGroupState
private final WriteShareGroupStateRequestData data;

public Builder(WriteShareGroupStateRequestData data) {
this(data, true);
}

public Builder(WriteShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.WRITE_SHARE_GROUP_STATE, enableUnstableLastVersion);
super(ApiKeys.WRITE_SHARE_GROUP_STATE);
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"name": "AlterShareGroupOffsetsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"name": "DeleteShareGroupOffsetsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
"name": "DeleteShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
"about": "The group identifier." },
{ "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." }
]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"name": "DescribeShareGroupOffsetsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "Groups", "type": "[]DescribeShareGroupOffsetsRequestGroup", "versions": "0+",
"about": "The groups to describe offsets for.", "fields": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"name": "InitializeShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about": "The group identifier." },
Expand All @@ -29,7 +28,7 @@
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
"name": "ReadShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
"about": "The group identifier." },
{ "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - FENCED_LEADER_EPOCH (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
Expand All @@ -39,17 +40,17 @@
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch for this share-partition." },
"about": "The state epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
{ "name": "StateBatches", "type": "[]StateBatch", "versions": "0+",
"about": "The state batches for this share-partition.", "fields":[
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "The base offset of this state batch." },
"about": "The first offset of this state batch." },
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "The last offset of this state batch." },
{ "name": "DeliveryState", "type": "int8", "versions": "0+",
"about": "The state - 0:Available,2:Acked,4:Archived." },
"about": "The delivery state - 0:Available,2:Acked,4:Archived." },
{ "name": "DeliveryCount", "type": "int16", "versions": "0+",
"about": "The delivery count." }
]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
"name": "ReadShareGroupStateSummaryRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
"about": "The group identifier." },
{ "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
Expand Down
Loading