 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidOffsetException;
 import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
 import org.apache.fluss.exception.LogStorageException;
 import org.apache.fluss.metadata.LogFormat;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -117,6 +123,37 @@ public LoadedLogOffsets load() throws IOException {
                 nextOffset, activeSegment.getBaseOffset(), activeSegment.getSizeInBytes()));
     }
 
+    /**
+     * Recovers the given segment, without adding it to the provided segments.
+     *
+     * @param segment segment to recover
+     * @return the number of bytes truncated from the segment
+     * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
+     *     offset overflow
+     */
+    private int recoverSegment(LogSegment segment) throws IOException {
+        WriterStateManager writerStateManager =
+                new WriterStateManager(
+                        logSegments.getTableBucket(),
+                        logTabletDir,
+                        this.writerStateManager.writerExpirationMs());
+        // TODO: we pass 0 as the logStartOffset into rebuildWriterState because the current
+        // implementation of logStartOffset in Fluss is not yet fully refined: there are cases
+        // where logStartOffset is not updated, so it is not yet reliable. Once updating
+        // logStartOffset correctly is resolved in https://github.com/apache/fluss/issues/744,
+        // we can use logStartOffset here. Using 0 instead of logStartOffset does not affect
+        // correctness; both restore the complete WriterState. The only difference is that
+        // logStartOffset can potentially skip over more segments.
+        LogTablet.rebuildWriterState(
+                writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
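+        // recover() rebuilds the segment's index files from its log records and truncates any
+        // bytes after the last valid batch, returning the number of bytes removed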
+        int bytesTruncated = segment.recover();
+        // once we have recovered the segment's data, take a snapshot to ensure that we won't
+        // need to reload the same segment again while recovering another segment.
+        writerStateManager.takeSnapshot();
+        return bytesTruncated;
+    }
+
     /**
      * Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
      * active segment, and returns the updated recovery point and next offset after recovery.
@@ -129,14 +166,106 @@ public LoadedLogOffsets load() throws IOException {
      * overflow
      */
     private Tuple2<Long, Long> recoverLog() throws IOException {
-        // TODO truncate log to recover maybe unflush segments.
+        if (!isCleanShutdown) {
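+            // only segments at or beyond the recovery point checkpoint can hold unflushed data;
+            // everything before the checkpoint was flushed to disk before the shutdown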
+            List<LogSegment> unflushed =
+                    logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
+            int numUnflushed = unflushed.size();
+            Iterator<LogSegment> unflushedIter = unflushed.iterator();
+            boolean truncated = false;
+            int numFlushed = 1;
+
+            while (unflushedIter.hasNext() && !truncated) {
+                LogSegment segment = unflushedIter.next();
+                LOG.info(
+                        "Recovering unflushed segment {}. {}/{} recovered for bucket {}",
+                        segment.getBaseOffset(),
+                        numFlushed,
+                        numUnflushed,
+                        logSegments.getTableBucket());
+
+                int truncatedBytes = -1;
+                try {
+                    truncatedBytes = recoverSegment(segment);
+                } catch (InvalidOffsetException e) {
+                    long startOffset = segment.getBaseOffset();
+                    LOG.warn(
+                            "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
+                                    + "and creating an empty one with starting offset {}",
+                            logSegments.getTableBucket(),
+                            startOffset);
+                    truncatedBytes = segment.truncateTo(startOffset);
+                }
+
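+                // a positive return value means corrupt trailing bytes were chopped off this
+                // segment, so nothing at higher offsets can be trusted either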
+                if (truncatedBytes > 0) {
+                    // we had an invalid message, delete all remaining log segments
+                    LOG.warn(
+                            "Corruption found in segment {} for bucket {}, truncating to offset {}",
+                            segment.getBaseOffset(),
+                            logSegments.getTableBucket(),
+                            segment.readNextOffset());
+                    removeAndDeleteSegments(unflushedIter);
+                    truncated = true;
+                } else {
+                    numFlushed += 1;
+                }
+            }
+        }
+
         if (logSegments.isEmpty()) {
+            // TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
             logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
         }
         long logEndOffset = logSegments.lastSegment().get().readNextOffset();
-        return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
+        // after truncation the recovery point must not point past the log end offset
+        return Tuple2.of(Math.min(recoveryPointCheckpoint, logEndOffset), logEndOffset);
     }
 
+    /**
+     * Deletes the given log segments and the associated writer snapshots.
+     *
+     * <p>This method does not need to convert IOException to {@link LogStorageException} because
+     * it is either called before all logs are loaded or the immediate caller will catch and
+     * handle IOException.
+     *
+     * @param segmentsToDelete the log segments to schedule for deletion
+     */
+    private void removeAndDeleteSegments(Iterator<LogSegment> segmentsToDelete) {
+        if (segmentsToDelete.hasNext()) {
+            List<LogSegment> toDelete = new ArrayList<>();
+            segmentsToDelete.forEachRemaining(toDelete::add);
+
+            LOG.info(
+                    "Deleting segments for bucket {} as part of log recovery: {}",
+                    logSegments.getTableBucket(),
+                    toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
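+            // drop the segments from the in-memory view first so readers cannot observe them
+            // while the underlying files are being deleted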
+            toDelete.forEach(segment -> logSegments.remove(segment.getBaseOffset()));
+
+            try {
+                LocalLog.deleteSegmentFiles(
+                        toDelete, LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated segments {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+
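+            // also remove the writer snapshots taken at these segments' offsets so that stale
+            // writer state is not picked up on a later restart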
+            try {
+                LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated writer snapshots {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+        }
+    }
+
     /** Loads segments from disk into the provided segments. */
     private void loadSegmentFiles() throws IOException {
         File[] sortedFiles = logTabletDir.listFiles();
@@ -156,8 +285,28 @@ private void loadSegmentFiles() throws IOException {
                 }
             } else if (LocalLog.isLogFile(file)) {
                 long baseOffset = FlussPaths.offsetFromFile(file);
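+                // remember whether the time index file is about to be created lazily by open();
+                // a newly created (empty) time index must not be flagged as corruption below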
+                boolean timeIndexFileNewlyCreated =
+                        !FlussPaths.timeIndexFile(logTabletDir, baseOffset).exists();
                 LogSegment segment =
                         LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
+
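+                // sanityCheck() verifies the segment's index files; a missing offset index
+                // surfaces as a NoSuchFileException and triggers an index rebuild below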
+                try {
+                    segment.sanityCheck(timeIndexFileNewlyCreated);
+                } catch (NoSuchFileException e) {
+                    if (isCleanShutdown || segment.getBaseOffset() < recoveryPointCheckpoint) {
+                        LOG.error(
+                                "Could not find offset index file corresponding to log file {} "
+                                        + "for bucket {}, recovering segment and rebuilding index files...",
+                                segment.getFileLogRecords().file().getAbsoluteFile(),
+                                logSegments.getTableBucket());
+                    }
+                    recoverSegment(segment);
+                }
                 logSegments.add(segment);
             }
         }