Skip to content

Commit 4ad73b2

Browse files
committed
[server] Use logStartOffset when rebuild writer state
1 parent 96b46f3 commit 4ad73b2

File tree

5 files changed

+391
-32
lines changed

5 files changed

+391
-32
lines changed

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -107,17 +107,12 @@ public LoadedLogOffsets load() throws IOException {
107107
// WriterStateManager used during log recovery may have deleted some files without the
108108
// LogLoader.writerStateManager instance witnessing the deletion.
109109
writerStateManager.removeStraySnapshots(logSegments.baseOffsets());
110-
111-
// TODO, Here, we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
112-
// that the current implementation of logStartOffset in Fluss is not yet fully refined, and
113-
// there may be cases where logStartOffset is not updated. As a result, logStartOffset is
114-
// not yet reliable. Once the issue with correctly updating logStartOffset is resolved in
115-
// issue https://github.com/apache/fluss/issues/744, we can use logStartOffset here.
116-
// Additionally, using 0 versus using logStartOffset does not affect correctness—they both
117-
// can restore the complete WriterState. The only difference is that using logStartOffset
118-
// can potentially skip over more segments.
119110
LogTablet.rebuildWriterState(
120-
writerStateManager, logSegments, 0, nextOffset, isCleanShutdown);
111+
writerStateManager,
112+
logSegments,
113+
logStartOffsetCheckpoint,
114+
nextOffset,
115+
isCleanShutdown);
121116

122117
LogSegment activeSegment = logSegments.lastSegment().get();
123118
activeSegment.resizeIndexes((int) conf.get(ConfigOptions.LOG_INDEX_FILE_SIZE).getBytes());
@@ -261,16 +256,12 @@ private int recoverSegment(LogSegment segment) throws IOException {
261256
logSegments.getTableBucket(),
262257
logTabletDir,
263258
this.writerStateManager.writerExpirationMs());
264-
// TODO, Here, we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
265-
// that the current implementation of logStartOffset in Fluss is not yet fully refined, and
266-
// there may be cases where logStartOffset is not updated. As a result, logStartOffset is
267-
// not yet reliable. Once the issue with correctly updating logStartOffset is resolved in
268-
// issue https://github.com/apache/fluss/issues/744, we can use logStartOffset here.
269-
// Additionally, using 0 versus using logStartOffset does not affect correctness—they both
270-
// can restore the complete WriterState. The only difference is that using logStartOffset
271-
// can potentially skip over more segments.
272259
LogTablet.rebuildWriterState(
273-
writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
260+
writerStateManager,
261+
logSegments,
262+
logStartOffsetCheckpoint,
263+
segment.getBaseOffset(),
264+
false);
274265
int bytesTruncated = segment.recover();
275266
// once we have recovered the segment's data, take a snapshot to ensure that we won't
276267
// need to reload the same segment again while recovering another segment.

fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,16 +1178,8 @@ private void rebuildWriterState(long lastOffset, WriterStateManager writerStateM
11781178
throws IOException {
11791179
synchronized (lock) {
11801180
localLog.checkIfMemoryMappedBufferClosed();
1181-
// TODO, Here, we use 0 as the logStartOffset passed into rebuildWriterState. The reason
1182-
// is that the current implementation of logStartOffset in Fluss is not yet fully
1183-
// refined, and there may be cases where logStartOffset is not updated. As a result,
1184-
// logStartOffset is not yet reliable. Once the issue with correctly updating
1185-
// logStartOffset is resolved in issue https://github.com/apache/fluss/issues/744, we
1186-
// can use logStartOffset here.
1187-
// Additionally, using 0 versus using logStartOffset does not affect correctness—they
1188-
// both can restore the complete WriterState. The only difference is that using
1189-
// logStartOffset can potentially skip over more segments.
1190-
rebuildWriterState(writerStateManager, localLog.getSegments(), 0, lastOffset, false);
1181+
rebuildWriterState(
1182+
writerStateManager, localLog.getSegments(), logStartOffset, lastOffset, false);
11911183
}
11921184
}
11931185

@@ -1386,7 +1378,7 @@ private static void loadWritersFromRecords(
13861378
Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
13871379
for (LogRecordBatch batch : records.batches()) {
13881380
if (batch.hasWriterId()) {
1389-
updateWriterAppendInfo(writerStateManager, batch, loadedWriters, true);
1381+
updateWriterAppendInfo(writerStateManager, batch, loadedWriters, false);
13901382
}
13911383
}
13921384
loadedWriters.values().forEach(writerStateManager::update);

fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,16 @@ public void deleteSnapshotsBefore(long offset) throws IOException {
228228
}
229229
}
230230

231+
/**
232+
* Deletes the writer snapshot files after the given offset (exclusive) in a thread safe manner.
233+
*/
234+
@VisibleForTesting
235+
public void deleteSnapshotAfter(long offset) throws IOException {
236+
for (SnapshotFile snapshot : snapshots.tailMap(offset, false).values()) {
237+
removeAndDeleteSnapshot(snapshot.offset);
238+
}
239+
}
240+
231241
/** Fetch the snapshot file for the end offset of the log segment. */
232242
public Optional<File> fetchSnapshot(long offset) {
233243
return Optional.ofNullable(snapshots.get(offset)).map(SnapshotFile::file);

0 commit comments

Comments
 (0)