
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+ import org.apache.fluss.exception.InvalidOffsetException;
import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.metadata.LogFormat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+ import java.nio.file.NoSuchFileException;
+ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.stream.Collectors;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
 * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -76,7 +81,7 @@ public LogLoader(
     *
     * @return the offsets of the Log successfully loaded from disk
     */
-     public LoadedLogOffsets load() throws IOException {
+     public LoadedLogOffsets load() throws Exception {
        // load all the log and index files.
        logSegments.close();
        logSegments.clear();
@@ -117,6 +122,37 @@ public LoadedLogOffsets load() throws IOException {
                        nextOffset, activeSegment.getBaseOffset(), activeSegment.getSizeInBytes()));
    }

+     /**
+      * Just recovers the given segment, without adding it to the provided segments.
+      *
+      * @param segment Segment to recover
+      * @return The number of bytes truncated from the segment
+      * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
+      *     offset overflow
+      */
+     private int recoverSegment(LogSegment segment) throws Exception {
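+         // rebuild writer state into a fresh WriterStateManager (over the same tablet dir)
+         // so the shared in-memory writer state stays untouched while this segment is
+         // re-validated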
+         WriterStateManager writerStateManager =
+                 new WriterStateManager(
+                         logSegments.getTableBucket(),
+                         logTabletDir,
+                         this.writerStateManager.writerExpirationMs());
+         // TODO: here we pass 0 as the logStartOffset into rebuildWriterState. The current
+         // implementation of logStartOffset in Fluss is not yet fully refined and may not
+         // always be updated, so it is not yet reliable. Once updating logStartOffset is
+         // fixed in https://github.com/apache/fluss/issues/744, we can use logStartOffset
+         // here. Using 0 versus logStartOffset does not affect correctness: both restore the
+         // complete WriterState. The only difference is that logStartOffset can potentially
+         // skip over more segments.
+         LogTablet.rebuildWriterState(
+                 writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
+         int bytesTruncated = segment.recover();
+         // once we have recovered the segment's data, take a snapshot to ensure that we won't
+         // need to reload the same segment again while recovering another segment.
+         writerStateManager.takeSnapshot();
+         return bytesTruncated;
+     }
+ 
    /**
     * Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
     * active segment, and returns the updated recovery point and next offset after recovery.
@@ -129,16 +165,106 @@ public LoadedLogOffsets load() throws IOException {
     * overflow
     */
    private Tuple2<Long, Long> recoverLog() throws IOException {
-         // TODO truncate log to recover maybe unflush segments.
+         if (!isCleanShutdown) {
+             List<LogSegment> unflushed =
+                     logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
+             int numUnflushed = unflushed.size();
+             Iterator<LogSegment> unflushedIter = unflushed.iterator();
+             boolean truncated = false;
+             int numFlushed = 1;
+ 
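+             // replay all segments past the recovery point in offset order; the first segment
+             // that needs truncation invalidates everything after it, so recovery stops there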
+             while (unflushedIter.hasNext() && !truncated) {
+                 LogSegment segment = unflushedIter.next();
+                 LOG.info(
+                         "Recovering unflushed segment {}. {}/{} recovered for bucket {}",
+                         segment.getBaseOffset(),
+                         numFlushed,
+                         numUnflushed,
+                         logSegments.getTableBucket());
+ 
+                 int truncatedBytes = -1;
+                 try {
+                     truncatedBytes = recoverSegment(segment);
+                 } catch (Exception e) {
+                     if (e instanceof InvalidOffsetException) {
+                         long startOffset = segment.getBaseOffset();
+                         LOG.warn(
+                                 "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
+                                         + "and creating an empty one with starting offset {}",
+                                 logSegments.getTableBucket(),
+                                 startOffset);
+                         truncatedBytes = segment.truncateTo(startOffset);
+                     } else {
+                         // don't swallow unexpected failures silently
+                         throw e instanceof IOException ? (IOException) e : new IOException(e);
+                     }
+                 }
+ 
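+                 // a positive byte count from recoverSegment/truncateTo means data was dropped
+                 // from this segment's tail, i.e. the log is corrupt from this point onwards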
+                 if (truncatedBytes > 0) {
+                     // we had an invalid message, delete all remaining log
+                     LOG.warn(
+                             "Corruption found in segment {} for bucket {}, truncating to offset {}",
+                             segment.getBaseOffset(),
+                             logSegments.getTableBucket(),
+                             segment.readNextOffset());
+                     removeAndDeleteSegments(unflushedIter);
+                     truncated = true;
+                 } else {
+                     numFlushed += 1;
+                 }
+             }
+         }
+ 
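+         // recovery may have deleted every segment (or this may be a brand-new log), so make
+         // sure at least one active segment exists, starting at offset 0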
        if (logSegments.isEmpty()) {
+             // TODO: use logStartOffset once issue https://github.com/apache/fluss/issues/744 is resolved
            logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
        }
        long logEndOffset = logSegments.lastSegment().get().readNextOffset();
        return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
    }

+     /**
+      * This method deletes the given log segments and the associated writer snapshots.
+      *
+      * <p>This method does not need to convert IOException to {@link LogStorageException} because
+      * it is only called during log loading, before all logs are loaded, and any IOException is
+      * caught and logged here rather than propagated.
+      *
+      * @param segmentsToDelete The log segments to schedule for deletion
+      */
+     private void removeAndDeleteSegments(Iterator<LogSegment> segmentsToDelete) {
+         if (segmentsToDelete.hasNext()) {
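+             // materialize the remaining segments into a list before removing them from the
+             // underlying segments collection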
+             List<LogSegment> toDelete = new ArrayList<>();
+             segmentsToDelete.forEachRemaining(toDelete::add);
+ 
+             LOG.info(
+                     "Deleting segments for bucket {} as part of log recovery: {}",
+                     logSegments.getTableBucket(),
+                     toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
+             toDelete.forEach(segment -> logSegments.remove(segment.getBaseOffset()));
+ 
+             try {
+                 LocalLog.deleteSegmentFiles(
+                         toDelete, LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
+             } catch (IOException e) {
+                 LOG.error(
+                         "Failed to delete truncated segments {} for bucket {}",
+                         toDelete,
+                         logSegments.getTableBucket(),
+                         e);
+             }
+ 
+             try {
+                 LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
+             } catch (IOException e) {
+                 LOG.error(
+                         "Failed to delete truncated writer snapshots {} for bucket {}",
+                         toDelete,
+                         logSegments.getTableBucket(),
+                         e);
+             }
+         }
+     }
+ 
    /** Loads segments from disk into the provided segments. */
-     private void loadSegmentFiles() throws IOException {
+     private void loadSegmentFiles() throws Exception {
        File[] sortedFiles = logTabletDir.listFiles();
        if (sortedFiles != null) {
            Arrays.sort(sortedFiles, Comparator.comparing(File::getName));
@@ -155,8 +281,26 @@ private void loadSegmentFiles() throws IOException {
                    }
                } else if (LocalLog.isLogFile(file)) {
                    long baseOffset = FlussPaths.offsetFromFile(file);
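+                     // note whether the time index file already exists; LogSegment.open below
+                     // creates missing index files, and sanityCheck needs to distinguish a
+                     // freshly created time index from a pre-existing one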
+                     boolean timeIndexFileNewlyCreated =
+                             !FlussPaths.timeIndexFile(logTabletDir, baseOffset).exists();
                    LogSegment segment =
                            LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
+ 
+                     try {
+                         segment.sanityCheck(timeIndexFileNewlyCreated);
+                     } catch (Exception e) {
+                         if (e instanceof NoSuchFileException) {
+                             if (isCleanShutdown
+                                     || segment.getBaseOffset() < recoveryPointCheckpoint) {
+                                 LOG.error(
+                                         "Could not find offset index file corresponding to log file {} "
+                                                 + "for bucket {}, recovering segment and rebuilding index files...",
+                                         segment.getFileLogRecords().file().getAbsoluteFile(),
+                                         logSegments.getTableBucket());
+                             }
+                             recoverSegment(segment);
+                         } else {
+                             // anything other than a missing index file is unexpected; rethrow
+                             throw e;
+                         }
+                     }
                    logSegments.add(segment);
                }
            }