2121import com .alibaba .fluss .config .Configuration ;
2222import com .alibaba .fluss .exception .OutOfOrderSequenceException ;
2323import com .alibaba .fluss .metadata .TableBucket ;
24+ import com .alibaba .fluss .utils .clock .ManualClock ;
2425import com .alibaba .fluss .utils .types .Tuple2 ;
2526
2627import org .junit .jupiter .api .BeforeEach ;
@@ -124,14 +125,15 @@ void testValidationOnFirstEntryWhenLoadingLog() {
124125 @ Test
125126 void testPrepareUpdateDoesNotMutate () {
126127 WriterAppendInfo appendInfo = stateManager .prepareUpdate (writerId );
127- appendInfo .appendDataBatch (0 , new LogOffsetMetadata (15L ), 20L , System .currentTimeMillis ());
128+ appendInfo .appendDataBatch (
129+ 0 , new LogOffsetMetadata (15L ), 20L , false , System .currentTimeMillis ());
128130 assertThat (stateManager .lastEntry (writerId )).isNotPresent ();
129131 stateManager .update (appendInfo );
130132 assertThat (stateManager .lastEntry (writerId )).isPresent ();
131133
132134 WriterAppendInfo nextAppendInfo = stateManager .prepareUpdate (writerId );
133135 nextAppendInfo .appendDataBatch (
134- 1 , new LogOffsetMetadata (26L ), 30L , System .currentTimeMillis ());
136+ 1 , new LogOffsetMetadata (26L ), 30L , false , System .currentTimeMillis ());
135137 assertThat (stateManager .lastEntry (writerId )).isPresent ();
136138
137139 WriterStateEntry lastEntry = stateManager .lastEntry (writerId ).get ();
@@ -179,8 +181,8 @@ void testFetchSnapshotEmptySnapshot() {
179181
180182 @ Test
181183 void testRemoveExpiredWritersOnReload () throws IOException {
182- append (stateManager , writerId , 0 , 0L , 0 );
183- append (stateManager , writerId , 1 , 1L , 1 );
184+ append (stateManager , writerId , 0 , 0L , false , 0 );
185+ append (stateManager , writerId , 1 , 1L , false , 1 );
184186
185187 stateManager .takeSnapshot ();
186188 WriterStateManager recoveredMapping =
@@ -194,21 +196,49 @@ void testRemoveExpiredWritersOnReload() throws IOException {
194196 // the writer mapping. If writing with the same writerId and non-zero batch sequence, the
195197 // OutOfOrderSequenceException will throw. If you want to continue to write, you need to get
196198 // a new writer id.
197- assertThatThrownBy (() -> append (recoveredMapping , writerId , 2 , 2L , 70001 ))
199+ assertThatThrownBy (() -> append (recoveredMapping , writerId , 2 , 2L , false , 3000L ))
198200 .isInstanceOf (OutOfOrderSequenceException .class )
199201 .hasMessageContaining (
200202 "Out of order batch sequence for writer 1 at offset 2 in "
201203 + "table-bucket TableBucket{tableId=1001, bucket=0}"
202204 + " : 2 (incoming batch seq.), -1 (current batch seq.)" );
203205
204- append (recoveredMapping , 2L , 0 , 2L , 70002 );
206+ append (recoveredMapping , 2L , 0 , 2L , false , 70002 );
205207
206208 assertThat (recoveredMapping .activeWriters ().size ()).isEqualTo (1 );
207209 assertThat (recoveredMapping .activeWriters ().values ().iterator ().next ().lastBatchSequence ())
208210 .isEqualTo (0 );
209211 assertThat (recoveredMapping .mapEndOffset ()).isEqualTo (3L );
210212 }
211213
214+ @ Test
215+ void testAppendAnExpiredBatchWithEmptyWriterStatus () throws Exception {
216+ ManualClock clock = new ManualClock (5000L );
217+
218+ // 2 seconds to expire the writer.
219+ conf .set (ConfigOptions .WRITER_ID_EXPIRATION_TIME , Duration .ofSeconds (2 ));
220+ WriterStateManager stateManager1 =
221+ new WriterStateManager (
222+ tableBucket ,
223+ logDir ,
224+ (int ) conf .get (ConfigOptions .WRITER_ID_EXPIRATION_TIME ).toMillis ());
225+
226+ // If we try to append an expired batch with none zero batch sequence, the
227+ // OutOfOrderSequenceException will not been throw.
228+ append (stateManager1 , 1L , 10 , 10L , true , clock .milliseconds ());
229+ assertThat (stateManager1 .activeWriters ().size ()).isEqualTo (1 );
230+ assertThat (stateManager1 .activeWriters ().values ().iterator ().next ().lastBatchSequence ())
231+ .isEqualTo (10 );
232+
233+ // If we try to append a none-expired batch with none zero batch sequence, the
234+ // OutOfOrderSequenceException will throw.
235+ assertThatThrownBy (() -> append (stateManager1 , 2L , 10 , 10L , false , 1000L ))
236+ .isInstanceOf (OutOfOrderSequenceException .class )
237+ .hasMessageContaining (
238+ "Out of order batch sequence for writer 2 at offset 10 in table-bucket "
239+ + "TableBucket{tableId=1001, bucket=0} : 10 (incoming batch seq.), -1 (current batch seq.)" );
240+ }
241+
212242 @ Test
213243 void testDeleteSnapshotsBefore () throws IOException {
214244 append (stateManager , writerId , 0 , 0L );
@@ -322,7 +352,7 @@ void testLoadFromSnapshotRetainsNonExpiredWriters() throws IOException {
322352
323353 @ Test
324354 void testSkipSnapshotIfOffsetUnchanged () throws IOException {
325- append (stateManager , writerId , 0 , 0L , 0L );
355+ append (stateManager , writerId , 0 , 0L , false , 0L );
326356
327357 stateManager .takeSnapshot ();
328358 assertThat (Objects .requireNonNull (logDir .listFiles ()).length ).isEqualTo (1 );
@@ -475,17 +505,23 @@ private void testLoadFromCorruptSnapshot(Consumer<FileChannel> makeFileCorrupt)
475505
476506 private void append (
477507 WriterStateManager stateManager , long writerId , int batchSequence , long offset ) {
478- append (stateManager , writerId , batchSequence , offset , System .currentTimeMillis ());
508+ append (stateManager , writerId , batchSequence , offset , false , System .currentTimeMillis ());
479509 }
480510
481511 private void append (
482512 WriterStateManager stateManager ,
483513 long writerId ,
484514 int batchSequence ,
485515 long offset ,
486- long timestamp ) {
516+ boolean isWriterInBatchExpired ,
517+ long lastTimestamp ) {
487518 WriterAppendInfo appendInfo = stateManager .prepareUpdate (writerId );
488- appendInfo .appendDataBatch (batchSequence , new LogOffsetMetadata (offset ), offset , timestamp );
519+ appendInfo .appendDataBatch (
520+ batchSequence ,
521+ new LogOffsetMetadata (offset ),
522+ offset ,
523+ isWriterInBatchExpired ,
524+ lastTimestamp );
489525 stateManager .update (appendInfo );
490526 stateManager .updateMapEndOffset (offset + 1 );
491527 }
0 commit comments