Skip to content

Commit 9dfc5a7

Browse files
authored
MINOR: Prefer MetadataRecordSerde.INSTANCE over new instances (apache#22289)
Replace `new MetadataRecordSerde()` with `MetadataRecordSerde.INSTANCE` across the codebase. MetadataRecordSerde is stateless and thread-safe, so a single shared instance is sufficient. Also add Javadoc on the public constructor to guide future callers toward the singleton. Relevant discussions: - apache#22116 (comment) - apache#10990 (comment) Verified that MetadataRecordSerde (and its parent AbstractApiMessageSerde) has no mutable instance fields. All methods (read/write/recordSize) operate solely on their parameters and local variables, confirming no thread-safety or state-recycling concerns with sharing a single instance. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent 9505119 commit 9dfc5a7

11 files changed

Lines changed: 33 additions & 39 deletions

File tree

core/src/main/scala/kafka/server/SharedServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ class SharedServer(
294294
clusterId,
295295
sharedServerConfig,
296296
metaPropsEnsemble.logDirProps.get(metaPropsEnsemble.metadataLogDir.get).directoryId.get,
297-
new MetadataRecordSerde,
297+
MetadataRecordSerde.INSTANCE,
298298
KafkaRaftServer.MetadataPartition,
299299
KafkaRaftServer.MetadataTopicId,
300300
time,

metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@
2323
public class MetadataRecordSerde extends AbstractApiMessageSerde {
2424
public static final MetadataRecordSerde INSTANCE = new MetadataRecordSerde();
2525

26+
/**
27+
* Prefer using {@link #INSTANCE} instead of creating new instances,
28+
* as this class is stateless and thread-safe.
29+
*/
30+
public MetadataRecordSerde() {
31+
}
32+
2633
@Override
2734
public ApiMessage apiMessageFor(short apiKey) {
2835
return MetadataRecordType.fromId(apiKey).newMetadataRecord();

metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ static void writeDynamicQuorumSnapshot(
506506
Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
507507
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
508508
setVoterSet(Optional.of(voterSet));
509-
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde())) {
509+
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(MetadataRecordSerde.INSTANCE)) {
510510
writer.freeze();
511511
}
512512
}

metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,10 @@ public record BatchAndType(Batch<ApiMessageAndVersion> batch, boolean isControl)
7474

7575
private final FileRecords fileRecords;
7676
private Iterator<FileChannelRecordBatch> batchIterator;
77-
private final MetadataRecordSerde serde;
7877

7978
private BatchFileReader(FileRecords fileRecords) {
8079
this.fileRecords = fileRecords;
8180
this.batchIterator = fileRecords.batchIterator();
82-
this.serde = new MetadataRecordSerde();
8381
}
8482

8583
@Override
@@ -142,7 +140,7 @@ private BatchAndType nextDataBatch(FileChannelRecordBatch input) {
142140
for (Record record : input) {
143141
try {
144142
ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
145-
ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize());
143+
ApiMessageAndVersion messageAndVersion = MetadataRecordSerde.INSTANCE.read(accessor, record.valueSize());
146144
messages.add(messageAndVersion);
147145
} catch (Throwable e) {
148146
throw new RuntimeException("unable to deserialize record at offset " + record.offset(), e);

metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public static BatchFileWriter open(Path snapshotPath) throws IOException {
9797
new BatchMemoryPool(5, MAX_BATCH_SIZE_BYTES),
9898
time,
9999
Compression.NONE,
100-
new MetadataRecordSerde()
100+
MetadataRecordSerde.INSTANCE
101101
);
102102

103103
// Append the snapshot header control record and force it to create a batch

metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ public final class SnapshotFileReader implements AutoCloseable {
5858
private final CompletableFuture<Void> caughtUpFuture;
5959
private FileRecords fileRecords;
6060
private Iterator<FileChannelRecordBatch> batchIterator;
61-
private final MetadataRecordSerde serde = new MetadataRecordSerde();
6261
private long lastOffset = -1L;
6362
private volatile OptionalLong highWaterMark = OptionalLong.empty();
6463

@@ -155,7 +154,7 @@ private void handleMetadataBatch(FileChannelRecordBatch batch) {
155154
for (Record record : batch) {
156155
ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
157156
try {
158-
ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize());
157+
ApiMessageAndVersion messageAndVersion = MetadataRecordSerde.INSTANCE.read(accessor, record.valueSize());
159158
messages.add(messageAndVersion);
160159
} catch (Throwable e) {
161160
log.error("unable to read metadata record at offset {}", record.offset(), e);

metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ private void scheduleLogCheck() {
479479
listenerData.handleLoadSnapshot(
480480
RecordsSnapshotReader.of(
481481
snapshot.get(),
482-
new MetadataRecordSerde(),
482+
MetadataRecordSerde.INSTANCE,
483483
BufferSupplier.create(),
484484
Integer.MAX_VALUE,
485485
true,
@@ -554,7 +554,7 @@ private void scheduleLogCheck() {
554554
}
555555

556556
private static int messageSize(ApiMessageAndVersion messageAndVersion, ObjectSerializationCache objectCache) {
557-
return new MetadataRecordSerde().recordSize(messageAndVersion, objectCache);
557+
return MetadataRecordSerde.INSTANCE.recordSize(messageAndVersion, objectCache);
558558
}
559559

560560
public void beginShutdown() {
@@ -729,7 +729,7 @@ public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(
729729
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
730730
.setTime(new MockTime())
731731
.setRawSnapshotWriter(createNewSnapshot(snapshotId))
732-
.build(new MetadataRecordSerde())
732+
.build(MetadataRecordSerde.INSTANCE)
733733
);
734734
}
735735

metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,22 @@ public void testSerde() {
4141
.setName("foo")
4242
.setTopicId(Uuid.randomUuid());
4343

44-
MetadataRecordSerde serde = new MetadataRecordSerde();
44+
4545

4646
for (short version = TopicRecord.LOWEST_SUPPORTED_VERSION; version <= TopicRecord.HIGHEST_SUPPORTED_VERSION; version++) {
4747
ApiMessageAndVersion messageAndVersion = new ApiMessageAndVersion(topicRecord, version);
4848

4949
ObjectSerializationCache cache = new ObjectSerializationCache();
50-
int size = serde.recordSize(messageAndVersion, cache);
50+
int size = MetadataRecordSerde.INSTANCE.recordSize(messageAndVersion, cache);
5151

5252
ByteBuffer buffer = ByteBuffer.allocate(size);
5353
ByteBufferAccessor bufferAccessor = new ByteBufferAccessor(buffer);
5454

55-
serde.write(messageAndVersion, cache, bufferAccessor);
55+
MetadataRecordSerde.INSTANCE.write(messageAndVersion, cache, bufferAccessor);
5656
buffer.flip();
5757

5858
assertEquals(size, buffer.remaining());
59-
ApiMessageAndVersion readMessageAndVersion = serde.read(bufferAccessor, size);
59+
ApiMessageAndVersion readMessageAndVersion = MetadataRecordSerde.INSTANCE.read(bufferAccessor, size);
6060
assertEquals(messageAndVersion, readMessageAndVersion);
6161
}
6262
}
@@ -67,18 +67,16 @@ public void testDeserializeWithUnhandledFrameVersion() {
6767
ByteUtils.writeUnsignedVarint(15, buffer);
6868
buffer.flip();
6969

70-
MetadataRecordSerde serde = new MetadataRecordSerde();
7170
assertStartsWith("Could not deserialize metadata record due to unknown frame version",
7271
assertThrows(MetadataParseException.class,
73-
() -> serde.read(new ByteBufferAccessor(buffer), 16)).getMessage());
72+
() -> MetadataRecordSerde.INSTANCE.read(new ByteBufferAccessor(buffer), 16)).getMessage());
7473
}
7574

7675
/**
7776
* Test attempting to parse an event which has a malformed frame version type varint.
7877
*/
7978
@Test
8079
public void testParsingMalformedFrameVersionVarint() {
81-
MetadataRecordSerde serde = new MetadataRecordSerde();
8280
ByteBuffer buffer = ByteBuffer.allocate(64);
8381
buffer.clear();
8482
buffer.put((byte) 0x80);
@@ -91,15 +89,14 @@ public void testParsingMalformedFrameVersionVarint() {
9189
buffer.limit(64);
9290
assertStartsWith("Error while reading frame version",
9391
assertThrows(MetadataParseException.class,
94-
() -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
92+
() -> MetadataRecordSerde.INSTANCE.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
9593
}
9694

9795
/**
9896
* Test attempting to parse an event which has a malformed message type varint.
9997
*/
10098
@Test
10199
public void testParsingMalformedMessageTypeVarint() {
102-
MetadataRecordSerde serde = new MetadataRecordSerde();
103100
ByteBuffer buffer = ByteBuffer.allocate(64);
104101
buffer.clear();
105102
buffer.put((byte) 0x01);
@@ -113,15 +110,14 @@ public void testParsingMalformedMessageTypeVarint() {
113110
buffer.limit(64);
114111
assertStartsWith("Error while reading type",
115112
assertThrows(MetadataParseException.class,
116-
() -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
113+
() -> MetadataRecordSerde.INSTANCE.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
117114
}
118115

119116
/**
120117
* Test attempting to parse an event which has a malformed message version varint.
121118
*/
122119
@Test
123120
public void testParsingMalformedMessageVersionVarint() {
124-
MetadataRecordSerde serde = new MetadataRecordSerde();
125121
ByteBuffer buffer = ByteBuffer.allocate(64);
126122
buffer.clear();
127123
buffer.put((byte) 0x01);
@@ -135,15 +131,14 @@ public void testParsingMalformedMessageVersionVarint() {
135131
buffer.limit(64);
136132
assertStartsWith("Error while reading version",
137133
assertThrows(MetadataParseException.class,
138-
() -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
134+
() -> MetadataRecordSerde.INSTANCE.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
139135
}
140136

141137
/**
142138
* Test attempting to parse an event which has a version > Short.MAX_VALUE
143139
*/
144140
@Test
145141
public void testParsingVersionTooLarge() {
146-
MetadataRecordSerde serde = new MetadataRecordSerde();
147142
ByteBuffer buffer = ByteBuffer.allocate(64);
148143
buffer.clear();
149144
buffer.put((byte) 0x01); // frame version
@@ -157,15 +152,14 @@ public void testParsingVersionTooLarge() {
157152
buffer.limit(64);
158153
assertStartsWith("Value for version was too large",
159154
assertThrows(MetadataParseException.class,
160-
() -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
155+
() -> MetadataRecordSerde.INSTANCE.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
161156
}
162157

163158
/**
164159
* Test attempting to parse an event which has a unsupported version
165160
*/
166161
@Test
167162
public void testParsingUnsupportedApiKey() {
168-
MetadataRecordSerde serde = new MetadataRecordSerde();
169163
ByteBuffer buffer = ByteBuffer.allocate(64);
170164
buffer.put((byte) 0x01); // frame version
171165
buffer.put((byte) 0xff); // apiKey
@@ -176,15 +170,14 @@ public void testParsingUnsupportedApiKey() {
176170
buffer.limit(64);
177171
assertStartsWith("Unknown metadata id ",
178172
assertThrows(MetadataParseException.class,
179-
() -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getCause().getMessage());
173+
() -> MetadataRecordSerde.INSTANCE.read(new ByteBufferAccessor(buffer), buffer.remaining())).getCause().getMessage());
180174
}
181175

182176
/**
183177
* Test attempting to parse an event which has a malformed message body.
184178
*/
185179
@Test
186180
public void testParsingMalformedMessage() {
187-
MetadataRecordSerde serde = new MetadataRecordSerde();
188181
ByteBuffer buffer = ByteBuffer.allocate(4);
189182
buffer.put((byte) 0x01); // frame version
190183
buffer.put((byte) 0x00); // apiKey
@@ -194,27 +187,26 @@ public void testParsingMalformedMessage() {
194187
buffer.limit(4);
195188
assertStartsWith("Failed to deserialize record with type",
196189
assertThrows(MetadataParseException.class,
197-
() -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
190+
() -> MetadataRecordSerde.INSTANCE.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
198191
}
199192

200193
/**
201194
* Test attempting to parse an event which has a malformed message version varint.
202195
*/
203196
@Test
204197
public void testParsingRecordWithGarbageAtEnd() {
205-
MetadataRecordSerde serde = new MetadataRecordSerde();
206198
RegisterBrokerRecord message = new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(2);
207199

208200
ObjectSerializationCache cache = new ObjectSerializationCache();
209201
ApiMessageAndVersion messageAndVersion = new ApiMessageAndVersion(message, (short) 0);
210-
int size = serde.recordSize(messageAndVersion, cache);
202+
int size = MetadataRecordSerde.INSTANCE.recordSize(messageAndVersion, cache);
211203
ByteBuffer buffer = ByteBuffer.allocate(size + 1);
212204

213-
serde.write(messageAndVersion, cache, new ByteBufferAccessor(buffer));
205+
MetadataRecordSerde.INSTANCE.write(messageAndVersion, cache, new ByteBufferAccessor(buffer));
214206
buffer.clear();
215207
assertStartsWith("Found 1 byte(s) of garbage after",
216208
assertThrows(MetadataParseException.class,
217-
() -> serde.read(new ByteBufferAccessor(buffer), size + 1)).getMessage());
209+
() -> MetadataRecordSerde.INSTANCE.read(new ByteBufferAccessor(buffer), size + 1)).getMessage());
218210
}
219211

220212
private static void assertStartsWith(String prefix, String str) {

server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void testSnapshotsGenerated() throws Exception {
100100
for (var raftManager : raftManagers.values()) {
101101
try (var snapshot = RecordsSnapshotReader.of(
102102
raftManager.raftLog().latestSnapshot().get(),
103-
new MetadataRecordSerde(),
103+
MetadataRecordSerde.INSTANCE,
104104
BufferSupplier.create(),
105105
1,
106106
true,

tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -731,13 +731,12 @@ protected JsonNode valueAsJson(ApiMessage message, short version) {
731731
}
732732

733733
private static class ClusterMetadataLogMessageParser implements MessageParser<String, String> {
734-
private final MetadataRecordSerde metadataRecordSerde = new MetadataRecordSerde();
735734

736735
@Override
737736
public ParseResult<String, String> parse(Record record) {
738737
String output;
739738
try {
740-
ApiMessageAndVersion messageAndVersion = metadataRecordSerde.read(
739+
ApiMessageAndVersion messageAndVersion = MetadataRecordSerde.INSTANCE.read(
741740
new ByteBufferAccessor(record.value()), record.valueSize());
742741
ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
743742
json.set("type", new TextNode(

0 commit comments

Comments
 (0)