@@ -266,20 +266,35 @@ private Tuple2<List<LogRecord>, List<LogRecord>> genPrimaryKeyTableRecords(
266266 toRecord (++offset , rows .get (0 ), INSERT ),
267267 toRecord (++offset , rows .get (1 ), UPDATE_BEFORE ),
268268 toRecord (++offset , rows .get (2 ), UPDATE_AFTER )));
269- expectLogRecords .add (toRecord (offset , rows .get (2 ), UPDATE_AFTER ));
269+ expectLogRecords .add (
270+ toRecord (
271+ offset ,
272+ writtenLogRecords .get (writtenLogRecords .size () - 1 ).timestamp (),
273+ rows .get (2 ),
274+ UPDATE_AFTER ));
270275
271276 // gen +I, +U
272277 rows = genKvRow (partition , bucket , 2 , 7 , 9 );
273278 writtenLogRecords .addAll (
274279 Arrays .asList (
275280 toRecord (++offset , rows .get (0 ), INSERT ),
276281 toRecord (++offset , rows .get (1 ), UPDATE_AFTER )));
277- expectLogRecords .add (toRecord (offset , rows .get (1 ), UPDATE_AFTER ));
282+ expectLogRecords .add (
283+ toRecord (
284+ offset ,
285+ writtenLogRecords .get (writtenLogRecords .size () - 1 ).timestamp (),
286+ rows .get (1 ),
287+ UPDATE_AFTER ));
278288
279289 // gen +I
280290 rows = genKvRow (partition , bucket , 3 , 9 , 10 );
281291 writtenLogRecords .add (toRecord (++offset , rows .get (0 ), INSERT ));
282- expectLogRecords .add (toRecord (offset , rows .get (0 ), INSERT ));
292+ expectLogRecords .add (
293+ toRecord (
294+ offset ,
295+ writtenLogRecords .get (writtenLogRecords .size () - 1 ).timestamp (),
296+ rows .get (0 ),
297+ INSERT ));
283298
284299 return Tuple2 .of (writtenLogRecords , expectLogRecords );
285300 }
@@ -302,7 +317,12 @@ private List<GenericRow> genKvRow(
302317 }
303318
304319 private GenericRecord toRecord (long offset , GenericRow row , ChangeType changeType ) {
305- return new GenericRecord (offset , System .currentTimeMillis (), changeType , row );
320+ return toRecord (offset , System .currentTimeMillis (), row , changeType );
321+ }
322+
323+ private GenericRecord toRecord (
324+ long offset , long timestamp , GenericRow row , ChangeType changeType ) {
325+ return new GenericRecord (offset , timestamp , changeType , row );
306326 }
307327
308328 private CloseableIterator <InternalRow > getPaimonRows (
0 commit comments