diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
index d702e9621e..b3a3f8131b 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
@@ -26,9 +26,12 @@
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.InternalRow.FieldGetter;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.encode.CompactedRowEncoder;
 import org.apache.fluss.row.encode.IndexedRowEncoder;
 import org.apache.fluss.row.encode.KeyEncoder;
 import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
@@ -44,6 +47,7 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
 
     private final LogFormat logFormat;
     private final IndexedRowEncoder indexedRowEncoder;
+    private final CompactedRowEncoder compactedRowEncoder;
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;
 
@@ -58,8 +62,12 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
             this.bucketKeyEncoder = KeyEncoder.of(rowType, bucketKeys, lakeFormat);
         }
 
+        DataType[] fieldDataTypes =
+                tableInfo.getSchema().getRowType().getChildren().toArray(new DataType[0]);
+
         this.logFormat = tableInfo.getTableConfig().getLogFormat();
         this.indexedRowEncoder = new IndexedRowEncoder(tableInfo.getRowType());
+        this.compactedRowEncoder = new CompactedRowEncoder(fieldDataTypes);
         this.fieldGetters = InternalRow.createFieldGetters(tableInfo.getRowType());
         this.tableInfo = tableInfo;
     }
@@ -80,6 +88,11 @@ public CompletableFuture<AppendResult> append(InternalRow row) {
         if (logFormat == LogFormat.INDEXED) {
             IndexedRow indexedRow = encodeIndexedRow(row);
             record = WriteRecord.forIndexedAppend(tableInfo, physicalPath, indexedRow, bucketKey);
+        } else if (logFormat == LogFormat.COMPACTED) {
+            CompactedRow compactedRow = encodeCompactedRow(row);
+            record =
+                    WriteRecord.forCompactedAppend(
+                            tableInfo, physicalPath, compactedRow, bucketKey);
         } else {
             // ARROW format supports general internal row
             record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row, bucketKey);
@@ -87,6 +100,18 @@ record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row, bucketKey);
         return send(record).thenApply(ignored -> APPEND_SUCCESS);
     }
 
+    private CompactedRow encodeCompactedRow(InternalRow row) {
+        if (row instanceof CompactedRow) {
+            return (CompactedRow) row;
+        }
+
+        compactedRowEncoder.startNewRow();
+        for (int i = 0; i < fieldCount; i++) {
+            compactedRowEncoder.encodeField(i, fieldGetters[i].getFieldOrNull(row));
+        }
+        return compactedRowEncoder.finishRow();
+    }
+
     private IndexedRow encodeIndexedRow(InternalRow row) {
         if (row instanceof IndexedRow) {
             return (IndexedRow) row;
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index 39f65592c1..20e5bcc5e5 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.client.table.writer;
 
+import org.apache.fluss.client.write.WriteFormat;
 import org.apache.fluss.client.write.WriteRecord;
 import org.apache.fluss.client.write.WriterClient;
 import org.apache.fluss.metadata.DataLakeFormat;
@@ -50,6 +51,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
 
     private final KeyEncoder bucketKeyEncoder;
     private final KvFormat kvFormat;
+    private final WriteFormat writeFormat;
     private final RowEncoder rowEncoder;
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;
@@ -78,6 +80,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
                         : KeyEncoder.of(rowType, tableInfo.getBucketKeys(), lakeFormat);
 
         this.kvFormat = tableInfo.getTableConfig().getKvFormat();
+        this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
        this.rowEncoder = RowEncoder.create(kvFormat, rowType);
         this.fieldGetters = InternalRow.createFieldGetters(rowType);
         this.tableInfo = tableInfo;
@@ -164,6 +167,7 @@ public CompletableFuture<UpsertResult> upsert(InternalRow row) {
                         encodeRow(row),
                         key,
                         bucketKey,
+                        writeFormat,
                         targetColumns);
         return send(record).thenApply(ignored -> UPSERT_SUCCESS);
     }
@@ -182,7 +186,12 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
                 bucketKeyEncoder == primaryKeyEncoder ? key : bucketKeyEncoder.encodeKey(row);
         WriteRecord record =
                 WriteRecord.forDelete(
-                        tableInfo, getPhysicalPath(row), key, bucketKey, targetColumns);
+                        tableInfo,
+                        getPhysicalPath(row),
+                        key,
+                        bucketKey,
+                        writeFormat,
+                        targetColumns);
         return send(record).thenApply(ignored -> DELETE_SUCCESS);
     }
 
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
new file mode 100644
index 0000000000..3c61cce82d
--- /dev/null
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.write;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.memory.AbstractPagedOutputView;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.MemoryLogRecordsRowBuilder;
+import org.apache.fluss.record.bytesview.BytesView;
+import org.apache.fluss.row.InternalRow;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class to deduplicate logic for row-based log write batches backed by in-memory
+ * builders. Concrete subclasses only need to provide a row-type validator/caster and a {@link
+ * MemoryLogRecordsRowBuilder} implementation.
+ */
+abstract class AbstractRowLogWriteBatch<R extends InternalRow> extends WriteBatch {
+
+    private final AbstractPagedOutputView outputView;
+    private final MemoryLogRecordsRowBuilder<R> recordsBuilder;
+    private final String buildErrorMessage;
+
+    protected AbstractRowLogWriteBatch(
+            int bucketId,
+            PhysicalTablePath physicalTablePath,
+            long createdMs,
+            AbstractPagedOutputView outputView,
+            MemoryLogRecordsRowBuilder<R> recordsBuilder,
+            String buildErrorMessage) {
+        super(bucketId, physicalTablePath, createdMs);
+        this.outputView = outputView;
+        this.recordsBuilder = recordsBuilder;
+        this.buildErrorMessage = buildErrorMessage;
+    }
+
+    @Override
+    public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
+        checkNotNull(callback, "write callback must be not null");
+        InternalRow rowObj = writeRecord.getRow();
+        checkNotNull(rowObj, "row must not be null for log record");
+        checkArgument(writeRecord.getKey() == null, "key must be null for log record");
+        checkArgument(
+                writeRecord.getTargetColumns() == null,
+                "target columns must be null for log record");
+
+        R row = requireAndCastRow(rowObj);
+        if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
+            return false;
+        }
+        recordsBuilder.append(ChangeType.APPEND_ONLY, row);
+        recordCount++;
+        callbacks.add(callback);
+        return true;
+    }
+
+    protected abstract R requireAndCastRow(InternalRow row);
+
+    @Override
+    public boolean isLogBatch() {
+        return true;
+    }
+
+    @Override
+    public BytesView build() {
+        try {
+            return recordsBuilder.build();
+        } catch (IOException e) {
+            throw new FlussRuntimeException(buildErrorMessage, e);
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return recordsBuilder.isClosed();
+    }
+
+    @Override
+    public void close() throws Exception {
+        recordsBuilder.close();
+        reopened = false;
+    }
+
+    @Override
+    public List<MemorySegment> pooledMemorySegments() {
+        return outputView.allocatedPooledSegments();
+    }
+
+    @Override
+    public void setWriterState(long writerId, int batchSequence) {
+        recordsBuilder.setWriterState(writerId, batchSequence);
+    }
+
+    @Override
+    public long writerId() {
+        return recordsBuilder.writerId();
+    }
+
+    @Override
+    public int batchSequence() {
+        return recordsBuilder.batchSequence();
+    }
+
+    @Override
+    public void abortRecordAppends() {
+        recordsBuilder.abort();
+    }
+
+    @Override
+    public void resetWriterState(long writerId, int batchSequence) {
+        super.resetWriterState(writerId, batchSequence);
+        recordsBuilder.resetWriterState(writerId, batchSequence);
+    }
+
+    @Override
+    public int estimatedSizeInBytes() {
+        return recordsBuilder.getSizeInBytes();
+    }
+}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
new file mode 100644
index 0000000000..19366a4dc7
--- /dev/null
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.write;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.memory.AbstractPagedOutputView;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.rpc.messages.ProduceLogRequest;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * A batch of log records managed in COMPACTED format that is or will be sent to server by {@link
+ * ProduceLogRequest}.
+ *
+ * <p>This class is not thread safe and external synchronization must be used when modifying it.
+ */
+@NotThreadSafe
+@Internal
+public final class CompactedLogWriteBatch extends AbstractRowLogWriteBatch<CompactedRow> {
+
+    public CompactedLogWriteBatch(
+            int bucketId,
+            PhysicalTablePath physicalTablePath,
+            int schemaId,
+            int writeLimit,
+            AbstractPagedOutputView outputView,
+            long createdMs) {
+        super(
+                bucketId,
+                physicalTablePath,
+                createdMs,
+                outputView,
+                MemoryLogRecordsCompactedBuilder.builder(schemaId, writeLimit, outputView, true),
+                "Failed to build compacted log record batch.");
+    }
+
+    @Override
+    protected CompactedRow requireAndCastRow(InternalRow row) {
+        checkArgument(
+                row instanceof CompactedRow, "row must be CompactedRow for compacted log table");
+        return (CompactedRow) row;
+    }
+}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
index fb96556588..c70dd83a8a 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
@@ -18,23 +18,15 @@
 package org.apache.fluss.client.write;
 
 import org.apache.fluss.annotation.Internal;
-import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.memory.AbstractPagedOutputView;
-import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
-import org.apache.fluss.record.bytesview.BytesView;
 import org.apache.fluss.row.indexed.IndexedRow;
 import org.apache.fluss.rpc.messages.ProduceLogRequest;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.IOException;
-import java.util.List;
-
 import static org.apache.fluss.utils.Preconditions.checkArgument;
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
  * A batch of log records managed in INDEXED format that is or will be sent to server by {@link
@@ -44,9 +36,7 @@
  */
 @NotThreadSafe
 @Internal
-public final class IndexedLogWriteBatch extends WriteBatch {
-    private final AbstractPagedOutputView outputView;
-    private final MemoryLogRecordsIndexedBuilder recordsBuilder;
+public final class IndexedLogWriteBatch extends AbstractRowLogWriteBatch<IndexedRow> {
 
     public IndexedLogWriteBatch(
             int bucketId,
@@ -55,91 +45,18 @@ public IndexedLogWriteBatch(
             int writeLimit,
             AbstractPagedOutputView outputView,
             long createdMs) {
-        super(bucketId, physicalTablePath, createdMs);
-        this.outputView = outputView;
-        this.recordsBuilder =
-                MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, outputView, true);
-    }
-
-    @Override
-    public boolean isLogBatch() {
-        return true;
-    }
-
-    @Override
-    public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
-        checkNotNull(callback, "write callback must be not null");
-        checkNotNull(writeRecord.getRow(), "row must not be null for log record");
-        checkArgument(writeRecord.getKey() == null, "key must be null for log record");
-        checkArgument(
-                writeRecord.getTargetColumns() == null,
-                "target columns must be null for log record");
-        checkArgument(
-                writeRecord.getRow() instanceof IndexedRow,
-                "row must not be IndexRow for indexed log table");
-        IndexedRow row = (IndexedRow) writeRecord.getRow();
-        if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
-            return false;
-        } else {
-            recordsBuilder.append(ChangeType.APPEND_ONLY, row);
-            recordCount++;
-            callbacks.add(callback);
-            return true;
-        }
-    }
-
-    @Override
-    public BytesView build() {
-        try {
-            return recordsBuilder.build();
-        } catch (IOException e) {
-            throw new FlussRuntimeException("Failed to build indexed log record batch.", e);
-        }
-    }
-
-    @Override
-    public boolean isClosed() {
-        return recordsBuilder.isClosed();
-    }
-
-    @Override
-    public void close() throws Exception {
-        recordsBuilder.close();
-        reopened = false;
-    }
-
-    @Override
-    public List<MemorySegment> pooledMemorySegments() {
-        return outputView.allocatedPooledSegments();
-    }
-
-    @Override
-    public void setWriterState(long writerId, int batchSequence) {
-        recordsBuilder.setWriterState(writerId, batchSequence);
-    }
-
-    @Override
-    public long writerId() {
-        return recordsBuilder.writerId();
-    }
-
-    @Override
-    public int batchSequence() {
-        return recordsBuilder.batchSequence();
-    }
-
-    @Override
-    public void abortRecordAppends() {
-        recordsBuilder.abort();
-    }
-
-    public void resetWriterState(long writerId, int batchSequence) {
-        super.resetWriterState(writerId, batchSequence);
-        recordsBuilder.resetWriterState(writerId, batchSequence);
+        super(
+                bucketId,
+                physicalTablePath,
+                createdMs,
+                outputView,
+                MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, outputView, true),
+                "Failed to build indexed log record batch.");
     }
 
     @Override
-    public int estimatedSizeInBytes() {
-        return recordsBuilder.getSizeInBytes();
+    protected IndexedRow requireAndCastRow(org.apache.fluss.row.InternalRow row) {
+        checkArgument(row instanceof IndexedRow, "row must be IndexedRow for indexed log table");
+        return (IndexedRow) row;
     }
 }
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 3a282ba926..0c834a49f1 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -581,45 +581,15 @@ private RecordAppendResult appendNewBatch(
         PreAllocatedPagedOutputView outputView = new PreAllocatedPagedOutputView(segments);
         int schemaId = tableInfo.getSchemaId();
         WriteFormat writeFormat = writeRecord.getWriteFormat();
-        // If the table is a kv table we need to create a kv batch, otherwise we create a log batch.
-        final WriteBatch batch;
-        if (writeFormat == WriteFormat.KV) {
-            batch =
-                    new KvWriteBatch(
-                            bucketId,
-                            physicalTablePath,
-                            tableInfo.getSchemaId(),
-                            tableInfo.getTableConfig().getKvFormat(),
-                            outputView.getPreAllocatedSize(),
-                            outputView,
-                            writeRecord.getTargetColumns(),
-                            clock.milliseconds());
-        } else if (writeFormat == WriteFormat.ARROW_LOG) {
-            ArrowWriter arrowWriter =
-                    arrowWriterPool.getOrCreateWriter(
-                            tableInfo.getTableId(),
-                            schemaId,
-                            outputView.getPreAllocatedSize(),
-                            tableInfo.getRowType(),
-                            tableInfo.getTableConfig().getArrowCompressionInfo());
-            batch =
-                    new ArrowLogWriteBatch(
-                            bucketId,
-                            physicalTablePath,
-                            tableInfo.getSchemaId(),
-                            arrowWriter,
-                            outputView,
-                            clock.milliseconds());
-        } else {
-            batch =
-                    new IndexedLogWriteBatch(
-                            bucketId,
-                            physicalTablePath,
-                            tableInfo.getSchemaId(),
-                            outputView.getPreAllocatedSize(),
-                            outputView,
-                            clock.milliseconds());
-        }
+        final WriteBatch batch =
+                createWriteBatch(
+                        writeRecord,
+                        bucketId,
+                        tableInfo,
+                        writeFormat,
+                        physicalTablePath,
+                        outputView,
+                        schemaId);
 
         batch.tryAppend(writeRecord, callback);
         deque.addLast(batch);
@@ -627,6 +597,67 @@
         return new RecordAppendResult(deque.size() > 1 || batch.isClosed(), true, false);
     }
 
+    private WriteBatch createWriteBatch(
+            WriteRecord writeRecord,
+            int bucketId,
+            TableInfo tableInfo,
+            WriteFormat writeFormat,
+            PhysicalTablePath physicalTablePath,
+            PreAllocatedPagedOutputView outputView,
+            int schemaId) {
+        // If the table is a kv table we need to create a kv batch, otherwise we create a log batch.
+        switch (writeFormat) {
+            case COMPACTED_KV:
+            case INDEXED_KV:
+                return new KvWriteBatch(
+                        bucketId,
+                        physicalTablePath,
+                        tableInfo.getSchemaId(),
+                        writeFormat.toKvFormat(),
+                        outputView.getPreAllocatedSize(),
+                        outputView,
+                        writeRecord.getTargetColumns(),
+                        clock.milliseconds());
+
+            case ARROW_LOG:
+                ArrowWriter arrowWriter =
+                        arrowWriterPool.getOrCreateWriter(
+                                tableInfo.getTableId(),
+                                schemaId,
+                                outputView.getPreAllocatedSize(),
+                                tableInfo.getRowType(),
+                                tableInfo.getTableConfig().getArrowCompressionInfo());
+                return new ArrowLogWriteBatch(
+                        bucketId,
+                        physicalTablePath,
+                        tableInfo.getSchemaId(),
+                        arrowWriter,
+                        outputView,
+                        clock.milliseconds());
+
+            case COMPACTED_LOG:
+                return new CompactedLogWriteBatch(
+                        bucketId,
+                        physicalTablePath,
+                        schemaId,
+                        outputView.getPreAllocatedSize(),
+                        outputView,
+                        clock.milliseconds());
+
+            case INDEXED_LOG:
+                return new IndexedLogWriteBatch(
+                        bucketId,
+                        physicalTablePath,
+                        tableInfo.getSchemaId(),
+                        outputView.getPreAllocatedSize(),
+                        outputView,
+                        clock.milliseconds());
+
+            default:
+                throw new UnsupportedOperationException("Unsupported write format: " + writeFormat);
+        }
+    }
+
     private RecordAppendResult tryAppend(
             WriteRecord writeRecord, WriteCallback callback, Deque<WriteBatch> deque)
             throws Exception {
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
index 669c6807eb..24f07c9ce9 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
@@ -18,11 +18,52 @@
 package org.apache.fluss.client.write;
 
 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.metadata.KvFormat;
 
 /** The format of the write record. */
 @Internal
 public enum WriteFormat {
-    ARROW_LOG,
-    INDEXED_LOG,
-    KV
+    ARROW_LOG(true),
+    INDEXED_LOG(true),
+    COMPACTED_LOG(true),
+    INDEXED_KV(false),
+    COMPACTED_KV(false);
+
+    private final boolean isLog;
+
+    WriteFormat(boolean isLog) {
+        this.isLog = isLog;
+    }
+
+    public boolean isLog() {
+        return isLog;
+    }
+
+    public boolean isKv() {
+        return !isLog;
+    }
+
+    /** Converts this {@link WriteFormat} to a {@link KvFormat}. */
+    public KvFormat toKvFormat() {
+        switch (this) {
+            case INDEXED_KV:
+                return KvFormat.INDEXED;
+            case COMPACTED_KV:
+                return KvFormat.COMPACTED;
+            default:
+                throw new IllegalArgumentException("WriteFormat " + this + " is not a KvFormat");
+        }
+    }
+
+    /** Converts a {@link KvFormat} to a {@link WriteFormat}. */
+    public static WriteFormat fromKvFormat(KvFormat kvFormat) {
+        switch (kvFormat) {
+            case INDEXED:
+                return INDEXED_KV;
+            case COMPACTED:
+                return COMPACTED_KV;
+            default:
+                throw new IllegalArgumentException("Unknown KvFormat: " + kvFormat);
+        }
+    }
 }
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index 68a3fe8a2e..9265c5f779 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -20,17 +20,20 @@
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.record.CompactedLogRecord;
 import org.apache.fluss.record.DefaultKvRecord;
-import org.apache.fluss.record.DefaultKvRecordBatch;
 import org.apache.fluss.record.IndexedLogRecord;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.row.indexed.IndexedRow;
 
 import javax.annotation.Nullable;
 
+import static org.apache.fluss.record.DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
 import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
 import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
@@ -47,19 +50,20 @@ public static WriteRecord forUpsert(
             BinaryRow row,
             byte[] key,
             byte[] bucketKey,
+            WriteFormat writeFormat,
             @Nullable int[] targetColumns) {
         checkNotNull(row, "row must not be null");
         checkNotNull(key, "key must not be null");
         checkNotNull(bucketKey, "bucketKey must not be null");
-        int estimatedSizeInBytes =
-                DefaultKvRecord.sizeOf(key, row) + DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
+        checkArgument(writeFormat.isKv(), "writeFormat must be a KV format");
+        int estimatedSizeInBytes = DefaultKvRecord.sizeOf(key, row) + RECORD_BATCH_HEADER_SIZE;
         return new WriteRecord(
                 tableInfo,
                 tablePath,
                 key,
                 bucketKey,
                 row,
-                WriteFormat.KV,
+                writeFormat,
                 targetColumns,
                 estimatedSizeInBytes);
     }
 
@@ -70,18 +74,19 @@ public static WriteRecord forDelete(
             PhysicalTablePath tablePath,
             byte[] key,
             byte[] bucketKey,
+            WriteFormat writeFormat,
             @Nullable int[] targetColumns) {
         checkNotNull(key, "key must not be null");
         checkNotNull(bucketKey, "key must not be null");
-        int estimatedSizeInBytes =
-                DefaultKvRecord.sizeOf(key, null) + DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
+        checkArgument(writeFormat.isKv(), "writeFormat must be a KV format");
+        int estimatedSizeInBytes = DefaultKvRecord.sizeOf(key, null) + RECORD_BATCH_HEADER_SIZE;
         return new WriteRecord(
                 tableInfo,
                 tablePath,
                 key,
                 bucketKey,
                 null,
-                WriteFormat.KV,
+                writeFormat,
                 targetColumns,
                 estimatedSizeInBytes);
     }
@@ -127,6 +132,26 @@ public static WriteRecord forArrowAppend(
                 estimatedSizeInBytes);
     }
 
+    /** Creates a write record for an append operation in COMPACTED log format. */
+    public static WriteRecord forCompactedAppend(
+            TableInfo tableInfo,
+            PhysicalTablePath tablePath,
+            CompactedRow row,
+            @Nullable byte[] bucketKey) {
+        checkNotNull(row);
+        int estimatedSizeInBytes =
+                CompactedLogRecord.sizeOf(row) + recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
+        return new WriteRecord(
+                tableInfo,
+                tablePath,
+                null,
+                bucketKey,
+                row,
+                WriteFormat.COMPACTED_LOG,
+                null,
+                estimatedSizeInBytes);
+    }
+
     // ------------------------------------------------------------------------------------------
 
     private final PhysicalTablePath physicalTablePath;
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index 0cc2f7f10f..27b6690a9e 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -742,7 +742,7 @@ void testAppendWhileTableMaybeNotReady() throws Exception {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"INDEXED", "ARROW"})
+    @ValueSource(strings = {"INDEXED", "ARROW", "COMPACTED"})
     void testAppendAndPoll(String format) throws Exception {
         verifyAppendOrPut(true, format, null);
     }
@@ -1417,4 +1417,197 @@ void testFileSystemRecognizeConnectionConf() throws Exception {
                     Collections.singletonMap("client.fs.test.key", "fs_test_value"));
         }
     }
+
+    // ---------------------- PK with COMPACTED log tests ----------------------
+
+    @Test
+    void testPkUpsertAndPollWithCompactedLog() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.STRING())
+                        .column("d", DataTypes.BIGINT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath tablePath = TablePath.of("test_db_1", "test_pk_compacted_upsert_poll");
+        createTable(tablePath, tableDescriptor, false);
+
+        int expectedSize = 30;
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            for (int i = 0; i < expectedSize; i++) {
+                String value = i % 2 == 0 ? "hello, friend" + i : null;
+                GenericRow r = row(i, 100, value, i * 10L);
+                upsertWriter.upsert(r);
+                if (i % 10 == 0) {
+                    upsertWriter.flush();
+                }
+            }
+            upsertWriter.flush();
+
+            // normal scan
+            try (LogScanner logScanner = createLogScanner(table)) {
+                subscribeFromBeginning(logScanner, table);
+                int count = 0;
+                while (count < expectedSize) {
+                    ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+                    for (ScanRecord scanRecord : scanRecords) {
+                        assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
+                        InternalRow rr = scanRecord.getRow();
+                        assertThat(rr.getFieldCount()).isEqualTo(4);
+                        assertThat(rr.getInt(0)).isEqualTo(count);
+                        assertThat(rr.getInt(1)).isEqualTo(100);
+                        if (count % 2 == 0) {
+                            assertThat(rr.getString(2).toString())
+                                    .isEqualTo("hello, friend" + count);
+                        } else {
+                            assertThat(rr.isNullAt(2)).isTrue();
+                        }
+                        assertThat(rr.getLong(3)).isEqualTo(count * 10L);
+                        count++;
+                    }
+                }
+                assertThat(count).isEqualTo(expectedSize);
+            }
+
+            // Creating a projected log scanner for COMPACTED should work
+            try (LogScanner scanner = createLogScanner(table, new int[] {0, 2})) {
+                subscribeFromBeginning(scanner, table);
+                int count = 0;
+                while (count < expectedSize) {
+                    ScanRecords records = scanner.poll(Duration.ofSeconds(1));
+                    for (ScanRecord record : records) {
+                        InternalRow row = record.getRow();
+                        assertThat(row.getFieldCount()).isEqualTo(2);
+                        assertThat(row.getInt(0)).isEqualTo(count);
+                        if (count % 2 == 0) {
+                            assertThat(row.getString(1).toString())
+                                    .isEqualTo("hello, friend" + count);
+                        } else {
+                            assertThat(row.isNullAt(1)).isTrue();
+                        }
+                        count++;
+                    }
+                }
+                assertThat(count).isEqualTo(expectedSize);
+            }
+        }
+    }
+
+    @Test
+    void testPkUpdateAndDeleteWithCompactedLog() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath tablePath = TablePath.of("test_db_1", "test_pk_compacted_update_delete");
+        createTable(tablePath, tableDescriptor, false);
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            // initial insert
+            upsertWriter.upsert(row(1, 10));
+            upsertWriter.flush();
+            // update same key
+            upsertWriter.upsert(row(1, 20));
+            upsertWriter.flush();
+            // delete the key
+            upsertWriter.delete(row(1, 20));
+            upsertWriter.flush();
+
+            LogScanner scanner = createLogScanner(table);
+            subscribeFromBeginning(scanner, table);
+            // Expect: +I(1,10), -U(1,10), +U(1,20), -D(1,20)
+            ChangeType[] expected = {
+                ChangeType.INSERT,
+                ChangeType.UPDATE_BEFORE,
+                ChangeType.UPDATE_AFTER,
+                ChangeType.DELETE
+            };
+            int seen = 0;
+            while (seen < expected.length) {
+                ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
+                for (ScanRecord r : recs) {
+                    assertThat(r.getChangeType()).isEqualTo(expected[seen]);
+                    InternalRow row = r.getRow();
+                    assertThat(row.getInt(0)).isEqualTo(1);
+                    // value field is 10 in the before image and 20 in the after image / delete
+                    if (expected[seen] == ChangeType.UPDATE_AFTER
+                            || expected[seen] == ChangeType.DELETE) {
+                        assertThat(row.getInt(1)).isEqualTo(20);
+                    } else {
+                        assertThat(row.getInt(1)).isEqualTo(10);
+                    }
+                    seen++;
+                }
+            }
+            assertThat(seen).isEqualTo(expected.length);
+            scanner.close();
+        }
+    }
+
+    @Test
+    void testPkCompactedPollFromLatestNoRecords() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath path = TablePath.of("test_db_1", "test_pk_compacted_latest");
+        createTable(path, td, false);
+
+        try (Table table = conn.getTable(path)) {
+            LogScanner scanner = createLogScanner(table);
+            subscribeFromLatestOffset(path, null, null, table, scanner, admin);
+            // Now write a few rows and ensure only these are seen
+            UpsertWriter upsert = table.newUpsert().createWriter();
+            for (int i = 0; i < 5; i++) {
+                upsert.upsert(row(i, i));
+            }
+            upsert.flush();
+
+            int seen = 0;
+            while (seen < 5) {
+                ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
+                for (ScanRecord r : recs) {
+                    assertThat(r.getChangeType()).isEqualTo(ChangeType.INSERT);
+                    assertThat(r.getRow().getInt(0)).isBetween(0, 4);
+                    seen++;
+                }
+            }
+
+            // delete a non-existent key
+            upsert.delete(row(42, 0));
+            upsert.flush();
+            // poll a few times to ensure no accidental records
+            int total = 0;
+            for (int i = 0; i < 3; i++) {
+                total += scanner.poll(Duration.ofSeconds(1)).count();
+            }
+            assertThat(total).isEqualTo(0);
+
+            scanner.close();
+        }
+    }
 }
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
new file mode 100644
index 0000000000..cbd6680a72
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.write;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.memory.PreAllocatedPagedOutputView;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.CompactedLogRecord;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.record.LogRecordBatch;
+import org.apache.fluss.record.LogRecordReadContext;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.bytesview.BytesView;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
+import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link CompactedLogWriteBatch}. */
+public class CompactedLogWriteBatchTest {
+    private CompactedRow row;
+    private int estimatedSizeInBytes;
+
+    @BeforeEach
+    void setup() {
+        row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
+        estimatedSizeInBytes = CompactedLogRecord.sizeOf(row);
+    }
+
+    @Test
+    void testTryAppendWithWriteLimit() throws Exception {
+        int bucketId = 0;
+        int writeLimit = 100;
+        CompactedLogWriteBatch logProducerBatch =
+                createLogWriteBatch(
+                        new TableBucket(DATA1_TABLE_ID, bucketId),
+                        0L,
+                        writeLimit,
+                        MemorySegment.allocateHeapMemory(writeLimit));
+
+        int maxRecordsPerBatch =
+                (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
+                        / estimatedSizeInBytes;
+        for (int i = 0; i < maxRecordsPerBatch; i++) {
+            boolean appendResult =
+                    logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback());
+            assertThat(appendResult).isTrue();
+        }
+
+        // batch full.
+        boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback());
+        assertThat(appendResult).isFalse();
+    }
+
+    @Test
+    void testToBytes() throws Exception {
+        int bucketId = 0;
+        CompactedLogWriteBatch logProducerBatch =
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L);
+        boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback());
+        assertThat(appendResult).isTrue();
+
+        logProducerBatch.close();
+        BytesView bytesView = logProducerBatch.build();
+        MemoryLogRecords logRecords = MemoryLogRecords.pointToBytesView(bytesView);
+        Iterator<LogRecordBatch> iterator = logRecords.batches().iterator();
+        assertDefaultLogRecordBatchEquals(iterator.next());
+        assertThat(iterator.hasNext()).isFalse();
+    }
+
+    @Test
+    void testCompleteTwice() throws Exception {
+        int bucketId = 0;
+        CompactedLogWriteBatch logWriteBatch =
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L);
+        boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), newWriteCallback());
+        assertThat(appendResult).isTrue();
+
+        assertThat(logWriteBatch.complete()).isTrue();
+        assertThatThrownBy(logWriteBatch::complete)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "A SUCCEEDED batch must not attempt another state change to SUCCEEDED");
+    }
+
+    @Test
+    void testFailedTwice() throws Exception {
+        int bucketId = 0;
+        CompactedLogWriteBatch logWriteBatch =
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L);
+        boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), newWriteCallback());
+        assertThat(appendResult).isTrue();
+
+        assertThat(logWriteBatch.completeExceptionally(new IllegalStateException("test failed.")))
+                .isTrue();
+        // FAILED --> FAILED transitions are ignored.
+        assertThat(logWriteBatch.completeExceptionally(new IllegalStateException("test failed.")))
+                .isFalse();
+    }
+
+    @Test
+    void testClose() throws Exception {
+        int bucketId = 0;
+        CompactedLogWriteBatch logProducerBatch =
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L);
+        boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback());
+        assertThat(appendResult).isTrue();
+
+        logProducerBatch.close();
+        assertThat(logProducerBatch.isClosed()).isTrue();
+
+        appendResult = logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback());
+        assertThat(appendResult).isFalse();
+    }
+
+    @Test
+    void testBatchAborted() throws Exception {
+        int bucketId = 0;
+        int writeLimit = 10240;
+        CompactedLogWriteBatch logProducerBatch =
+                createLogWriteBatch(
+                        new TableBucket(DATA1_TABLE_ID, bucketId),
+                        0L,
+                        writeLimit,
+                        MemorySegment.allocateHeapMemory(writeLimit));
+
+        int recordCount = 5;
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (int i = 0; i < recordCount; i++) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            logProducerBatch.tryAppend(
+                    createWriteRecord(),
+                    exception -> {
+                        if (exception != null) {
+                            future.completeExceptionally(exception);
+                        } else {
+                            future.complete(null);
+                        }
+                    });
+            futures.add(future);
+        }
+
+        logProducerBatch.abortRecordAppends();
+        logProducerBatch.abort(new RuntimeException("close with record batch abort"));
+
+        // first try to append.
+        assertThatThrownBy(
+                        () -> logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback()))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Tried to append a record, but MemoryLogRecordsCompactedBuilder has already been aborted");
+
+        // try to build.
+        assertThatThrownBy(logProducerBatch::build)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Attempting to build an aborted record batch");
+
+        // verify record append future is completed with exception.
+        for (CompletableFuture<Void> future : futures) {
+            assertThatThrownBy(future::join)
+                    .rootCause()
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining("close with record batch abort");
+        }
+    }
+
+    private WriteRecord createWriteRecord() {
+        return WriteRecord.forCompactedAppend(
+                DATA1_TABLE_INFO, DATA1_PHYSICAL_TABLE_PATH, row, null);
+    }
+
+    private CompactedLogWriteBatch createLogWriteBatch(TableBucket tb, long baseLogOffset)
+            throws Exception {
+        return createLogWriteBatch(
+                tb, baseLogOffset, Integer.MAX_VALUE, MemorySegment.allocateHeapMemory(1000));
+    }
+
+    private CompactedLogWriteBatch createLogWriteBatch(
+            TableBucket tb, long baseLogOffset, int writeLimit, MemorySegment memorySegment) {
+        return new CompactedLogWriteBatch(
+                tb.getBucket(),
+                DATA1_PHYSICAL_TABLE_PATH,
+                DATA1_TABLE_INFO.getSchemaId(),
+                writeLimit,
+                new PreAllocatedPagedOutputView(Collections.singletonList(memorySegment)),
+                System.currentTimeMillis());
+    }
+
+    private void assertDefaultLogRecordBatchEquals(LogRecordBatch recordBatch) {
+        assertThat(recordBatch.getRecordCount()).isEqualTo(1);
+        assertThat(recordBatch.baseLogOffset()).isEqualTo(0L);
+        assertThat(recordBatch.schemaId()).isEqualTo((short) DATA1_TABLE_INFO.getSchemaId());
+        try (LogRecordReadContext readContext =
+                        LogRecordReadContext.createCompactedRowReadContext(
+                                DATA1_ROW_TYPE, DATA1_TABLE_INFO.getSchemaId());
+                CloseableIterator<LogRecord> iterator = recordBatch.records(readContext)) {
+            assertThat(iterator.hasNext()).isTrue();
+            LogRecord record = iterator.next();
+            assertThat(record.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+            assertThat(record.getRow()).isEqualTo(row);
+            assertThat(iterator.hasNext()).isFalse();
+        }
+    }
+
+    private WriteCallback newWriteCallback() {
+        return exception -> {
+            if (exception != null) {
+                throw new RuntimeException(exception);
+            }
+        };
+    }
+}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index db39493bd7..aca1cde1b5 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -74,11 +74,10 @@ void testTryAppendWithWriteLimit() throws Exception {
                         writeLimit,
                         MemorySegment.allocateHeapMemory(writeLimit));
 
-        for (int i = 0;
-                i
-                        < (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
-                                / estimatedSizeInBytes;
-                i++) {
+        int maxRecordsPerBatch =
+                (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
+                        / estimatedSizeInBytes;
+        for (int i = 0; i < maxRecordsPerBatch; i++) {
             boolean appendResult =
                     logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback());
             assertThat(appendResult).isTrue();
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index 82f6ee5906..c62c7b0297 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -207,6 +207,7 @@ protected WriteRecord createWriteRecord() {
                 row,
                 key,
                 key,
+                WriteFormat.COMPACTED_KV,
                 null);
     }
 
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
index e23aa5cc3b..13cda3325e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
@@ -30,8 +30,7 @@
  * Default builder for {@link MemoryLogRecords} of log records in {@link LogFormat#COMPACTED}
  * format.
  */
-public class MemoryLogRecordsCompactedBuilder
-        extends AbstractRowMemoryLogRecordsBuilder<CompactedRow> {
+public class MemoryLogRecordsCompactedBuilder extends MemoryLogRecordsRowBuilder<CompactedRow> {
 
     private MemoryLogRecordsCompactedBuilder(
             long baseLogOffset,
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
index a713c34ee2..34688ef555 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
@@ -29,7 +29,7 @@
  * Default builder for {@link MemoryLogRecords} of log records in {@link
  * org.apache.fluss.metadata.LogFormat#INDEXED} format.
  */
-public class MemoryLogRecordsIndexedBuilder extends AbstractRowMemoryLogRecordsBuilder<IndexedRow> {
+public class MemoryLogRecordsIndexedBuilder extends MemoryLogRecordsRowBuilder<IndexedRow> {
 
     private MemoryLogRecordsIndexedBuilder(
             long baseLogOffset,
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsRowBuilder.java
similarity index 98%
rename from fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
rename to fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsRowBuilder.java
index 423b35a3b9..127503aa02 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsRowBuilder.java
@@ -39,7 +39,7 @@
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /** Abstract base builder for row-based MemoryLogRecords builders sharing common logic. */
-abstract class AbstractRowMemoryLogRecordsBuilder<R extends InternalRow> implements AutoCloseable {
+public abstract class MemoryLogRecordsRowBuilder<R extends InternalRow> implements AutoCloseable {
     protected static final int BUILDER_DEFAULT_OFFSET = 0;
 
     protected final long baseLogOffset;
@@ -59,7 +59,7 @@ abstract class AbstractRowMemoryLogRecordsBuilder<R extends InternalRow> implements AutoCloseable {
     private volatile boolean isClosed;
     private boolean aborted = false;
 
-    protected AbstractRowMemoryLogRecordsBuilder(
+    protected MemoryLogRecordsRowBuilder(
             long baseLogOffset,
             int schemaId,
             int writeLimit,
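Reviewer note (not part of the patch): below is a minimal, hypothetical sketch of how the new COMPACTED log write path is exercised end to end, distilled from the `FlussTableITCase` additions above. The `admin`/`conn` handles, the database and table names, and the `row(...)` helper are assumptions borrowed from the test harness, and `table.newAppend().createWriter()` is the existing append entry point assumed here. Internally, `AppendWriterImpl` encodes each row with `CompactedRowEncoder`, wraps it in `WriteRecord.forCompactedAppend(...)` (`WriteFormat.COMPACTED_LOG`), and `RecordAccumulator.createWriteBatch` accumulates it into a `CompactedLogWriteBatch` that is shipped in a `ProduceLogRequest`.

```java
// Sketch under the assumptions above; mirrors the API usage in FlussTableITCase.
Schema schema =
        Schema.newBuilder()
                .column("a", DataTypes.INT())
                .column("b", DataTypes.STRING())
                .build();
TableDescriptor descriptor =
        TableDescriptor.builder()
                .schema(schema)
                .logFormat(LogFormat.COMPACTED) // routes appends through the new COMPACTED path
                .build();
TablePath tablePath = TablePath.of("test_db_1", "compacted_log_demo"); // hypothetical names
admin.createTable(tablePath, descriptor, false).get();

try (Table table = conn.getTable(tablePath)) {
    AppendWriter writer = table.newAppend().createWriter();
    // Each append becomes WriteRecord.forCompactedAppend(...) and is batched into a
    // CompactedLogWriteBatch by the RecordAccumulator before being sent to the server.
    writer.append(row(1, "a")).get(); // row(...) is the test-harness GenericRow helper
    writer.flush();
}
```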