@@ -1423,6 +1423,17 @@ public class ConfigOptions {
+ "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT."
+ "Currently a table can have only one auto-increment column.");

public static final ConfigOption<Boolean> TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE =
key("table.changelog.ignore-update-before")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to ignore UPDATE_BEFORE records in changelog for the primary key table. "
+ "When disabled (default), update operations produce both UPDATE_BEFORE and UPDATE_AFTER records. "
+ "When enabled, update operations only produce UPDATE_AFTER records, "
+ "which reduces storage and transmission costs but loses the ability to track previous values. "
+ "This option only affects primary key tables.");

// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
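Reviewer note (not part of the diff): below is a minimal sketch of reading the new option from raw table options, using the same `Configuration.fromMap` and `get(ConfigOption)` calls this PR uses in `getChangelogMode()`. The import paths and the standalone `main` are assumptions for illustration only.

```java
// Sketch only: import paths for Configuration/ConfigOptions are assumed.
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;

import java.util.HashMap;
import java.util.Map;

public class IgnoreUpdateBeforeExample {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        // the option defaults to false; enable it explicitly here
        tableOptions.put("table.changelog.ignore-update-before", "true");

        Configuration tableConf = Configuration.fromMap(tableOptions);
        boolean ignoreUpdateBefore =
                tableConf.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);

        // With the flag on, an update to a primary-key row should appear in the
        // changelog as a single UPDATE_AFTER record instead of the usual
        // UPDATE_BEFORE / UPDATE_AFTER pair.
        System.out.println("ignore UPDATE_BEFORE: " + ignoreUpdateBefore);
    }
}
```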
@@ -117,6 +117,15 @@ public Optional<DeleteBehavior> getDeleteBehavior() {
return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
}

/**
* Whether to ignore UPDATE_BEFORE records in the changelog for primary key tables. When false
* (default), update operations produce both UPDATE_BEFORE and UPDATE_AFTER records. When true,
* update operations only produce UPDATE_AFTER records.
*/
public boolean isChangelogIgnoreUpdateBefore() {
return config.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);
}

/** Gets the Arrow compression type and compression level of the table. */
public ArrowCompressionInfo getArrowCompressionInfo() {
return ArrowCompressionInfo.fromConf(config);
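Reviewer note (not part of the diff): server-side code is expected to consume the flag through this accessor rather than re-reading the raw option, as the KvManager changes below do. A tiny sketch of that pattern; the import path for `TableConfig` is assumed.

```java
// Sketch only: the TableConfig import path is assumed.
import org.apache.fluss.metadata.TableConfig;

public final class ChangelogPolicy {
    private ChangelogPolicy() {}

    /** Returns true if -U records should still be written for updates on this table. */
    public static boolean emitUpdateBefore(TableConfig tableConfig) {
        return !tableConfig.isChangelogIgnoreUpdateBefore();
    }
}
```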
@@ -206,10 +206,31 @@ public ChangelogMode getChangelogMode() {
if (mergeEngineType == MergeEngineType.FIRST_ROW) {
return ChangelogMode.insertOnly();
} else {
// Check delete behavior configuration
Configuration tableConf = Configuration.fromMap(tableOptions);
DeleteBehavior deleteBehavior =
tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
boolean ignoreUpdateBefore =
tableConf.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);
if (ignoreUpdateBefore) {
// When ignoring UPDATE_BEFORE, only produce INSERT, UPDATE_AFTER (and
// DELETE if allowed)
if (deleteBehavior == DeleteBehavior.ALLOW) {
// DELETE is still produced when delete behavior is allowed
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
} else {
// No DELETE when delete operations are ignored or disabled
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
}
}

// Not ignoring UPDATE_BEFORE, produce full changelog
if (deleteBehavior == DeleteBehavior.ALLOW) {
return ChangelogMode.all();
} else {
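Reviewer note (not part of the diff): the added branch gives four outcomes in total. With the flag off, existing behavior is kept (full changelog when deletes are allowed, otherwise the pre-existing branch below the hunk); with the flag on, UPDATE_BEFORE is dropped and DELETE is kept only when the delete behavior is ALLOW. A standalone sketch of just the new branch, using the same builder API as the hunk (imports omitted; `ChangelogMode`, `RowKind` and `DeleteBehavior` are the types already used in this file):

```java
// Sketch only: mirrors the new ignore-update-before branch of getChangelogMode().
static ChangelogMode ignoreUpdateBeforeMode(DeleteBehavior deleteBehavior) {
    if (deleteBehavior == DeleteBehavior.ALLOW) {
        // deletes still reach the changelog
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }
    // deletes are ignored or disabled: inserts and post-update images only
    return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .build();
}
```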
@@ -185,7 +185,8 @@ public KvTablet getOrCreateKv(
kvFormat,
merger,
arrowCompressionInfo,
schemaGetter);
schemaGetter,
tableConfig.isChangelogIgnoreUpdateBefore());
currentKvs.put(tableBucket, tablet);

LOG.info(
@@ -277,9 +278,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
TablePath tablePath = physicalTablePath.getTablePath();
TableInfo tableInfo = getTableInfo(zkClient, tablePath);

RowMerger rowMerger =
RowMerger.create(
tableInfo.getTableConfig(), tableInfo.getTableConfig().getKvFormat());
TableConfig tableConfig = tableInfo.getTableConfig();
RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat());
KvTablet kvTablet =
KvTablet.create(
physicalTablePath,
@@ -290,10 +290,11 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
serverMetricGroup,
arrowBufferAllocator,
memorySegmentPool,
tableInfo.getTableConfig().getKvFormat(),
tableConfig.getKvFormat(),
rowMerger,
tableInfo.getTableConfig().getArrowCompressionInfo(),
schemaGetter);
tableConfig.getArrowCompressionInfo(),
schemaGetter,
tableConfig.isChangelogIgnoreUpdateBefore());
if (this.currentKvs.containsKey(tableBucket)) {
throw new IllegalStateException(
String.format(
fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java (89 additions & 42 deletions)
@@ -49,6 +49,7 @@
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger;
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader;
@@ -113,6 +114,9 @@ public final class KvTablet {

private final SchemaGetter schemaGetter;

// whether to ignore UPDATE_BEFORE records in the changelog
private final boolean ignoreUpdateBefore;

/**
* The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been
* flushed into kv.
@@ -136,7 +140,8 @@ private KvTablet(
KvFormat kvFormat,
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo,
SchemaGetter schemaGetter) {
SchemaGetter schemaGetter,
boolean ignoreUpdateBefore) {
this.physicalPath = physicalPath;
this.tableBucket = tableBucket;
this.logTablet = logTablet;
@@ -151,6 +156,7 @@ public static KvTablet create(
this.rowMerger = rowMerger;
this.arrowCompressionInfo = arrowCompressionInfo;
this.schemaGetter = schemaGetter;
this.ignoreUpdateBefore = ignoreUpdateBefore;
}

public static KvTablet create(
@@ -163,7 +169,8 @@ public static KvTablet create(
KvFormat kvFormat,
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo,
SchemaGetter schemaGetter)
SchemaGetter schemaGetter,
boolean ignoreUpdateBefore)
throws IOException {
Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
FlussPaths.parseTabletDir(kvTabletDir);
@@ -179,7 +186,8 @@ public static KvTablet create(
kvFormat,
rowMerger,
arrowCompressionInfo,
schemaGetter);
schemaGetter,
ignoreUpdateBefore);
}

public static KvTablet create(
@@ -194,7 +202,8 @@ public static KvTablet create(
KvFormat kvFormat,
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo,
SchemaGetter schemaGetter)
SchemaGetter schemaGetter,
boolean ignoreUpdateBefore)
throws IOException {
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir);
return new KvTablet(
@@ -211,7 +220,8 @@ public static KvTablet create(
kvFormat,
rowMerger,
arrowCompressionInfo,
schemaGetter);
schemaGetter,
ignoreUpdateBefore);
}

private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir)
@@ -345,52 +355,89 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targe
latestSchemaRow.replaceRow(oldValue.row));
kvPreWriteBuffer.delete(key, logOffset++);
} else {
// otherwise, it's a partial update, should produce -U,+U
walBuilder.append(
ChangeType.UPDATE_BEFORE,
latestSchemaRow.replaceRow(oldValue.row));
walBuilder.append(
ChangeType.UPDATE_AFTER,
latestSchemaRow.replaceRow(newValue.row));
kvPreWriteBuffer.put(
key, newValue.encodeValue(), logOffset + 1);
logOffset += 2;
// otherwise, it's a partial update
if (ignoreUpdateBefore) {
// only produce +U
walBuilder.append(
ChangeType.UPDATE_AFTER,
latestSchemaRow.replaceRow(newValue.row));
kvPreWriteBuffer.put(
key, newValue.encodeValue(), logOffset);
logOffset++;
} else {
// produce -U, +U
walBuilder.append(
ChangeType.UPDATE_BEFORE,
latestSchemaRow.replaceRow(oldValue.row));
walBuilder.append(
ChangeType.UPDATE_AFTER,
latestSchemaRow.replaceRow(newValue.row));
kvPreWriteBuffer.put(
key, newValue.encodeValue(), logOffset + 1);
logOffset += 2;
}
}
}
} else {
// upsert operation
byte[] oldValueBytes = getFromBufferOrKv(key);
// it's update
if (oldValueBytes != null) {
BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
BinaryValue newValue =
currentMerger.merge(oldValue, currentValue);
if (newValue == oldValue) {
// newValue is the same as oldValue, which means nothing
// happens (no update/delete), so the input should be ignored
continue;
}

walBuilder.append(
ChangeType.UPDATE_BEFORE,
latestSchemaRow.replaceRow(oldValue.row));
// Optimization: when ignoring UPDATE_BEFORE and merger is
// DefaultRowMerger (full update, not partial update), we can skip
// fetching the old value for better performance since the merge
// always returns the new value.
if (ignoreUpdateBefore
&& currentMerger instanceof DefaultRowMerger) {
// Fast path: directly produce +U without fetching old value
walBuilder.append(
ChangeType.UPDATE_AFTER,
latestSchemaRow.replaceRow(newValue.row));
// logOffset is for -U, logOffset + 1 is for +U, we need to use
// the log offset for +U
kvPreWriteBuffer.put(
key, newValue.encodeValue(), logOffset + 1);
logOffset += 2;
} else {
// it's insert
// TODO: we should add guarantees that all non-specified columns
// of the input row are set to null.
walBuilder.append(
ChangeType.INSERT,
latestSchemaRow.replaceRow(currentValue.row));
kvPreWriteBuffer.put(
key, currentValue.encodeValue(), logOffset++);
} else {
byte[] oldValueBytes = getFromBufferOrKv(key);
// it's update
if (oldValueBytes != null) {
BinaryValue oldValue =
valueDecoder.decodeValue(oldValueBytes);
BinaryValue newValue =
currentMerger.merge(oldValue, currentValue);
if (newValue == oldValue) {
// newValue is the same as oldValue, which means nothing
// happens (no update/delete), so the input should be
// ignored
continue;
}

if (ignoreUpdateBefore) {
// only produce +U when ignoring UPDATE_BEFORE
walBuilder.append(
ChangeType.UPDATE_AFTER,
latestSchemaRow.replaceRow(newValue.row));
kvPreWriteBuffer.put(
key, newValue.encodeValue(), logOffset);
logOffset++;
} else {
walBuilder.append(
ChangeType.UPDATE_BEFORE,
latestSchemaRow.replaceRow(oldValue.row));
walBuilder.append(
ChangeType.UPDATE_AFTER,
latestSchemaRow.replaceRow(newValue.row));
// logOffset is for -U, logOffset + 1 is for +U, we need
// to use the log offset for +U
kvPreWriteBuffer.put(
key, newValue.encodeValue(), logOffset + 1);
logOffset += 2;
}
} else {
// it's insert
// TODO: we should add guarantees that all non-specified
// columns of the input row are set to null.
walBuilder.append(
ChangeType.INSERT,
latestSchemaRow.replaceRow(currentValue.row));
kvPreWriteBuffer.put(
key, currentValue.encodeValue(), logOffset++);
}
}
}
}
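Reviewer note (not part of the diff): the update branches above differ mainly in how many WAL records they append and therefore which log offset the pre-write buffer must record for the new value. A simplified, self-contained model of that bookkeeping (no Fluss types involved):

```java
// Sketch only: simplified model of the offset bookkeeping in putAsLeader.
// Every record appended to the WAL consumes one log offset; the kv pre-write
// buffer must be keyed to the offset of the record that carries the new value.
final class UpdateOffsets {
    final long offsetForNewValue; // offset to pass to kvPreWriteBuffer.put(...)
    final long nextLogOffset;     // value of logOffset after the update

    UpdateOffsets(long offsetForNewValue, long nextLogOffset) {
        this.offsetForNewValue = offsetForNewValue;
        this.nextLogOffset = nextLogOffset;
    }

    static UpdateOffsets forUpdate(long logOffset, boolean ignoreUpdateBefore) {
        if (ignoreUpdateBefore) {
            // only +U is appended, so the new value sits at the current offset
            return new UpdateOffsets(logOffset, logOffset + 1);
        }
        // -U is appended at logOffset and +U at logOffset + 1
        return new UpdateOffsets(logOffset + 1, logOffset + 2);
    }
}
```

The DefaultRowMerger fast path additionally skips the getFromBufferOrKv lookup: per the comment in the hunk, a full (non-partial) upsert always merges to the incoming row, so the old value is not needed to build the +U record.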