Skip to content

Commit d5ed600

Browse files
committed
add static config reporting
1 parent 288b479 commit d5ed600

6 files changed

Lines changed: 82 additions & 17 deletions

File tree

clients/src/main/resources/common/message/BrokerRegistrationRequest.json

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
// Version 3 adds the PreviousBrokerEpoch for the KIP-966
1919
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0.
2020
// Version 5 adds the CordonedLogDirs flexible field
21+
// Version 6 adds StaticConfigs for broker static config reporting.
2122
{
2223
"apiKey":62,
2324
"type": "request",
2425
"listeners": ["controller"],
2526
"name": "BrokerRegistrationRequest",
26-
"validVersions": "0-5",
27+
"validVersions": "0-6",
2728
"flexibleVersions": "0+",
2829
"fields": [
2930
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@@ -63,6 +64,13 @@
6364
{ "name": "PreviousBrokerEpoch", "type": "int64", "versions": "3+", "default": "-1", "ignorable": true,
6465
"about": "The epoch before a clean shutdown." },
6566
{ "name": "CordonedLogDirs", "type": "[]uuid", "versions": "5+", "taggedVersions": "5+",
66-
"tag": "0", "about": "Log directories that are cordoned." }
67+
"tag": "0", "about": "Log directories that are cordoned." },
68+
{ "name": "StaticConfigs", "type": "[]StaticConfig", "versions": "6+",
69+
"about": "static configs from the broker's server.properties and default value.", "fields": [
70+
{ "name": "Name", "type": "string", "versions": "6+",
71+
"about": "The config name." },
72+
{ "name": "Value", "type": "string", "versions": "6+", "nullableVersions": "6+",
73+
"about": "The config value. Null if the config is sensitive." }
74+
]}
6775
]
68-
}
76+
}

