Commit 17ab374

KAFKA-15371 MetadataShell is stuck when bootstrapping (#19419)
issue link https://issues.apache.org/jira/browse/KAFKA-15371

## conclusion

This issue isn't caused by differences between the `log` file and the `checkpoint` file, but by the order in which asynchronous events occur.

## reliably reproduce

In the current version, you can reliably reproduce this issue by adding a short sleep to `SnapshotFileReader#handleNextBatch`, like this:

```
private void handleNextBatch() {
    if (!batchIterator.hasNext()) {
        // Delay shutdown so the MetadataLoader drains its queue
        // before the high watermark is set.
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        beginShutdown("done");
        return;
    }
    FileChannelRecordBatch batch = batchIterator.next();
    if (batch.isControlBatch()) {
        handleControlBatch(batch);
    } else {
        handleMetadataBatch(batch);
    }
    scheduleHandleNextBatch();
    lastOffset = batch.lastOffset();
}
```

You can download a test file here: [test checkpoint file](https://github.com/user-attachments/files/19659636/00000000000000007169-0000000001.checkpoint.log)

⚠️: Please remove the `.log` extension after downloading, since GitHub doesn't allow uploading checkpoint files directly.

After changing the code and running the Gradle build, run `bin/kafka-metadata-shell.sh --snapshot ${your file path}`. You will only see a loading message in the console, like this:

<img width="248" alt="image" src="https://github.com/user-attachments/assets/fe4b4eba-7a6a-4cee-9b56-c82a5fa02c89" />

## Cause of the Bug

After the `SnapshotFileReader` starts up, it enqueues the iterator's events on its own event queue (a `KafkaEventQueue`). The important method is `SnapshotFileReader#scheduleHandleNextBatch`.

When processing each batch from the iterator, it adds metadata events for that batch to the MetadataLoader's event queue, which is a different queue from the SnapshotFileReader's. The important methods are `SnapshotFileReader#handleMetadataBatch` and `MetadataLoader#handleCommit`.

When the MetadataLoader processes a MetadataDelta, it checks whether the high watermark has been updated. If not, it skips publishing. The important methods are `MetadataLoader#maybePublishMetadata` and the `stillNeedToCatchUp` check it calls.

The crucial high watermark update happens only after the SnapshotFileReader's iterator finishes reading, in the shutdown (cleanup) event of its queue.

So, if the MetadataLoader finishes processing all batches before the high watermark is updated, nothing triggers it to publish again and the main thread keeps waiting.

<img width="1088" alt="image" src="https://github.com/user-attachments/assets/03daa288-ff39-49a3-bbc7-e7b5831a858b" />

<img width="867" alt="image" src="https://github.com/user-attachments/assets/fc0770dd-de54-4f69-b669-ab4e696bd2a7" />

## Solution

When we reach the last batch in the iteration, we update the high watermark before adding that batch's events to the MetadataLoader. This ensures that the MetadataLoader runs at least once after the watermark is updated.

After modifying the code, you'll see the normal shell execution behavior.

<img width="337" alt="image" src="https://github.com/user-attachments/assets/2791d03c-81ae-4762-a015-4d6d9e526455" />

Reviewers: PoAn Yang <[email protected]>, Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai <[email protected]>
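The ordering problem can be illustrated with a small single-threaded model. This is only a sketch, not Kafka's code: `HighWaterMarkOrderingSketch`, `Loader`, and `replay` are hypothetical names, the loader's check is reduced to "skip publishing while the high watermark is empty", and the old ordering is replayed in its worst case, where the loader handles every batch before the reader's shutdown step runs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;

// Simplified model of the event ordering; none of these types are Kafka classes.
final class HighWaterMarkOrderingSketch {

    /** Hypothetical stand-in for the loader: publishes only once a high watermark is visible. */
    static final class Loader {
        final List<Long> publishedOffsets = new ArrayList<>();

        void handleCommit(long batchLastOffset, OptionalLong highWaterMark) {
            if (highWaterMark.isEmpty()) {
                return; // still "catching up": skip publishing for this batch
            }
            publishedOffsets.add(batchLastOffset);
        }
    }

    /**
     * Replays the batches and returns the offsets the loader published.
     * setBeforeLastBatch = false models the old ordering (watermark set at shutdown),
     * true models the ordering introduced by this commit.
     */
    static List<Long> replay(List<Long> batchLastOffsets, boolean setBeforeLastBatch) {
        Loader loader = new Loader();
        OptionalLong highWaterMark = OptionalLong.empty();
        long lastOffset = -1L;

        Iterator<Long> batches = batchLastOffsets.iterator();
        while (batches.hasNext()) {
            lastOffset = batches.next();
            // New ordering: expose the watermark before handing over the final batch.
            if (setBeforeLastBatch && !batches.hasNext()) {
                highWaterMark = OptionalLong.of(lastOffset);
            }
            loader.handleCommit(lastOffset, highWaterMark);
        }

        // Old ordering: the shutdown step sets the watermark, but the loader
        // has already seen every batch and is never invoked again.
        if (!setBeforeLastBatch) {
            highWaterMark = OptionalLong.of(lastOffset);
        }
        return loader.publishedOffsets;
    }

    public static void main(String[] args) {
        List<Long> offsets = List.of(10L, 20L, 30L);
        System.out.println("old ordering published: " + replay(offsets, false));
        System.out.println("new ordering published: " + replay(offsets, true));
    }
}
```

In this model the old ordering publishes nothing, which corresponds to the shell waiting forever, while the new ordering publishes once at the final batch. The real loader accumulates deltas across batches, so a single publish after the watermark is visible is enough.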
1 parent 4cdd4b6 commit 17ab374

1 file changed: +4 -3 lines changed

Diff for: metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java

@@ -96,12 +96,15 @@ private void handleNextBatch() {
             return;
         }
         FileChannelRecordBatch batch = batchIterator.next();
+        lastOffset = batch.lastOffset();
+        if (!batchIterator.hasNext()) {
+            highWaterMark = OptionalLong.of(lastOffset);
+        }
         if (batch.isControlBatch()) {
             handleControlBatch(batch);
         } else {
             handleMetadataBatch(batch);
         }
-        lastOffset = batch.lastOffset();
         scheduleHandleNextBatch();
     }

@@ -187,8 +190,6 @@ public void beginShutdown(String reason) {
     class ShutdownEvent implements EventQueue.Event {
         @Override
         public void run() throws Exception {
-            // Expose the high water mark only once we've shut down.
-            highWaterMark = OptionalLong.of(lastOffset);
 
             if (fileRecords != null) {
                 fileRecords.close();
