Skip to content

Commit 643d434

Browse files
committed
add writer state log
1 parent 0f2c95a commit 643d434

File tree

3 files changed

+42
-0
lines changed

3 files changed

+42
-0
lines changed

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,18 @@ public LoadedLogOffsets load() throws IOException {
113113
// Additionally, using 0 versus using logStartOffset does not affect correctness—they both
114114
// can restore the complete WriterState. The only difference is that using logStartOffset
115115
// can potentially skip over more segments.
116+
LOG.info(
117+
"In load for bucket {}, end offset {}, before rebuild: {}",
118+
logSegments.getTableBucket(),
119+
writerStateManager.mapEndOffset(),
120+
writerStateManager.toJsonString());
116121
LogTablet.rebuildWriterState(
117122
writerStateManager, logSegments, 0, nextOffset, isCleanShutdown);
123+
LOG.info(
124+
"In load for bucket {}, end offset {}, after rebuild: {}",
125+
logSegments.getTableBucket(),
126+
writerStateManager.mapEndOffset(),
127+
writerStateManager.toJsonString());
118128

119129
LogSegment activeSegment = logSegments.lastSegment().get();
120130
activeSegment.resizeIndexes((int) conf.get(ConfigOptions.LOG_INDEX_FILE_SIZE).getBytes());
@@ -264,12 +274,24 @@ private int recoverSegment(LogSegment segment) throws IOException {
264274
// Additionally, using 0 versus using logStartOffset does not affect correctness—they both
265275
// can restore the complete WriterState. The only difference is that using logStartOffset
266276
// can potentially skip over more segments.
277+
LOG.info(
278+
"In recoverSegment for bucket {} for segment {}, end offset {}, before rebuild: {}",
279+
logSegments.getTableBucket(),
280+
segment.getBaseOffset(),
281+
writerStateManager.mapEndOffset(),
282+
writerStateManager.toJsonString());
267283
LogTablet.rebuildWriterState(
268284
writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
269285
int bytesTruncated = segment.recover();
270286
// once we have recovered the segment's data, take a snapshot to ensure that we won't
271287
// need to reload the same segment again while recovering another segment.
272288
writerStateManager.takeSnapshot();
289+
LOG.info(
290+
"In recoverSegment for bucket {} for segment {}, end offset {}, after rebuild: {}",
291+
logSegments.getTableBucket(),
292+
segment.getBaseOffset(),
293+
writerStateManager.mapEndOffset(),
294+
writerStateManager.toJsonString());
273295
return bytesTruncated;
274296
}
275297

fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,6 +1256,10 @@ static void rebuildWriterState(
12561256
FetchDataInfo fetchDataInfo =
12571257
segment.read(startOffset, Integer.MAX_VALUE, maxPosition, false);
12581258
if (fetchDataInfo != null) {
1259+
LOG.info(
1260+
"Loading writer state for bucket {} from segment {}",
1261+
segments.getTableBucket(),
1262+
segment);
12591263
loadWritersFromRecords(writerStateManager, fetchDataInfo.getRecords());
12601264
}
12611265
}

fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,22 @@ private static void writeSnapshot(File file, Map<Long, WriterStateEntry> entries
478478
}
479479
}
480480

481+
public String toJsonString() {
482+
List<WriterSnapshotEntry> snapshotEntries = new ArrayList<>();
483+
writers.forEach(
484+
(writerId, writerStateEntry) ->
485+
snapshotEntries.add(
486+
new WriterSnapshotEntry(
487+
writerId,
488+
writerStateEntry.lastBatchSequence(),
489+
writerStateEntry.lastDataOffset(),
490+
writerStateEntry.lastOffsetDelta(),
491+
writerStateEntry.lastBatchTimestamp())));
492+
byte[] jsonBytes = new WriterSnapshotMap(snapshotEntries).toJsonBytes();
493+
494+
return new String(jsonBytes);
495+
}
496+
481497
/** Writer snapshot map json serde. */
482498
public static class WriterSnapshotMapJsonSerde
483499
implements JsonSerializer<WriterSnapshotMap>, JsonDeserializer<WriterSnapshotMap> {

0 commit comments

Comments
 (0)