Skip to content

Commit fd28911

Browse files
committed
add writer state log
1 parent 0f2c95a commit fd28911

File tree

3 files changed

+100
-57
lines changed

3 files changed

+100
-57
lines changed

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 80 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -19,11 +19,9 @@
1919

2020
import org.apache.fluss.config.ConfigOptions;
2121
import org.apache.fluss.config.Configuration;
22-
import org.apache.fluss.exception.InvalidOffsetException;
2322
import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
2423
import org.apache.fluss.exception.LogStorageException;
2524
import org.apache.fluss.metadata.LogFormat;
26-
import org.apache.fluss.server.exception.CorruptIndexException;
2725
import org.apache.fluss.utils.FlussPaths;
2826
import org.apache.fluss.utils.types.Tuple2;
2927

@@ -33,7 +31,6 @@
3331
import java.io.File;
3432
import java.io.IOException;
3533
import java.nio.file.Files;
36-
import java.nio.file.NoSuchFileException;
3734
import java.util.ArrayList;
3835
import java.util.Arrays;
3936
import java.util.Comparator;
@@ -113,8 +110,18 @@ public LoadedLogOffsets load() throws IOException {
113110
// Additionally, using 0 versus using logStartOffset does not affect correctness—they both
114111
// can restore the complete WriterState. The only difference is that using logStartOffset
115112
// can potentially skip over more segments.
113+
LOG.info(
114+
"In load for bucket {}, end offset {}, before rebuild: {}",
115+
logSegments.getTableBucket(),
116+
writerStateManager.mapEndOffset(),
117+
writerStateManager.toJsonString());
116118
LogTablet.rebuildWriterState(
117119
writerStateManager, logSegments, 0, nextOffset, isCleanShutdown);
120+
LOG.info(
121+
"In load for bucket {}, end offset {}, after rebuild: {}",
122+
logSegments.getTableBucket(),
123+
writerStateManager.mapEndOffset(),
124+
writerStateManager.toJsonString());
118125

119126
LogSegment activeSegment = logSegments.lastSegment().get();
120127
activeSegment.resizeIndexes((int) conf.get(ConfigOptions.LOG_INDEX_FILE_SIZE).getBytes());
@@ -136,60 +143,64 @@ public LoadedLogOffsets load() throws IOException {
136143
* overflow
137144
*/
138145
private Tuple2<Long, Long> recoverLog() throws IOException {
139-
if (!isCleanShutdown) {
140-
List<LogSegment> unflushed =
141-
logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
142-
int numUnflushed = unflushed.size();
143-
Iterator<LogSegment> unflushedIter = unflushed.iterator();
144-
boolean truncated = false;
145-
int numFlushed = 1;
146-
147-
while (unflushedIter.hasNext() && !truncated) {
148-
LogSegment segment = unflushedIter.next();
149-
LOG.info(
150-
"Recovering unflushed segment {}. {}/{} recovered for bucket {}",
151-
segment.getBaseOffset(),
152-
numFlushed,
153-
numUnflushed,
154-
logSegments.getTableBucket());
155-
156-
try {
157-
segment.sanityCheck();
158-
} catch (NoSuchFileException | CorruptIndexException e) {
159-
LOG.warn(
160-
"Found invalid index file corresponding log file {} for bucket {}, "
161-
+ "recovering segment and rebuilding index files...",
162-
segment.getFileLogRecords().file().getAbsoluteFile(),
163-
logSegments.getTableBucket(),
164-
e);
165-
166-
int truncatedBytes = -1;
167-
try {
168-
truncatedBytes = recoverSegment(segment);
169-
} catch (InvalidOffsetException invalidOffsetException) {
170-
long startOffset = segment.getBaseOffset();
171-
LOG.warn(
172-
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
173-
+ "and creating an empty one with starting offset {}",
174-
logSegments.getTableBucket(),
175-
startOffset);
176-
truncatedBytes = segment.truncateTo(startOffset);
177-
}
178-
179-
if (truncatedBytes > 0) {
180-
// we had an invalid message, delete all remaining log
181-
LOG.warn(
182-
"Corruption found in segment {} for bucket {}, truncating to offset {}",
183-
segment.getBaseOffset(),
184-
logSegments.getTableBucket(),
185-
segment.readNextOffset());
186-
removeAndDeleteSegments(unflushedIter);
187-
truncated = true;
188-
}
189-
}
190-
numFlushed += 1;
191-
}
192-
}
146+
// if (!isCleanShutdown) {
147+
// List<LogSegment> unflushed =
148+
// logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
149+
// int numUnflushed = unflushed.size();
150+
// Iterator<LogSegment> unflushedIter = unflushed.iterator();
151+
// boolean truncated = false;
152+
// int numFlushed = 1;
153+
//
154+
// while (unflushedIter.hasNext() && !truncated) {
155+
// LogSegment segment = unflushedIter.next();
156+
// LOG.info(
157+
// "Recovering unflushed segment {}. {}/{} recovered for bucket {}",
158+
// segment.getBaseOffset(),
159+
// numFlushed,
160+
// numUnflushed,
161+
// logSegments.getTableBucket());
162+
//
163+
// try {
164+
// segment.sanityCheck();
165+
// } catch (NoSuchFileException | CorruptIndexException e) {
166+
// LOG.warn(
167+
// "Found invalid index file corresponding log file {} for bucket
168+
// {}, "
169+
// + "recovering segment and rebuilding index files...",
170+
// segment.getFileLogRecords().file().getAbsoluteFile(),
171+
// logSegments.getTableBucket(),
172+
// e);
173+
//
174+
// int truncatedBytes = -1;
175+
// try {
176+
// truncatedBytes = recoverSegment(segment);
177+
// } catch (InvalidOffsetException invalidOffsetException) {
178+
// long startOffset = segment.getBaseOffset();
179+
// LOG.warn(
180+
// "Found invalid offset during recovery for bucket {}.
181+
// Deleting the corrupt segment "
182+
// + "and creating an empty one with starting offset
183+
// {}",
184+
// logSegments.getTableBucket(),
185+
// startOffset);
186+
// truncatedBytes = segment.truncateTo(startOffset);
187+
// }
188+
//
189+
// if (truncatedBytes > 0) {
190+
// // we had an invalid message, delete all remaining log
191+
// LOG.warn(
192+
// "Corruption found in segment {} for bucket {}, truncating
193+
// to offset {}",
194+
// segment.getBaseOffset(),
195+
// logSegments.getTableBucket(),
196+
// segment.readNextOffset());
197+
// removeAndDeleteSegments(unflushedIter);
198+
// truncated = true;
199+
// }
200+
// }
201+
// numFlushed += 1;
202+
// }
203+
// }
193204

194205
// TODO truncate log to recover possibly unflushed segments.
195206
if (logSegments.isEmpty()) {
@@ -264,12 +275,24 @@ private int recoverSegment(LogSegment segment) throws IOException {
264275
// Additionally, using 0 versus using logStartOffset does not affect correctness—they both
265276
// can restore the complete WriterState. The only difference is that using logStartOffset
266277
// can potentially skip over more segments.
278+
LOG.info(
279+
"In recoverSegment for bucket {} for segment {}, end offset {}, before rebuild: {}",
280+
logSegments.getTableBucket(),
281+
segment.getBaseOffset(),
282+
writerStateManager.mapEndOffset(),
283+
writerStateManager.toJsonString());
267284
LogTablet.rebuildWriterState(
268285
writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
269286
int bytesTruncated = segment.recover();
270287
// once we have recovered the segment's data, take a snapshot to ensure that we won't
271288
// need to reload the same segment again while recovering another segment.
272289
writerStateManager.takeSnapshot();
290+
LOG.info(
291+
"In recoverSegment for bucket {} for segment {}, end offset {}, after rebuild: {}",
292+
logSegments.getTableBucket(),
293+
segment.getBaseOffset(),
294+
writerStateManager.mapEndOffset(),
295+
writerStateManager.toJsonString());
273296
return bytesTruncated;
274297
}
275298

fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1256,6 +1256,10 @@ static void rebuildWriterState(
12561256
FetchDataInfo fetchDataInfo =
12571257
segment.read(startOffset, Integer.MAX_VALUE, maxPosition, false);
12581258
if (fetchDataInfo != null) {
1259+
LOG.info(
1260+
"Loading WriterState for bucket {} from segment {}",
1261+
segments.getTableBucket(),
1262+
segment);
12591263
loadWritersFromRecords(writerStateManager, fetchDataInfo.getRecords());
12601264
}
12611265
}

fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -478,6 +478,22 @@ private static void writeSnapshot(File file, Map<Long, WriterStateEntry> entries
478478
}
479479
}
480480

481+
public String toJsonString() {
482+
List<WriterSnapshotEntry> snapshotEntries = new ArrayList<>();
483+
writers.forEach(
484+
(writerId, writerStateEntry) ->
485+
snapshotEntries.add(
486+
new WriterSnapshotEntry(
487+
writerId,
488+
writerStateEntry.lastBatchSequence(),
489+
writerStateEntry.lastDataOffset(),
490+
writerStateEntry.lastOffsetDelta(),
491+
writerStateEntry.lastBatchTimestamp())));
492+
byte[] jsonBytes = new WriterSnapshotMap(snapshotEntries).toJsonBytes();
493+
494+
return new String(jsonBytes);
495+
}
496+
481497
/** Writer snapshot map json serde. */
482498
public static class WriterSnapshotMapJsonSerde
483499
implements JsonSerializer<WriterSnapshotMap>, JsonDeserializer<WriterSnapshotMap> {

0 commit comments

Comments (0)