File tree Expand file tree Collapse file tree 3 files changed +11
-7
lines changed
main/java/org/apache/fluss/record
test/java/org/apache/fluss/record Expand file tree Collapse file tree 3 files changed +11
-7
lines changed Original file line number Diff line number Diff line change 2626
2727import java .io .IOException ;
2828
29- import static org .apache .fluss .record .DefaultValueRecordBatch .RECORD_BATCH_HEADER_SIZE ;
3029import static org .apache .fluss .record .LogRecordBatch .CURRENT_LOG_MAGIC_VALUE ;
3130import static org .apache .fluss .record .LogRecordBatchFormat .BASE_OFFSET_LENGTH ;
3231import static org .apache .fluss .record .LogRecordBatchFormat .LENGTH_LENGTH ;
3635import static org .apache .fluss .record .LogRecordBatchFormat .NO_WRITER_ID ;
3736import static org .apache .fluss .record .LogRecordBatchFormat .crcOffset ;
3837import static org .apache .fluss .record .LogRecordBatchFormat .lastOffsetDeltaOffset ;
38+ import static org .apache .fluss .record .LogRecordBatchFormat .recordBatchHeaderSize ;
3939import static org .apache .fluss .record .LogRecordBatchFormat .schemaIdOffset ;
4040import static org .apache .fluss .utils .Preconditions .checkArgument ;
4141
@@ -82,8 +82,9 @@ protected AbstractRowMemoryLogRecordsBuilder(
8282 this .isClosed = false ;
8383
8484 // Skip header initially; will be written in build()
85- this .pagedOutputView .setPosition (RECORD_BATCH_HEADER_SIZE );
86- this .sizeInBytes = RECORD_BATCH_HEADER_SIZE ;
85+ int headerSize = recordBatchHeaderSize (magic );
86+ this .pagedOutputView .setPosition (headerSize );
87+ this .sizeInBytes = headerSize ;
8788 }
8889
8990 protected AbstractRowMemoryLogRecordsBuilder (
Original file line number Diff line number Diff line change 3434import java .nio .ByteBuffer ;
3535import java .util .NoSuchElementException ;
3636
37- import static org .apache .fluss .record .DefaultValueRecordBatch .RECORD_BATCH_HEADER_SIZE ;
3837import static org .apache .fluss .record .LogRecordBatchFormat .BASE_OFFSET_OFFSET ;
3938import static org .apache .fluss .record .LogRecordBatchFormat .COMMIT_TIMESTAMP_OFFSET ;
4039import static org .apache .fluss .record .LogRecordBatchFormat .LENGTH_OFFSET ;
@@ -285,7 +284,7 @@ private CloseableIterator<LogRecord> compactedRowRecordIterator(
285284 RowType rowType , long timestamp ) {
286285 DataType [] fieldTypes = rowType .getChildren ().toArray (new DataType [0 ]);
287286 return new LogRecordIterator () {
288- int position = DefaultLogRecordBatch .this .position + RECORD_BATCH_HEADER_SIZE ;
287+ int position = DefaultLogRecordBatch .this .position + recordBatchHeaderSize ( magic ) ;
289288 int rowId = 0 ;
290289
291290 @ Override
Original file line number Diff line number Diff line change @@ -115,7 +115,10 @@ void testNoRecordAppendAndBaseOffset() throws Exception {
115115 // base offset 0
116116 MemoryLogRecordsCompactedBuilder builder = createBuilder (0 , 1 , 1024 );
117117 MemoryLogRecords records = MemoryLogRecords .pointToBytesView (builder .build ());
118- assertThat (records .sizeInBytes ()).isEqualTo (48 ); // only batch header
118+ assertThat (records .sizeInBytes ())
119+ .isEqualTo (
120+ LogRecordBatchFormat .recordBatchHeaderSize (
121+ DEFAULT_MAGIC )); // only batch header
119122 LogRecordBatch batch = records .batches ().iterator ().next ();
120123 batch .ensureValid ();
121124 assertThat (batch .getRecordCount ()).isEqualTo (0 );
@@ -132,7 +135,8 @@ void testNoRecordAppendAndBaseOffset() throws Exception {
132135 // base offset 100
133136 builder = createBuilder (100 , 1 , 1024 );
134137 records = MemoryLogRecords .pointToBytesView (builder .build ());
135- assertThat (records .sizeInBytes ()).isEqualTo (48 );
138+ assertThat (records .sizeInBytes ())
139+ .isEqualTo (LogRecordBatchFormat .recordBatchHeaderSize (DEFAULT_MAGIC ));
136140 batch = records .batches ().iterator ().next ();
137141 batch .ensureValid ();
138142 assertThat (batch .getRecordCount ()).isEqualTo (0 );
You can’t perform that action at this time.
0 commit comments