Skip to content

Commit 62db144

Browse files
committed
add writer state log
1 parent 0f2c95a commit 62db144

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,10 @@ public LoadedLogOffsets load() throws IOException {
113113
// Additionally, using 0 versus using logStartOffset does not affect correctness—they both
114114
// can restore the complete WriterState. The only difference is that using logStartOffset
115115
// can potentially skip over more segments.
116+
LOG.info("In load, before rebuild: {}", writerStateManager.toJsonString());
116117
LogTablet.rebuildWriterState(
117118
writerStateManager, logSegments, 0, nextOffset, isCleanShutdown);
119+
LOG.info("In load, after rebuild: {}", writerStateManager.toJsonString());
118120

119121
LogSegment activeSegment = logSegments.lastSegment().get();
120122
activeSegment.resizeIndexes((int) conf.get(ConfigOptions.LOG_INDEX_FILE_SIZE).getBytes());
@@ -264,12 +266,14 @@ private int recoverSegment(LogSegment segment) throws IOException {
264266
// Additionally, using 0 versus using logStartOffset does not affect correctness—they both
265267
// can restore the complete WriterState. The only difference is that using logStartOffset
266268
// can potentially skip over more segments.
269+
LOG.info("In recoverSegment, before rebuild: {}", writerStateManager.toJsonString());
267270
LogTablet.rebuildWriterState(
268271
writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
269272
int bytesTruncated = segment.recover();
270273
// once we have recovered the segment's data, take a snapshot to ensure that we won't
271274
// need to reload the same segment again while recovering another segment.
272275
writerStateManager.takeSnapshot();
276+
LOG.info("In recoverSegment, after rebuild: {}", writerStateManager.toJsonString());
273277
return bytesTruncated;
274278
}
275279

fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,22 @@ private static void writeSnapshot(File file, Map<Long, WriterStateEntry> entries
478478
}
479479
}
480480

481+
public String toJsonString() {
482+
List<WriterSnapshotEntry> snapshotEntries = new ArrayList<>();
483+
writers.forEach(
484+
(writerId, writerStateEntry) ->
485+
snapshotEntries.add(
486+
new WriterSnapshotEntry(
487+
writerId,
488+
writerStateEntry.lastBatchSequence(),
489+
writerStateEntry.lastDataOffset(),
490+
writerStateEntry.lastOffsetDelta(),
491+
writerStateEntry.lastBatchTimestamp())));
492+
byte[] jsonBytes = new WriterSnapshotMap(snapshotEntries).toJsonBytes();
493+
494+
return new String(jsonBytes);
495+
}
496+
481497
/** Writer snapshot map json serde. */
482498
public static class WriterSnapshotMapJsonSerde
483499
implements JsonSerializer<WriterSnapshotMap>, JsonDeserializer<WriterSnapshotMap> {

0 commit comments

Comments
 (0)