Merged

27 commits
1165438  Add Compacted in LogFormat (polyzos, Aug 28, 2025)
d0698ae  Introduce CompactedLogRecord and related functionality (polyzos, Aug 30, 2025)
8ced8f5  Add tests for CompactedLogRecord (polyzos, Aug 30, 2025)
88a06f8  fix vcs.xml (polyzos, Aug 30, 2025)
5396cc7  apply spotless (polyzos, Aug 30, 2025)
b362b4b  revert vcs changes (polyzos, Aug 30, 2025)
6caa7a1  update AppendWriterImpl to support compactedRow (polyzos, Aug 30, 2025)
8cad339  apply spotless (polyzos, Aug 30, 2025)
40db54b  CompactedLogWriteBatch and MemoryLogRecordsCompactedBuilder with tests (polyzos, Aug 30, 2025)
e6d4567  refactor to minimize duplicate code between indexed and compactedrow … (polyzos, Aug 30, 2025)
8286065  allow kv to use compacted log format (polyzos, Aug 30, 2025)
372818d  add end2end functionality for compact log write and tests (polyzos, Aug 30, 2025)
7c4ef28  throw exception if projection is used in compacted log format (polyzos, Aug 30, 2025)
ac1ca45  small improvements (polyzos, Sep 1, 2025)
f1eb795  Update fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRe… (polyzos, Sep 1, 2025)
f887182  fix violations (polyzos, Sep 1, 2025)
f6fdc58  fix compilation issues after rebase (polyzos, Sep 10, 2025)
bbf0511  fix import issue (polyzos, Sep 10, 2025)
8843745  fix import issue (polyzos, Sep 10, 2025)
a2ec69f  fix checkstyle issue (polyzos, Sep 10, 2025)
a79d9ac  fix remaining import issues (polyzos, Sep 10, 2025)
5a91d49  fix tests (polyzos, Sep 10, 2025)
c17103c  fix failing tests (polyzos, Sep 10, 2025)
9efb9d2  rebase branch (polyzos, Dec 4, 2025)
d7dd776  fix rebase compilation issues (polyzos, Dec 4, 2025)
c63180c  fix rebase issues (polyzos, Dec 21, 2025)
cf23fc3  improve some code (wuchong, Dec 25, 2025)

AppendWriterImpl.java
@@ -26,9 +26,12 @@
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.InternalRow.FieldGetter;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.encode.CompactedRowEncoder;
 import org.apache.fluss.row.encode.IndexedRowEncoder;
 import org.apache.fluss.row.encode.KeyEncoder;
 import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;

 import javax.annotation.Nullable;
@@ -44,6 +47,7 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {

     private final LogFormat logFormat;
     private final IndexedRowEncoder indexedRowEncoder;
+    private final CompactedRowEncoder compactedRowEncoder;
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;

@@ -58,8 +62,12 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
             this.bucketKeyEncoder = KeyEncoder.of(rowType, bucketKeys, lakeFormat);
         }

+        DataType[] fieldDataTypes =
+                tableInfo.getSchema().getRowType().getChildren().toArray(new DataType[0]);
+
         this.logFormat = tableInfo.getTableConfig().getLogFormat();
         this.indexedRowEncoder = new IndexedRowEncoder(tableInfo.getRowType());
+        this.compactedRowEncoder = new CompactedRowEncoder(fieldDataTypes);
         this.fieldGetters = InternalRow.createFieldGetters(tableInfo.getRowType());
         this.tableInfo = tableInfo;
     }
@@ -80,13 +88,30 @@ public CompletableFuture<AppendResult> append(InternalRow row) {
         if (logFormat == LogFormat.INDEXED) {
             IndexedRow indexedRow = encodeIndexedRow(row);
             record = WriteRecord.forIndexedAppend(tableInfo, physicalPath, indexedRow, bucketKey);
+        } else if (logFormat == LogFormat.COMPACTED) {
+            CompactedRow compactedRow = encodeCompactedRow(row);
+            record =
+                    WriteRecord.forCompactedAppend(
+                            tableInfo, physicalPath, compactedRow, bucketKey);
         } else {
             // ARROW format supports general internal row
             record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row, bucketKey);
         }
         return send(record).thenApply(ignored -> APPEND_SUCCESS);
     }

+    private CompactedRow encodeCompactedRow(InternalRow row) {
+        if (row instanceof CompactedRow) {
+            return (CompactedRow) row;
+        }
+
+        compactedRowEncoder.startNewRow();
+        for (int i = 0; i < fieldCount; i++) {
+            compactedRowEncoder.encodeField(i, fieldGetters[i].getFieldOrNull(row));
+        }
+        return compactedRowEncoder.finishRow();
+    }
+
     private IndexedRow encodeIndexedRow(InternalRow row) {
         if (row instanceof IndexedRow) {
             return (IndexedRow) row;
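
For context, a minimal sketch of the encoder round-trip that encodeCompactedRow performs above. DataTypes and BinaryString are assumed from the wider Fluss type/row API; they are not part of this diff:

// Sketch only; DataTypes.INT()/DataTypes.STRING() and BinaryString.fromString(...)
// are assumed helpers, not code from this PR.
static CompactedRow encodeExampleRow() {
    DataType[] types = {DataTypes.INT(), DataTypes.STRING()};
    CompactedRowEncoder encoder = new CompactedRowEncoder(types);
    encoder.startNewRow(); // resets internal state so one encoder instance is reused per row
    encoder.encodeField(0, 42);
    encoder.encodeField(1, BinaryString.fromString("fluss"));
    return encoder.finishRow();
}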

UpsertWriterImpl.java
@@ -17,6 +17,7 @@

 package org.apache.fluss.client.table.writer;

+import org.apache.fluss.client.write.WriteFormat;
 import org.apache.fluss.client.write.WriteRecord;
 import org.apache.fluss.client.write.WriterClient;
 import org.apache.fluss.metadata.DataLakeFormat;
@@ -50,6 +51,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
     private final KeyEncoder bucketKeyEncoder;

     private final KvFormat kvFormat;
+    private final WriteFormat writeFormat;
     private final RowEncoder rowEncoder;
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;
@@ -78,6 +80,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
                     : KeyEncoder.of(rowType, tableInfo.getBucketKeys(), lakeFormat);

         this.kvFormat = tableInfo.getTableConfig().getKvFormat();
+        this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
         this.rowEncoder = RowEncoder.create(kvFormat, rowType);
         this.fieldGetters = InternalRow.createFieldGetters(rowType);
         this.tableInfo = tableInfo;
@@ -164,6 +167,7 @@ public CompletableFuture<UpsertResult> upsert(InternalRow row) {
                         encodeRow(row),
                         key,
                         bucketKey,
+                        writeFormat,
                         targetColumns);
         return send(record).thenApply(ignored -> UPSERT_SUCCESS);
     }
@@ -182,7 +186,12 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
                 bucketKeyEncoder == primaryKeyEncoder ? key : bucketKeyEncoder.encodeKey(row);
         WriteRecord record =
                 WriteRecord.forDelete(
-                        tableInfo, getPhysicalPath(row), key, bucketKey, targetColumns);
+                        tableInfo,
+                        getPhysicalPath(row),
+                        key,
+                        bucketKey,
+                        writeFormat,
+                        targetColumns);
         return send(record).thenApply(ignored -> DELETE_SUCCESS);
     }

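
WriteFormat is an enum introduced elsewhere in this PR; only the fromKvFormat(...) call is visible in this diff. A plausible sketch of that mapping, with the constant names being assumptions rather than code from the PR:

// Hypothetical sketch; constant names are assumed, not taken from the diff.
public static WriteFormat fromKvFormat(KvFormat kvFormat) {
    switch (kvFormat) {
        case COMPACTED:
            return WriteFormat.COMPACTED; // assumed constant name
        case INDEXED:
            return WriteFormat.INDEXED; // assumed constant name
        default:
            throw new IllegalArgumentException("Unsupported kv format: " + kvFormat);
    }
}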

AbstractRowLogWriteBatch.java (new file)
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.write;

import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.memory.AbstractPagedOutputView;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.MemoryLogRecordsRowBuilder;
import org.apache.fluss.record.bytesview.BytesView;
import org.apache.fluss.row.InternalRow;

import java.io.IOException;
import java.util.List;

import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkNotNull;

/**
* Abstract base class that deduplicates logic for row-based log write batches backed by
* in-memory builders. Concrete subclasses only need to provide a row-type validator/caster
* ({@link #requireAndCastRow}) and supply a suitably typed {@link MemoryLogRecordsRowBuilder}.
*/
abstract class AbstractRowLogWriteBatch<R> extends WriteBatch {

private final AbstractPagedOutputView outputView;
private final MemoryLogRecordsRowBuilder<R> recordsBuilder;
private final String buildErrorMessage;

protected AbstractRowLogWriteBatch(
int bucketId,
PhysicalTablePath physicalTablePath,
long createdMs,
AbstractPagedOutputView outputView,
MemoryLogRecordsRowBuilder<R> recordsBuilder,
String buildErrorMessage) {
super(bucketId, physicalTablePath, createdMs);
this.outputView = outputView;
this.recordsBuilder = recordsBuilder;
this.buildErrorMessage = buildErrorMessage;
}

@Override
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
checkNotNull(callback, "write callback must not be null");
InternalRow rowObj = writeRecord.getRow();
checkNotNull(rowObj, "row must not be null for log record");
checkArgument(writeRecord.getKey() == null, "key must be null for log record");
checkArgument(
writeRecord.getTargetColumns() == null,
"target columns must be null for log record");

R row = requireAndCastRow(rowObj);
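        // A false return tells the caller that this batch cannot take the record
        // (no room left, or already closed), so a fresh batch must be allocated.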
if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
return false;
}
recordsBuilder.append(ChangeType.APPEND_ONLY, row);
recordCount++;
callbacks.add(callback);
return true;
}

protected abstract R requireAndCastRow(InternalRow row);

@Override
public boolean isLogBatch() {
return true;
}

@Override
public BytesView build() {
try {
return recordsBuilder.build();
} catch (IOException e) {
throw new FlussRuntimeException(buildErrorMessage, e);
}
}

@Override
public boolean isClosed() {
return recordsBuilder.isClosed();
}

@Override
public void close() throws Exception {
recordsBuilder.close();
reopened = false;
}

@Override
public List<MemorySegment> pooledMemorySegments() {
return outputView.allocatedPooledSegments();
}

@Override
public void setWriterState(long writerId, int batchSequence) {
recordsBuilder.setWriterState(writerId, batchSequence);
}

@Override
public long writerId() {
return recordsBuilder.writerId();
}

@Override
public int batchSequence() {
return recordsBuilder.batchSequence();
}

@Override
public void abortRecordAppends() {
recordsBuilder.abort();
}

@Override
public void resetWriterState(long writerId, int batchSequence) {
super.resetWriterState(writerId, batchSequence);
recordsBuilder.resetWriterState(writerId, batchSequence);
}

@Override
public int estimatedSizeInBytes() {
return recordsBuilder.getSizeInBytes();
}
}

CompactedLogWriteBatch.java (new file)
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.write;

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.memory.AbstractPagedOutputView;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.compacted.CompactedRow;
import org.apache.fluss.rpc.messages.ProduceLogRequest;

import javax.annotation.concurrent.NotThreadSafe;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
* A batch of log records managed in COMPACTED format that is or will be sent to the server by {@link
* ProduceLogRequest}.
*
* <p>This class is not thread safe and external synchronization must be used when modifying it.
*/
@NotThreadSafe
@Internal
public final class CompactedLogWriteBatch extends AbstractRowLogWriteBatch<CompactedRow> {

public CompactedLogWriteBatch(
int bucketId,
PhysicalTablePath physicalTablePath,
int schemaId,
int writeLimit,
AbstractPagedOutputView outputView,
long createdMs) {
super(
bucketId,
physicalTablePath,
createdMs,
outputView,
MemoryLogRecordsCompactedBuilder.builder(schemaId, writeLimit, outputView, true),
"Failed to build compacted log record batch.");
}

@Override
protected CompactedRow requireAndCastRow(InternalRow row) {
checkArgument(
row instanceof CompactedRow, "row must be CompactedRow for compacted log table");
return (CompactedRow) row;
}
}
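
A minimal construction sketch based on the constructor above; physicalTablePath and outputView stand in for objects the writer obtains elsewhere, and the literal values are illustrative only:

// Sketch: argument values are illustrative, not taken from the PR.
CompactedLogWriteBatch batch =
        new CompactedLogWriteBatch(
                0, // bucketId
                physicalTablePath, // PhysicalTablePath of the target bucket
                1, // schemaId
                64 * 1024, // writeLimit (assumed: max batch size in bytes)
                outputView, // AbstractPagedOutputView backing the records builder
                System.currentTimeMillis()); // createdMs
// Rows are then added via tryAppend(...); once it returns false, build() yields
// the BytesView that is sent in a ProduceLogRequest.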