@@ -19,10 +19,10 @@
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.InvalidOffsetException;
 import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
 import org.apache.fluss.exception.LogStorageException;
 import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.server.exception.CorruptIndexException;
 import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.types.Tuple2;
 
@@ -33,12 +33,8 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.stream.Collectors;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -172,61 +168,6 @@ private int recoverSegment(LogSegment segment) throws IOException {
      * overflow
      */
     private Tuple2<Long, Long> recoverLog() throws IOException {
-        if (!isCleanShutdown) {
-            List<LogSegment> unflushed =
-                    logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
-            int numUnflushed = unflushed.size();
-            Iterator<LogSegment> unflushedIter = unflushed.iterator();
-            boolean truncated = false;
-            int numFlushed = 1;
-
-            long startTimeMs = System.currentTimeMillis();
-            while (unflushedIter.hasNext() && !truncated) {
-                LogSegment segment = unflushedIter.next();
-                LOG.info(
-                        "Recovering unflushed segment {}. {}/{} recovered for bucket {}",
-                        segment.getBaseOffset(),
-                        numFlushed,
-                        numUnflushed,
-                        logSegments.getTableBucket());
-
-                int truncatedBytes = -1;
-                try {
-                    truncatedBytes = recoverSegment(segment);
-                } catch (Exception e) {
-                    if (e instanceof InvalidOffsetException) {
-                        long startOffset = segment.getBaseOffset();
-                        LOG.warn(
-                                "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
-                                        + "and creating an empty one with starting offset {}",
-                                logSegments.getTableBucket(),
-                                startOffset);
-                        truncatedBytes = segment.truncateTo(startOffset);
-                    } else {
-                        throw e;
-                    }
-                }
-
-                if (truncatedBytes > 0) {
-                    // we had an invalid message, delete all remaining log
-                    LOG.warn(
-                            "Corruption found in segment {} for bucket {}, truncating to offset {}",
-                            segment.getBaseOffset(),
-                            logSegments.getTableBucket(),
-                            segment.readNextOffset());
-                    removeAndDeleteSegments(unflushedIter);
-                    truncated = true;
-                } else {
-                    numFlushed += 1;
-                }
-            }
-
-            LOG.info(
-                    "Recovery--11 for bucket {} completed in {} ms",
-                    logSegments.getTableBucket(),
-                    System.currentTimeMillis() - startTimeMs);
-        }
-
         if (logSegments.isEmpty()) {
             // TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
             logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
@@ -235,49 +176,6 @@ private Tuple2<Long, Long> recoverLog() throws IOException {
         return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
     }
 
-    /**
-     * This method deletes the given log segments and the associated writer snapshots.
-     *
-     * <p>This method does not need to convert IOException to {@link LogStorageException} because it
-     * is either called before all logs are loaded or the immediate caller will catch and handle
-     * IOException
-     *
-     * @param segmentsToDelete The log segments to schedule for deletion
-     */
-    private void removeAndDeleteSegments(Iterator<LogSegment> segmentsToDelete) {
-        if (segmentsToDelete.hasNext()) {
-            List<LogSegment> toDelete = new ArrayList<>();
-            segmentsToDelete.forEachRemaining(toDelete::add);
-
-            LOG.info(
-                    "Deleting segments for bucket {} as part of log recovery: {}",
-                    logSegments.getTableBucket(),
-                    toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
-            toDelete.forEach(segment -> logSegments.remove(segment.getBaseOffset()));
-
-            try {
-                LocalLog.deleteSegmentFiles(
-                        toDelete, LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
-            } catch (IOException e) {
-                LOG.error(
-                        "Failed to delete truncated segments {} for bucket {}",
-                        toDelete,
-                        logSegments.getTableBucket(),
-                        e);
-            }
-
-            try {
-                LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
-            } catch (IOException e) {
-                LOG.error(
-                        "Failed to delete truncated writer snapshots {} for bucket {}",
-                        toDelete,
-                        logSegments.getTableBucket(),
-                        e);
-            }
-        }
-    }
-
     /** Loads segments from disk into the provided segments. */
     private void loadSegmentFiles() throws IOException {
         long startTimeMs = System.currentTimeMillis();
@@ -305,17 +203,25 @@ private void loadSegmentFiles() throws IOException {
 
             try {
                 segment.sanityCheck(timeIndexFileNewlyCreated);
-            } catch (IOException e) {
+            } catch (Exception e) {
                 if (e instanceof NoSuchFileException) {
                     if (isCleanShutdown
                             || segment.getBaseOffset() < recoveryPointCheckpoint) {
                         LOG.error(
                                 "Could not find offset index file corresponding to log file {} "
                                         + "for bucket {}, recovering segment and rebuilding index files...",
-                                logSegments.getTableBucket(),
-                                segment.getFileLogRecords().file().getAbsoluteFile());
+                                segment.getFileLogRecords().file().getAbsoluteFile(),
+                                logSegments.getTableBucket());
                     }
                     recoverSegment(segment);
+                } else if (e instanceof CorruptIndexException) {
+                    LOG.warn(
+                            "Found a corrupt index file corresponding to log file {} for bucket {}, recovering "
+                                    + "segment and rebuilding index files...",
+                            segment.getFileLogRecords().file().getAbsoluteFile(),
+                            logSegments.getTableBucket(),
+                            e);
+                    recoverSegment(segment);
                 } else {
                     throw e;
                 }
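For context on the last hunk: widening the catch clause from IOException to Exception is what lets the (presumably unchecked) CorruptIndexException be handled alongside a missing index file, with both cases rebuilding the segment's indexes via recoverSegment. A minimal compilable sketch of that dispatch, using hypothetical stand-in types (SegmentLoadSketch, Segment, and the nested CorruptIndexException are illustrations, not the actual Fluss classes):

// Sketch of the sanity-check dispatch: a missing index file (NoSuchFileException)
// or a corrupt one (CorruptIndexException, assumed unchecked) both route to
// segment recovery; anything else propagates to the caller.
import java.io.IOException;
import java.nio.file.NoSuchFileException;

class SegmentLoadSketch {

    // Hypothetical stand-in for org.apache.fluss.server.exception.CorruptIndexException.
    static class CorruptIndexException extends RuntimeException {
        CorruptIndexException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the slice of LogSegment exercised here.
    interface Segment {
        void sanityCheck() throws IOException;

        long getBaseOffset();
    }

    void loadSegment(Segment segment) throws IOException {
        try {
            segment.sanityCheck();
        } catch (Exception e) {
            // Catching Exception rather than IOException is what lets the unchecked
            // CorruptIndexException reach this branch, mirroring the hunk above.
            if (e instanceof NoSuchFileException || e instanceof CorruptIndexException) {
                recoverSegment(segment); // rebuild index files from the log records
            } else {
                throw e; // precise rethrow: the compiler narrows e back to IOException
            }
        }
    }

    private void recoverSegment(Segment segment) {
        System.out.println("Rebuilding indexes for segment " + segment.getBaseOffset());
    }
}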