clients/src/main/resources/common/message/BrokerRegistrationResponse.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
// Version 3 is the same as version 2 (new field in request).
1919
// Version 4 is the same as version 2 (new field in request).
2020
// Version 5 is the same as version 2 (new field in request).
21+
// Version 6 is the same as version 2 (new field in request).
2122
{
2223
"apiKey": 62,
2324
"type": "response",
2425
"name": "BrokerRegistrationResponse",
25-
"validVersions": "0-5",
26+
"validVersions": "0-6",
2627
"flexibleVersions": "0+",
2728
"fields": [
2829
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
2525
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
2626
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
27+
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerStaticConfig;
2728
import org.apache.kafka.common.security.auth.SecurityProtocol;
2829
import org.apache.kafka.image.writer.ImageWriterOptions;
2930
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -55,6 +56,7 @@ public static class Builder {
5556
private boolean isMigratingZkBroker;
5657
private List<Uuid> directories;
5758
private List<Uuid> cordonedDirectories;
59+
private Map<String, String> staticConfigs;
5860

5961
public Builder() {
6062
this.id = 0;
@@ -68,6 +70,7 @@ public Builder() {
6870
this.isMigratingZkBroker = false;
6971
this.directories = List.of();
7072
this.cordonedDirectories = List.of();
73+
this.staticConfigs = Map.of();
7174
}
7275

7376
public Builder setId(int id) {
@@ -135,6 +138,11 @@ public Builder setCordonedDirectories(List<Uuid> cordonedDirectories) {
135138
return this;
136139
}
137140

141+
public Builder setStaticConfigs(Map<String, String> staticConfigs) {
142+
this.staticConfigs = staticConfigs;
143+
return this;
144+
}
145+
138146
public BrokerRegistration build() {
139147
return new BrokerRegistration(
140148
id,
@@ -147,7 +155,8 @@ public BrokerRegistration build() {
147155
inControlledShutdown,
148156
isMigratingZkBroker,
149157
directories,
150-
cordonedDirectories);
158+
cordonedDirectories,
159+
staticConfigs);
151160
}
152161
}
153162

@@ -162,6 +171,7 @@ public BrokerRegistration build() {
162171
private final boolean isMigratingZkBroker;
163172
private final List<Uuid> directories;
164173
private final List<Uuid> cordonedDirectories;
174+
private final Map<String, String> staticConfigs;
165175

166176
private BrokerRegistration(
167177
int id,
@@ -174,7 +184,8 @@ private BrokerRegistration(
174184
boolean inControlledShutdown,
175185
boolean isMigratingZkBroker,
176186
List<Uuid> directories,
177-
List<Uuid> cordonedDirectories
187+
List<Uuid> cordonedDirectories,
188+
Map<String, String> staticConfigs
178189
) {
179190
this.id = id;
180191
this.epoch = epoch;
@@ -197,6 +208,7 @@ private BrokerRegistration(
197208
directories.sort(Uuid::compareTo);
198209
this.directories = Collections.unmodifiableList(directories);
199210
this.cordonedDirectories = Collections.unmodifiableList(cordonedDirectories);
211+
this.staticConfigs = Collections.unmodifiableMap(staticConfigs);
200212
}
201213

202214
public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -212,6 +224,10 @@ public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
212224
supportedFeatures.put(feature.name(), VersionRange.of(
213225
feature.minSupportedVersion(), feature.maxSupportedVersion()));
214226
}
227+
Map<String, String> staticConfigs = new HashMap<>();
228+
for (BrokerStaticConfig sc : record.staticConfigs()) {
229+
staticConfigs.put(sc.name(), sc.value());
230+
}
215231
return new BrokerRegistration(record.brokerId(),
216232
record.brokerEpoch(),
217233
record.incarnationId(),
@@ -222,7 +238,8 @@ public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
222238
record.inControlledShutdown(),
223239
record.isMigratingZkBroker(),
224240
record.logDirs(),
225-
record.cordonedLogDirs());
241+
record.cordonedLogDirs(),
242+
staticConfigs);
226243
}
227244

228245
public int id() {
@@ -277,6 +294,10 @@ public List<Uuid> cordonedDirectories() {
277294
return cordonedDirectories;
278295
}
279296

297+
public Map<String, String> staticConfigs() {
298+
return staticConfigs;
299+
}
300+
280301
public boolean hasOnlineDir(Uuid dir) {
281302
return DirectoryId.isOnline(dir, directories);
282303
}
@@ -353,14 +374,22 @@ public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
353374
setMaxSupportedVersion(entry.getValue().max()));
354375
}
355376

377+
if (options.metadataVersion().isStaticConfigReportingSupported()) {
378+
for (Entry<String, String> entry : staticConfigs.entrySet()) {
379+
registrationRecord.staticConfigs().add(new BrokerStaticConfig().
380+
setName(entry.getKey()).
381+
setValue(entry.getValue()));
382+
}
383+
}
384+
356385
return new ApiMessageAndVersion(registrationRecord,
357386
options.metadataVersion().registerBrokerRecordVersion());
358387
}
359388

360389
@Override
361390
public int hashCode() {
362391
return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
363-
rack, fenced, inControlledShutdown, isMigratingZkBroker, directories, cordonedDirectories);
392+
rack, fenced, inControlledShutdown, isMigratingZkBroker, directories, cordonedDirectories, staticConfigs);
364393
}
365394

366395
@Override
@@ -376,7 +405,8 @@ public boolean equals(Object o) {
376405
other.inControlledShutdown == inControlledShutdown &&
377406
other.isMigratingZkBroker == isMigratingZkBroker &&
378407
other.directories.equals(directories) &&
379-
other.cordonedDirectories.equals(cordonedDirectories);
408+
other.cordonedDirectories.equals(cordonedDirectories) &&
409+
other.staticConfigs.equals(staticConfigs);
380410
}
381411

382412
@Override
@@ -399,6 +429,7 @@ public String toString() {
399429
", isMigratingZkBroker=" + isMigratingZkBroker +
400430
", directories=" + directories +
401431
", cordonedDirectories=" + cordonedDirectories +
432+
", staticConfigs=" + staticConfigs +
402433
")";
403434
}
404435

@@ -430,7 +461,8 @@ public BrokerRegistration cloneWith(
430461
newInControlledShutdownChange,
431462
isMigratingZkBroker,
432463
newDirectories,
433-
newCordonedDirectories
464+
newCordonedDirectories,
465+
staticConfigs
434466
);
435467
}
436-
}
468+
}

metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
// Version 2 adds IsMigratingZkBroker
1818
// Version 3 adds LogDirs
1919
// Version 4 adds CordonedLogDirs
20+
// Version 5 adds StaticConfigs for broker static config reporting.
2021
{
2122
"apiKey": 0,
2223
"type": "metadata",
2324
"name": "RegisterBrokerRecord",
24-
"validVersions": "0-4",
25+
"validVersions": "0-5",
2526
"flexibleVersions": "0+",
2627
"fields": [
2728
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@@ -61,6 +62,14 @@
6162
{ "name": "LogDirs", "type": "[]uuid", "versions": "3+", "taggedVersions": "3+", "tag": 0,
6263
"about": "Log directories configured in this broker which are available." },
6364
{ "name": "CordonedLogDirs", "type": "[]uuid", "versions": "4+", "taggedVersions": "4+", "tag": "1",
64-
"about": "Log directories that are cordoned." }
65+
"about": "Log directories that are cordoned." },
66+
{ "name": "StaticConfigs", "type": "[]BrokerStaticConfig", "versions": "5+",
67+
"taggedVersions": "5+", "tag": 2,
68+
"about": "Non-default static configs reported by the broker.", "fields": [
69+
{ "name": "Name", "type": "string", "versions": "5+",
70+
"about": "The config name." },
71+
{ "name": "Value", "type": "string", "versions": "5+", "nullableVersions": "5+",
72+
"about": "The config value. Null if the config is sensitive." }
73+
]}
6574
]
66-
}
75+
}

server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ public enum MetadataVersion {
130130
// they have set the configuration unstable.feature.versions.enable=true.
131131
// Please move this comment when updating the LATEST_PRODUCTION constant.
132132
//
133-
IBP_4_4_IV0(31, "4.4", "IV0", false);
133+
IBP_4_4_IV0(31, "4.4", "IV0", false),
134+
135+
IBP_4_4_IV1(32, "4.4", "IV1", true);
134136

135137

136138
// NOTES when adding a new version:
@@ -219,8 +221,14 @@ public boolean isCordonedLogDirsSupported() {
219221
return this.isAtLeast(MetadataVersion.IBP_4_3_IV0);
220222
}
221223

224+
public boolean isStaticConfigReportingSupported() {
225+
return this.isAtLeast(MetadataVersion.IBP_4_4_IV1);
226+
}
227+
222228
public short registerBrokerRecordVersion() {
223-
if (isCordonedLogDirsSupported()) {
229+
if (isStaticConfigReportingSupported()) {
230+
return (short) 5;
231+
} else if (isCordonedLogDirsSupported()) {
224232
// new cordonedLogDirs field
225233
return (short) 4;
226234
} else if (isDirectoryAssignmentSupported()) {

server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,12 @@ private void sendBrokerRegistration() {
517517
);
518518
List<Uuid> sortedLogDirs = new ArrayList<>(logDirs);
519519
sortedLogDirs.sort(Uuid::compareTo);
520+
List<BrokerRegistrationRequestData.StaticConfig> staticConfigs = new ArrayList<>();
521+
for (Map.Entry<String, ?> entry : config.values().entrySet()) {
522+
staticConfigs.add(new BrokerRegistrationRequestData.StaticConfig()
523+
.setName(entry.getKey())
524+
.setValue(String.valueOf(entry.getValue())));
525+
}
520526
BrokerRegistrationRequestData data = new BrokerRegistrationRequestData()
521527
.setBrokerId(nodeId)
522528
.setIsMigratingZkBroker(false)
@@ -527,7 +533,8 @@ private void sendBrokerRegistration() {
527533
.setRack(rack.orElse(null))
528534
.setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L))
529535
.setLogDirs(sortedLogDirs)
530-
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList());
536+
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList())
537+
.setStaticConfigs(staticConfigs);
531538
if (logger.isDebugEnabled()) {
532539
logger.debug("Sending broker registration {}", data);
533540
}

0 commit comments

Comments
 (0)