1919
2020import org .apache .fluss .config .ConfigOptions ;
2121import org .apache .fluss .config .Configuration ;
22- import org .apache .fluss .exception .InvalidOffsetException ;
2322import org .apache .fluss .exception .LogSegmentOffsetOverflowException ;
2423import org .apache .fluss .exception .LogStorageException ;
2524import org .apache .fluss .metadata .LogFormat ;
3231import java .io .File ;
3332import java .io .IOException ;
3433import java .nio .file .Files ;
35- import java .nio .file .NoSuchFileException ;
36- import java .util .ArrayList ;
3734import java .util .Arrays ;
3835import java .util .Comparator ;
39- import java .util .Iterator ;
40- import java .util .List ;
41- import java .util .stream .Collectors ;
4236
4337/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
4438 * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -123,37 +117,6 @@ public LoadedLogOffsets load() throws IOException {
123117 nextOffset , activeSegment .getBaseOffset (), activeSegment .getSizeInBytes ()));
124118 }
125119
126- /**
127- * Just recovers the given segment, without adding it to the provided segments.
128- *
129- * @param segment Segment to recover
130- * @return The number of bytes truncated from the segment
131- * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
132- * offset overflow
133- */
134- private int recoverSegment (LogSegment segment ) throws IOException {
135- WriterStateManager writerStateManager =
136- new WriterStateManager (
137- logSegments .getTableBucket (),
138- logTabletDir ,
139- this .writerStateManager .writerExpirationMs ());
140- // TODO, Here, we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
141- // that the current implementation of logStartOffset in Fluss is not yet fully refined, and
142- // there may be cases where logStartOffset is not updated. As a result, logStartOffset is
143- // not yet reliable. Once the issue with correctly updating logStartOffset is resolved in
144- // issue https://github.com/apache/fluss/issues/744, we can use logStartOffset here.
145- // Additionally, using 0 versus using logStartOffset does not affect correctness—they both
146- // can restore the complete WriterState. The only difference is that using logStartOffset
147- // can potentially skip over more segments.
148- LogTablet .rebuildWriterState (
149- writerStateManager , logSegments , 0 , segment .getBaseOffset (), false );
150- int bytesTruncated = segment .recover ();
151- // once we have recovered the segment's data, take a snapshot to ensure that we won't
152- // need to reload the same segment again while recovering another segment.
153- writerStateManager .takeSnapshot ();
154- return bytesTruncated ;
155- }
156-
157120 /**
158121 * Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
159122 * active segment, and returns the updated recovery point and next offset after recovery.
@@ -166,106 +129,14 @@ private int recoverSegment(LogSegment segment) throws IOException {
166129 * overflow
167130 */
168131 private Tuple2 <Long , Long > recoverLog () throws IOException {
169- if (!isCleanShutdown ) {
170- List <LogSegment > unflushed =
171- logSegments .values (recoveryPointCheckpoint , Long .MAX_VALUE );
172- int numUnflushed = unflushed .size ();
173- Iterator <LogSegment > unflushedIter = unflushed .iterator ();
174- boolean truncated = false ;
175- int numFlushed = 1 ;
176-
177- while (unflushedIter .hasNext () && !truncated ) {
178- LogSegment segment = unflushedIter .next ();
179- LOG .info (
180- "Recovering unflushed segment {}. {}/{} recovered for bucket {}" ,
181- segment .getBaseOffset (),
182- numFlushed ,
183- numUnflushed ,
184- logSegments .getTableBucket ());
185-
186- int truncatedBytes = -1 ;
187- try {
188- truncatedBytes = recoverSegment (segment );
189- } catch (Exception e ) {
190- if (e instanceof InvalidOffsetException ) {
191- long startOffset = segment .getBaseOffset ();
192- LOG .warn (
193- "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
194- + "and creating an empty one with starting offset {}" ,
195- logSegments .getTableBucket (),
196- startOffset );
197- truncatedBytes = segment .truncateTo (startOffset );
198- } else {
199- throw e ;
200- }
201- }
202-
203- if (truncatedBytes > 0 ) {
204- // we had an invalid message, delete all remaining log
205- LOG .warn (
206- "Corruption found in segment {} for bucket {}, truncating to offset {}" ,
207- segment .getBaseOffset (),
208- logSegments .getTableBucket (),
209- segment .readNextOffset ());
210- removeAndDeleteSegments (unflushedIter );
211- truncated = true ;
212- } else {
213- numFlushed += 1 ;
214- }
215- }
216- }
217-
132+ // TODO truncate log to recover maybe unflush segments.
218133 if (logSegments .isEmpty ()) {
219- // TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
220134 logSegments .add (LogSegment .open (logTabletDir , 0L , conf , logFormat ));
221135 }
222136 long logEndOffset = logSegments .lastSegment ().get ().readNextOffset ();
223137 return Tuple2 .of (recoveryPointCheckpoint , logEndOffset );
224138 }
225139
226- /**
227- * This method deletes the given log segments and the associated writer snapshots.
228- *
229- * <p>This method does not need to convert IOException to {@link LogStorageException} because it
230- * is either called before all logs are loaded or the immediate caller will catch and handle
231- * IOException
232- *
233- * @param segmentsToDelete The log segments to schedule for deletion
234- */
235- private void removeAndDeleteSegments (Iterator <LogSegment > segmentsToDelete ) {
236- if (segmentsToDelete .hasNext ()) {
237- List <LogSegment > toDelete = new ArrayList <>();
238- segmentsToDelete .forEachRemaining (toDelete ::add );
239-
240- LOG .info (
241- "Deleting segments for bucket {} as part of log recovery: {}" ,
242- logSegments .getTableBucket (),
243- toDelete .stream ().map (LogSegment ::toString ).collect (Collectors .joining ("," )));
244- toDelete .forEach (segment -> logSegments .remove (segment .getBaseOffset ()));
245-
246- try {
247- LocalLog .deleteSegmentFiles (
248- toDelete , LocalLog .SegmentDeletionReason .LOG_TRUNCATION );
249- } catch (IOException e ) {
250- LOG .error (
251- "Failed to delete truncated segments {} for bucket {}" ,
252- toDelete ,
253- logSegments .getTableBucket (),
254- e );
255- }
256-
257- try {
258- LogTablet .deleteWriterSnapshots (toDelete , writerStateManager );
259- } catch (IOException e ) {
260- LOG .error (
261- "Failed to delete truncated writer snapshots {} for bucket {}" ,
262- toDelete ,
263- logSegments .getTableBucket (),
264- e );
265- }
266- }
267- }
268-
269140 /** Loads segments from disk into the provided segments. */
270141 private void loadSegmentFiles () throws IOException {
271142 File [] sortedFiles = logTabletDir .listFiles ();
@@ -285,28 +156,8 @@ private void loadSegmentFiles() throws IOException {
285156 }
286157 } else if (LocalLog .isLogFile (file )) {
287158 long baseOffset = FlussPaths .offsetFromFile (file );
288- boolean timeIndexFileNewlyCreated =
289- !FlussPaths .timeIndexFile (logTabletDir , baseOffset ).exists ();
290159 LogSegment segment =
291160 LogSegment .open (logTabletDir , baseOffset , conf , true , 0 , logFormat );
292-
293- try {
294- segment .sanityCheck (timeIndexFileNewlyCreated );
295- } catch (IOException e ) {
296- if (e instanceof NoSuchFileException ) {
297- if (isCleanShutdown
298- || segment .getBaseOffset () < recoveryPointCheckpoint ) {
299- LOG .error (
300- "Could not find offset index file corresponding to log file {} "
301- + "for bucket {}, recovering segment and rebuilding index files..." ,
302- logSegments .getTableBucket (),
303- segment .getFileLogRecords ().file ().getAbsoluteFile ());
304- }
305- recoverSegment (segment );
306- } else {
307- throw e ;
308- }
309- }
310161 logSegments .add (segment );
311162 }
312163 }
0 commit comments