4444import java .util .ArrayList ;
4545import java .util .Collections ;
4646import java .util .List ;
47+ import java .util .Set ;
4748import java .util .stream .Collectors ;
4849
4950import static org .apache .fluss .record .TestData .DATA1_ROW_TYPE ;
@@ -64,7 +65,7 @@ final class LogLoaderTest extends LogTestBase {
6465
6566 @ BeforeEach
6667 public void setup () throws Exception {
67- conf .set (ConfigOptions .LOG_SEGMENT_FILE_SIZE , MemorySize .parse ("1kb " ));
68+ conf .set (ConfigOptions .LOG_SEGMENT_FILE_SIZE , MemorySize .parse ("10kb " ));
6869 conf .set (ConfigOptions .LOG_INDEX_INTERVAL_SIZE , MemorySize .parse ("1b" ));
6970
7071 logDir =
@@ -89,14 +90,16 @@ void testCorruptIndexRebuild() throws Exception {
8990 LogTablet logTablet = createLogTablet (true );
9091 appendRecords (logTablet , numRecords );
9192 // collect all the index files
92- List <File > indexFiles = collectIndexFiles (logTablet );
93+ List <File > indexFiles = collectIndexFiles (logTablet . logSegments () );
9394 logTablet .close ();
9495
9596 // corrupt all the index files
9697 for (File indexFile : indexFiles ) {
9798 try (FileChannel fileChannel =
9899 FileChannel .open (indexFile .toPath (), StandardOpenOption .APPEND )) {
99- fileChannel .write (ByteBuffer .wrap (new byte [] {0 }));
100+ for (int i = 0 ; i < 12 ; i ++) {
101+ fileChannel .write (ByteBuffer .wrap (new byte [] {0 }));
102+ }
100103 }
101104 }
102105
@@ -105,9 +108,29 @@ void testCorruptIndexRebuild() throws Exception {
105108 for (LogSegment segment : logTablet .logSegments ()) {
106109 if (segment .getBaseOffset () != logTablet .activeLogSegment ().getBaseOffset ()) {
107110 assertThatThrownBy (segment .offsetIndex ()::sanityCheck )
108- .isInstanceOf (CorruptIndexException .class );
111+ .isInstanceOf (CorruptIndexException .class )
112+ .hasMessage (
113+ String .format (
114+ "Index file %s is corrupt, found %d bytes which is neither positive nor a multiple of %d" ,
115+ segment .offsetIndex ().file ().getAbsolutePath (),
116+ segment .offsetIndex ().length (),
117+ segment .offsetIndex ().entrySize ()));
109118 assertThatThrownBy (segment .timeIndex ()::sanityCheck )
110- .isInstanceOf (CorruptIndexException .class );
119+ .isInstanceOf (CorruptIndexException .class )
120+ .hasMessageContaining (
121+ String .format (
122+ "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp" ,
123+ segment .timeIndex ().file ().getAbsolutePath ()));
124+ } else {
125+ // the active segment will be resized, which case no corruption exception when doing
126+ // sanity check
127+ segment .offsetIndex ().sanityCheck ();
128+ assertThatThrownBy (segment .timeIndex ()::sanityCheck )
129+ .isInstanceOf (CorruptIndexException .class )
130+ .hasMessageContaining (
131+ String .format (
132+ "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp" ,
133+ segment .timeIndex ().file ().getAbsolutePath ()));
111134 }
112135 }
113136 logTablet .close ();
@@ -126,14 +149,64 @@ void testCorruptIndexRebuild() throws Exception {
126149 logTablet .close ();
127150 }
128151
152+ @ Test
153+ void testCorruptIndexRebuildWithRecoveryPoint () throws Exception {
154+ // publish the records and close the log
155+ int numRecords = 200 ;
156+ LogTablet logTablet = createLogTablet (true );
157+ appendRecords (logTablet , numRecords );
158+ // collect all the index files
159+ long recoveryPoint = logTablet .localLogEndOffset () / 2 ;
160+ List <File > indexFiles = collectIndexFiles (logTablet .logSegments ());
161+ logTablet .close ();
162+
163+ // corrupt all the index files
164+ for (File indexFile : indexFiles ) {
165+ try (FileChannel fileChannel =
166+ FileChannel .open (indexFile .toPath (), StandardOpenOption .APPEND )) {
167+ for (int i = 0 ; i < 12 ; i ++) {
168+ fileChannel .write (ByteBuffer .wrap (new byte [] {0 }));
169+ }
170+ }
171+ }
172+
173+ // test reopen the log with recovery point
174+ logTablet = createLogTablet (false , recoveryPoint );
175+ List <LogSegment > logSegments = logTablet .logSegments (recoveryPoint , Long .MAX_VALUE );
176+ assertThat (logSegments .size () < logTablet .logSegments ().size ()).isTrue ();
177+ Set <Long > recoveredSegments =
178+ logSegments .stream ().map (LogSegment ::getBaseOffset ).collect (Collectors .toSet ());
179+ for (LogSegment segment : logTablet .logSegments ()) {
180+ if (recoveredSegments .contains (segment .getBaseOffset ())) {
181+ segment .offsetIndex ().sanityCheck ();
182+ segment .timeIndex ().sanityCheck ();
183+ } else {
184+ assertThatThrownBy (segment .offsetIndex ()::sanityCheck )
185+ .isInstanceOf (CorruptIndexException .class )
186+ .hasMessage (
187+ String .format (
188+ "Index file %s is corrupt, found %d bytes which is neither positive nor a multiple of %d" ,
189+ segment .offsetIndex ().file ().getAbsolutePath (),
190+ segment .offsetIndex ().length (),
191+ segment .offsetIndex ().entrySize ()));
192+ assertThatThrownBy (segment .timeIndex ()::sanityCheck )
193+ .isInstanceOf (CorruptIndexException .class )
194+ .hasMessageContaining (
195+ String .format (
196+ "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp" ,
197+ segment .timeIndex ().file ().getAbsolutePath ()));
198+ }
199+ }
200+ }
201+
129202 @ Test
130203 void testIndexRebuild () throws Exception {
131204 // publish the records and close the log
132205 int numRecords = 200 ;
133206 LogTablet logTablet = createLogTablet (true );
134207 appendRecords (logTablet , numRecords );
135208 // collect all index files
136- List <File > indexFiles = collectIndexFiles (logTablet );
209+ List <File > indexFiles = collectIndexFiles (logTablet . logSegments () );
137210 logTablet .close ();
138211
139212 // delete all the index files
@@ -153,12 +226,17 @@ void testIndexRebuild() throws Exception {
153226 }
154227
155228 private LogTablet createLogTablet (boolean isCleanShutdown ) throws Exception {
229+ return createLogTablet (isCleanShutdown , 0 );
230+ }
231+
232+ private LogTablet createLogTablet (boolean isCleanShutdown , long recoveryPoint )
233+ throws Exception {
156234 return LogTablet .create (
157235 PhysicalTablePath .of (DATA1_TABLE_PATH ),
158236 logDir ,
159237 conf ,
160238 TestingMetricGroups .TABLET_SERVER_METRICS ,
161- 0 ,
239+ recoveryPoint ,
162240 scheduler ,
163241 LogFormat .ARROW ,
164242 1 ,
@@ -195,9 +273,9 @@ private void appendRecords(LogTablet logTablet, int numRecords) throws Exception
195273 }
196274 }
197275
198- private List <File > collectIndexFiles (LogTablet logTablet ) throws IOException {
276+ private List <File > collectIndexFiles (List < LogSegment > logSegments ) throws IOException {
199277 List <File > indexFiles = new ArrayList <>();
200- for (LogSegment segment : logTablet . logSegments () ) {
278+ for (LogSegment segment : logSegments ) {
201279 indexFiles .add (segment .offsetIndex ().file ());
202280 indexFiles .add (segment .timeIndex ().file ());
203281 }
0 commit comments