Skip to content

Commit 5aa71a4

Browse files
committed
WIP
1 parent 3ed5902 commit 5aa71a4

6 files changed

+103
-56
lines changed

Diff for: metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ static class Builder {
8989
private SnapshotRegistry snapshotRegistry = null;
9090
private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
9191
private ReplicaPlacer replicaPlacer = null;
92-
private FeatureControlManager featureControl = null;
92+
private LimitedFeatureControlManager featureControl = null;
9393
private BrokerShutdownHandler brokerShutdownHandler = null;
9494
private String interBrokerListenerName = "PLAINTEXT";
9595

@@ -123,7 +123,7 @@ Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
123123
return this;
124124
}
125125

126-
Builder setFeatureControlManager(FeatureControlManager featureControl) {
126+
Builder setFeatureControlManager(LimitedFeatureControlManager featureControl) {
127127
this.featureControl = featureControl;
128128
return this;
129129
}
@@ -250,7 +250,7 @@ boolean check() {
250250
/**
251251
* The feature control manager.
252252
*/
253-
private final FeatureControlManager featureControl;
253+
private final LimitedFeatureControlManager featureControl;
254254

255255
private final BrokerShutdownHandler brokerShutdownHandler;
256256

@@ -276,7 +276,7 @@ private ClusterControlManager(
276276
SnapshotRegistry snapshotRegistry,
277277
long sessionTimeoutNs,
278278
ReplicaPlacer replicaPlacer,
279-
FeatureControlManager featureControl,
279+
LimitedFeatureControlManager featureControl,
280280
BrokerShutdownHandler brokerShutdownHandler,
281281
String interBrokerListenerName
282282
) {

Diff for: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class ConfigurationControlManager {
7575
private final TimelineHashSet<Integer> brokersWithConfigs;
7676
private final Map<String, Object> staticConfig;
7777
private final ConfigResource currentController;
78-
private final FeatureControlManager featureControl;
78+
private final LimitedFeatureControlManager featureControl;
7979

8080
static class Builder {
8181
private LogContext logContext = null;
@@ -86,7 +86,7 @@ static class Builder {
8686
private ConfigurationValidator validator = ConfigurationValidator.NO_OP;
8787
private Map<String, Object> staticConfig = Map.of();
8888
private int nodeId = 0;
89-
private FeatureControlManager featureControl = null;
89+
private LimitedFeatureControlManager featureControl = null;
9090

9191
Builder setLogContext(LogContext logContext) {
9292
this.logContext = logContext;
@@ -128,7 +128,7 @@ Builder setNodeId(int nodeId) {
128128
return this;
129129
}
130130

131-
Builder setFeatureControl(FeatureControlManager featureControl) {
131+
Builder setFeatureControl(LimitedFeatureControlManager featureControl) {
132132
this.featureControl = featureControl;
133133
return this;
134134
}
@@ -156,14 +156,14 @@ ConfigurationControlManager build() {
156156
}
157157

158158
private ConfigurationControlManager(LogContext logContext,
159-
SnapshotRegistry snapshotRegistry,
160-
KafkaConfigSchema configSchema,
161-
Consumer<ConfigResource> existenceChecker,
162-
Optional<AlterConfigPolicy> alterConfigPolicy,
163-
ConfigurationValidator validator,
164-
Map<String, Object> staticConfig,
165-
int nodeId,
166-
FeatureControlManager featureControl
159+
SnapshotRegistry snapshotRegistry,
160+
KafkaConfigSchema configSchema,
161+
Consumer<ConfigResource> existenceChecker,
162+
Optional<AlterConfigPolicy> alterConfigPolicy,
163+
ConfigurationValidator validator,
164+
Map<String, Object> staticConfig,
165+
int nodeId,
166+
LimitedFeatureControlManager featureControl
167167
) {
168168
this.log = logContext.logger(ConfigurationControlManager.class);
169169
this.snapshotRegistry = snapshotRegistry;

Diff for: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
5151

5252

53-
public class FeatureControlManager {
53+
public class FeatureControlManager implements LimitedFeatureControlManager {
5454
public static class Builder {
5555
private LogContext logContext = null;
5656
private SnapshotRegistry snapshotRegistry = null;
@@ -141,7 +141,8 @@ private FeatureControlManager(
141141
this.clusterSupportDescriber = clusterSupportDescriber;
142142
}
143143

144-
ControllerResult<ApiError> updateFeatures(
144+
@Override
145+
public ControllerResult<ApiError> updateFeatures(
145146
Map<String, Short> updates,
146147
Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
147148
boolean validateOnly
@@ -168,11 +169,13 @@ ControllerResult<ApiError> updateFeatures(
168169
}
169170
}
170171

171-
Optional<MetadataVersion> metadataVersion() {
172+
@Override
173+
public Optional<MetadataVersion> metadataVersion() {
172174
return metadataVersion.get();
173175
}
174176

175-
MetadataVersion metadataVersionOrThrow() {
177+
@Override
178+
public MetadataVersion metadataVersionOrThrow() {
176179
return metadataVersionOrThrow(SnapshotRegistry.LATEST_EPOCH);
177180
}
178181

@@ -361,6 +364,7 @@ FinalizedControllerFeatures finalizedFeatures(long epoch) {
361364
return new FinalizedControllerFeatures(features, epoch);
362365
}
363366

367+
@Override
364368
public void replay(FeatureLevelRecord record) {
365369
VersionRange range = quorumFeatures.localSupportedFeature(record.name());
366370
if (!range.contains(record.featureLevel())) {
@@ -387,11 +391,13 @@ public void replay(FeatureLevelRecord record) {
387391
}
388392
}
389393

390-
boolean isControllerId(int nodeId) {
394+
@Override
395+
public boolean isControllerId(int nodeId) {
391396
return quorumFeatures.isControllerId(nodeId);
392397
}
393398

394-
boolean isElrFeatureEnabled() {
399+
@Override
400+
public boolean isElrFeatureEnabled() {
395401
return finalizedVersions.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >=
396402
EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
397403
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.apache.kafka.controller;
2+
3+
import org.apache.kafka.clients.admin.FeatureUpdate;
4+
import org.apache.kafka.common.metadata.FeatureLevelRecord;
5+
import org.apache.kafka.common.requests.ApiError;
6+
import org.apache.kafka.server.common.MetadataVersion;
7+
8+
import java.util.Map;
9+
import java.util.Optional;
10+
11+
public interface LimitedFeatureControlManager {
12+
13+
ControllerResult<ApiError> updateFeatures(
14+
Map<String, Short> updates,
15+
Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
16+
boolean validateOnly
17+
);
18+
19+
Optional<MetadataVersion> metadataVersion();
20+
21+
MetadataVersion metadataVersionOrThrow();
22+
23+
void replay(FeatureLevelRecord record);
24+
25+
boolean isControllerId(int nodeId);
26+
27+
boolean isElrFeatureEnabled();
28+
}

0 commit comments

Comments
 (0)