@@ -122,8 +122,8 @@ void testCorruptIndexRebuild() throws Exception {
122122 "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp" ,
123123 segment .timeIndex ().file ().getAbsolutePath ()));
124124 } else {
125- // the active segment will be resized, which case no corruption exception when doing
126- // sanity check
125+ // the offset index file of the active segment will be resized, in which case no
126+ // corruption exception is thrown when doing the sanity check
127127 segment .offsetIndex ().sanityCheck ();
128128 assertThatThrownBy (segment .timeIndex ()::sanityCheck )
129129 .isInstanceOf (CorruptIndexException .class )
@@ -181,6 +181,8 @@ void testCorruptIndexRebuildWithRecoveryPoint() throws Exception {
181181 segment .offsetIndex ().sanityCheck ();
182182 segment .timeIndex ().sanityCheck ();
183183 } else {
184+ // the segments before the recovery point will not be recovered, so the sanity check
185+ // should still throw a CorruptIndexException
184186 assertThatThrownBy (segment .offsetIndex ()::sanityCheck )
185187 .isInstanceOf (CorruptIndexException .class )
186188 .hasMessage (
@@ -225,6 +227,45 @@ void testIndexRebuild() throws Exception {
225227 logTablet .close ();
226228 }
227229
230+ @ Test
231+ void testInvalidOffsetRebuild () throws Exception {
232+ // publish the records and close the log
233+ int numRecords = 200 ;
234+ LogTablet logTablet = createLogTablet (true );
235+ appendRecords (logTablet , numRecords );
236+
237+ List <LogSegment > logSegments = logTablet .logSegments ();
238+ int corruptSegmentIndex = logSegments .size () / 2 ;
239+ assertThat (corruptSegmentIndex < logSegments .size ()).isTrue ();
240+ LogSegment corruptSegment = logSegments .get (corruptSegmentIndex );
241+
242+ // append an invalid offset batch
243+ List <Object []> objects = Collections .singletonList (new Object [] {1 , "a" });
244+ List <ChangeType > changeTypes =
245+ objects .stream ().map (row -> ChangeType .APPEND_ONLY ).collect (Collectors .toList ());
246+ MemoryLogRecords memoryLogRecords =
247+ createBasicMemoryLogRecords (
248+ DATA1_ROW_TYPE ,
249+ DEFAULT_SCHEMA_ID ,
250+ corruptSegment .getBaseOffset (),
251+ clock .milliseconds (),
252+ magic ,
253+ System .currentTimeMillis (),
254+ 0 ,
255+ changeTypes ,
256+ objects ,
257+ LogFormat .ARROW ,
258+ ArrowCompressionInfo .DEFAULT_COMPRESSION );
259+ corruptSegment .getFileLogRecords ().append (memoryLogRecords );
260+ logTablet .close ();
261+
262+ logTablet = createLogTablet (false );
263+ // the corrupt segment should be truncated to base offset
264+ assertThat (logTablet .localLogEndOffset ()).isEqualTo (corruptSegment .getBaseOffset ());
265+ // segments after the corrupt segment should be removed
266+ assertThat (logTablet .logSegments ().size ()).isEqualTo (corruptSegmentIndex + 1 );
267+ }
268+
228269 private LogTablet createLogTablet (boolean isCleanShutdown ) throws Exception {
229270 return createLogTablet (isCleanShutdown , 0 );
230271 }
0 commit comments