 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidOffsetException;
 import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
 import org.apache.fluss.exception.LogStorageException;
 import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.server.exception.CorruptIndexException;
 import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.types.Tuple2;
 
@@ -31,8 +33,13 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -129,6 +136,61 @@ public LoadedLogOffsets load() throws IOException {
      * overflow
      */
     private Tuple2<Long, Long> recoverLog() throws IOException {
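+        // After an unclean shutdown, everything from the recovery-point checkpoint (the
+        // offset up to which the log is known to be flushed) to the end of the log may
+        // never have reached disk, so each such segment is validated before the log is
+        // reopened.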
+        if (!isCleanShutdown) {
+            List<LogSegment> unflushed =
+                    logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
+            int numUnflushed = unflushed.size();
+            Iterator<LogSegment> unflushedIter = unflushed.iterator();
+            boolean truncated = false;
+            int numFlushed = 0;
+
+            while (unflushedIter.hasNext() && !truncated) {
+                LogSegment segment = unflushedIter.next();
+                LOG.info(
+                        "Recovering unflushed segment {}. {}/{} recovered for bucket {}",
+                        segment.getBaseOffset(),
+                        numFlushed,
+                        numUnflushed,
+                        logSegments.getTableBucket());
+
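+                // sanityCheck() probes the segment's index files; a missing or corrupt
+                // index file surfaces as one of the two exceptions handled below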
+                try {
+                    segment.sanityCheck();
+                } catch (NoSuchFileException | CorruptIndexException e) {
+                    LOG.warn(
+                            "Found invalid index file corresponding to log file {} for bucket {}, "
+                                    + "recovering segment and rebuilding index files...",
+                            segment.getFileLogRecords().file().getAbsoluteFile(),
+                            logSegments.getTableBucket(),
+                            e);
+
+                    int truncatedBytes = -1;
+                    try {
+                        truncatedBytes = recoverSegment(segment);
+                    } catch (InvalidOffsetException invalidOffsetException) {
+                        long startOffset = segment.getBaseOffset();
+                        LOG.warn(
+                                "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
+                                        + "and creating an empty one with starting offset {}",
+                                logSegments.getTableBucket(),
+                                startOffset);
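+                        // truncating the segment to its own base offset discards every
+                        // record in it, leaving an empty segment in place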
+                        truncatedBytes = segment.truncateTo(startOffset);
+                    }
+
+                    if (truncatedBytes > 0) {
+                        // we had an invalid message, delete all remaining log segments
+                        LOG.warn(
+                                "Corruption found in segment {} for bucket {}, truncating to offset {}",
+                                segment.getBaseOffset(),
+                                logSegments.getTableBucket(),
+                                segment.readNextOffset());
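+                        // the segments that follow start past the corruption point, so
+                        // none of their contents can be trusted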
+                        removeAndDeleteSegments(unflushedIter);
+                        truncated = true;
+                    }
+                }
+                numFlushed += 1;
+            }
+        }
+
         // TODO truncate log to recover maybe unflushed segments.
         if (logSegments.isEmpty()) {
             logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
@@ -137,6 +199,80 @@ private Tuple2<Long, Long> recoverLog() throws IOException {
         return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
     }
 
+    /**
+     * This method deletes the given log segments and the associated writer snapshots.
+     *
+     * <p>This method does not need to convert IOException to {@link LogStorageException} because
+     * it is either called before all logs are loaded or the immediate caller will catch and
+     * handle IOException.
+     *
+     * @param segmentsToDelete The log segments to schedule for deletion
+     */
+    private void removeAndDeleteSegments(Iterator<LogSegment> segmentsToDelete) {
+        if (segmentsToDelete.hasNext()) {
+            List<LogSegment> toDelete = new ArrayList<>();
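+            // drain the iterator up front so the same set of segments can be logged,
+            // detached from the in-memory segment list, and deleted from disk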
+            segmentsToDelete.forEachRemaining(toDelete::add);
+
+            LOG.info(
+                    "Deleting segments for bucket {} as part of log recovery: {}",
+                    logSegments.getTableBucket(),
+                    toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
+            toDelete.forEach(segment -> logSegments.remove(segment.getBaseOffset()));
+
+            try {
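+                // a failed disk deletion is logged rather than rethrown: the segments are
+                // already detached from the log at this point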
+                LocalLog.deleteSegmentFiles(
+                        toDelete, LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated segments {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+
+            try {
+                LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated writer snapshots {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Recovers just the given segment, without adding it to the log's loaded segments.
+     *
+     * @param segment Segment to recover
+     * @return The number of bytes truncated from the segment
+     * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
+     *     offset overflow
+     */
+    private int recoverSegment(LogSegment segment) throws IOException {
+        WriterStateManager writerStateManager =
+                new WriterStateManager(
+                        logSegments.getTableBucket(),
+                        logTabletDir,
+                        this.writerStateManager.writerExpirationMs());
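+        // the rebuild runs against a stand-alone WriterStateManager so that recovering
+        // this segment does not disturb the tablet's live writer state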
+        // TODO: here we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
+        // that the current implementation of logStartOffset in Fluss is not yet fully refined, and
+        // there may be cases where logStartOffset is not updated, so it is not yet reliable. Once
+        // the issue with correctly updating logStartOffset is resolved in
+        // https://github.com/apache/fluss/issues/744, we can use logStartOffset here. Using 0
+        // versus logStartOffset does not affect correctness: both restore the complete
+        // WriterState. The only difference is that using logStartOffset can potentially skip
+        // over more segments.
+        LogTablet.rebuildWriterState(
+                writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
+        int bytesTruncated = segment.recover();
+        // once we have recovered the segment's data, take a snapshot to ensure that we won't
+        // need to reload the same segment again while recovering another segment.
+        writerStateManager.takeSnapshot();
+        return bytesTruncated;
+    }
+
     /** Loads segments from disk into the provided segments. */
     private void loadSegmentFiles() throws IOException {
         File[] sortedFiles = logTabletDir.listFiles();