Skip to content

Commit 0844d64

Browse files
[KV] Support Ignoring UPDATE_BEFORE Records in Changelog for Primary Key Tables
1 parent 872554b commit 0844d64

File tree

7 files changed

+255
-54
lines changed

7 files changed

+255
-54
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,6 +1423,17 @@ public class ConfigOptions {
14231423
+ "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT."
14241424
+ "Currently a table can have only one auto-increment column.");
14251425

1426+
// Table-level switch for omitting UPDATE_BEFORE (-U) records from the changelog of
// primary key tables. Defaults to false, i.e. an update is logged as a -U/+U pair.
public static final ConfigOption<Boolean> TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE =
1427+
key("table.changelog.ignore-update-before")
1428+
.booleanType()
1429+
.defaultValue(false)
1430+
.withDescription(
1431+
"Whether to ignore UPDATE_BEFORE records in changelog for the primary key table. "
1432+
+ "When disabled (default), update operations produce both UPDATE_BEFORE and UPDATE_AFTER records. "
1433+
+ "When enabled, update operations only produce UPDATE_AFTER records, "
1434+
+ "which reduces storage and transmission costs but loses the ability to track previous values. "
1435+
+ "This option only affects primary key tables.");
1436+
14261437
// ------------------------------------------------------------------------
14271438
// ConfigOptions for Kv
14281439
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ public Optional<DeleteBehavior> getDeleteBehavior() {
117117
return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
118118
}
119119

120+
/**
121+
* Whether to ignore UPDATE_BEFORE records in the changelog of the primary key table. When false
122+
* (default), update operations produce both UPDATE_BEFORE and UPDATE_AFTER records. When true,
123+
* update operations only produce UPDATE_AFTER records.
*
* <p>The value is read from {@link ConfigOptions#TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE}.
*
* @return {@code true} if UPDATE_BEFORE records are to be omitted from the changelog, {@code
*     false} otherwise
124+
*/
125+
public boolean isChangelogIgnoreUpdateBefore() {
126+
return config.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);
127+
}
128+
120129
/** Gets the Arrow compression type and compression level of the table. */
121130
public ArrowCompressionInfo getArrowCompressionInfo() {
122131
return ArrowCompressionInfo.fromConf(config);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,31 @@ public ChangelogMode getChangelogMode() {
206206
if (mergeEngineType == MergeEngineType.FIRST_ROW) {
207207
return ChangelogMode.insertOnly();
208208
} else {
209-
// Check delete behavior configuration
210209
Configuration tableConf = Configuration.fromMap(tableOptions);
211210
DeleteBehavior deleteBehavior =
212211
tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
212+
boolean ignoreUpdateBefore =
213+
tableConf.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);
214+
if (ignoreUpdateBefore) {
215+
// When ignoring UPDATE_BEFORE, only produce INSERT, UPDATE_AFTER (and
216+
// DELETE if allowed)
217+
if (deleteBehavior == DeleteBehavior.ALLOW) {
218+
// DELETE is still produced when delete behavior is allowed
219+
return ChangelogMode.newBuilder()
220+
.addContainedKind(RowKind.INSERT)
221+
.addContainedKind(RowKind.UPDATE_AFTER)
222+
.addContainedKind(RowKind.DELETE)
223+
.build();
224+
} else {
225+
// No DELETE when delete operations are ignored or disabled
226+
return ChangelogMode.newBuilder()
227+
.addContainedKind(RowKind.INSERT)
228+
.addContainedKind(RowKind.UPDATE_AFTER)
229+
.build();
230+
}
231+
}
232+
233+
// Not ignoring UPDATE_BEFORE, produce full changelog
213234
if (deleteBehavior == DeleteBehavior.ALLOW) {
214235
return ChangelogMode.all();
215236
} else {

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ public KvTablet getOrCreateKv(
185185
kvFormat,
186186
merger,
187187
arrowCompressionInfo,
188-
schemaGetter);
188+
schemaGetter,
189+
tableConfig.isChangelogIgnoreUpdateBefore());
189190
currentKvs.put(tableBucket, tablet);
190191

191192
LOG.info(
@@ -277,9 +278,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
277278
TablePath tablePath = physicalTablePath.getTablePath();
278279
TableInfo tableInfo = getTableInfo(zkClient, tablePath);
279280

280-
RowMerger rowMerger =
281-
RowMerger.create(
282-
tableInfo.getTableConfig(), tableInfo.getTableConfig().getKvFormat());
281+
TableConfig tableConfig = tableInfo.getTableConfig();
282+
RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat());
283283
KvTablet kvTablet =
284284
KvTablet.create(
285285
physicalTablePath,
@@ -290,10 +290,11 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
290290
serverMetricGroup,
291291
arrowBufferAllocator,
292292
memorySegmentPool,
293-
tableInfo.getTableConfig().getKvFormat(),
293+
tableConfig.getKvFormat(),
294294
rowMerger,
295-
tableInfo.getTableConfig().getArrowCompressionInfo(),
296-
schemaGetter);
295+
tableConfig.getArrowCompressionInfo(),
296+
schemaGetter,
297+
tableConfig.isChangelogIgnoreUpdateBefore());
297298
if (this.currentKvs.containsKey(tableBucket)) {
298299
throw new IllegalStateException(
299300
String.format(

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 89 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
5050
import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
5151
import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
52+
import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger;
5253
import org.apache.fluss.server.kv.rowmerger.RowMerger;
5354
import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
5455
import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader;
@@ -113,6 +114,9 @@ public final class KvTablet {
113114

114115
private final SchemaGetter schemaGetter;
115116

117+
// whether to ignore UPDATE_BEFORE records in changelog
118+
private final boolean ignoreUpdateBefore;
119+
116120
/**
117121
* The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been
118122
* flushed into kv.
@@ -136,7 +140,8 @@ private KvTablet(
136140
KvFormat kvFormat,
137141
RowMerger rowMerger,
138142
ArrowCompressionInfo arrowCompressionInfo,
139-
SchemaGetter schemaGetter) {
143+
SchemaGetter schemaGetter,
144+
boolean ignoreUpdateBefore) {
140145
this.physicalPath = physicalPath;
141146
this.tableBucket = tableBucket;
142147
this.logTablet = logTablet;
@@ -151,6 +156,7 @@ private KvTablet(
151156
this.rowMerger = rowMerger;
152157
this.arrowCompressionInfo = arrowCompressionInfo;
153158
this.schemaGetter = schemaGetter;
159+
this.ignoreUpdateBefore = ignoreUpdateBefore;
154160
}
155161

156162
public static KvTablet create(
@@ -163,7 +169,8 @@ public static KvTablet create(
163169
KvFormat kvFormat,
164170
RowMerger rowMerger,
165171
ArrowCompressionInfo arrowCompressionInfo,
166-
SchemaGetter schemaGetter)
172+
SchemaGetter schemaGetter,
173+
boolean ignoreUpdateBefore)
167174
throws IOException {
168175
Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
169176
FlussPaths.parseTabletDir(kvTabletDir);
@@ -179,7 +186,8 @@ public static KvTablet create(
179186
kvFormat,
180187
rowMerger,
181188
arrowCompressionInfo,
182-
schemaGetter);
189+
schemaGetter,
190+
ignoreUpdateBefore);
183191
}
184192

185193
public static KvTablet create(
@@ -194,7 +202,8 @@ public static KvTablet create(
194202
KvFormat kvFormat,
195203
RowMerger rowMerger,
196204
ArrowCompressionInfo arrowCompressionInfo,
197-
SchemaGetter schemaGetter)
205+
SchemaGetter schemaGetter,
206+
boolean ignoreUpdateBefore)
198207
throws IOException {
199208
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir);
200209
return new KvTablet(
@@ -211,7 +220,8 @@ public static KvTablet create(
211220
kvFormat,
212221
rowMerger,
213222
arrowCompressionInfo,
214-
schemaGetter);
223+
schemaGetter,
224+
ignoreUpdateBefore);
215225
}
216226

217227
private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir)
@@ -345,52 +355,89 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
345355
latestSchemaRow.replaceRow(oldValue.row));
346356
kvPreWriteBuffer.delete(key, logOffset++);
347357
} else {
348-
// otherwise, it's a partial update, should produce -U,+U
349-
walBuilder.append(
350-
ChangeType.UPDATE_BEFORE,
351-
latestSchemaRow.replaceRow(oldValue.row));
352-
walBuilder.append(
353-
ChangeType.UPDATE_AFTER,
354-
latestSchemaRow.replaceRow(newValue.row));
355-
kvPreWriteBuffer.put(
356-
key, newValue.encodeValue(), logOffset + 1);
357-
logOffset += 2;
358+
// otherwise, it's a partial update
359+
if (ignoreUpdateBefore) {
360+
// only produce +U
361+
walBuilder.append(
362+
ChangeType.UPDATE_AFTER,
363+
latestSchemaRow.replaceRow(newValue.row));
364+
kvPreWriteBuffer.put(
365+
key, newValue.encodeValue(), logOffset);
366+
logOffset++;
367+
} else {
368+
// produce -U, +U
369+
walBuilder.append(
370+
ChangeType.UPDATE_BEFORE,
371+
latestSchemaRow.replaceRow(oldValue.row));
372+
walBuilder.append(
373+
ChangeType.UPDATE_AFTER,
374+
latestSchemaRow.replaceRow(newValue.row));
375+
kvPreWriteBuffer.put(
376+
key, newValue.encodeValue(), logOffset + 1);
377+
logOffset += 2;
378+
}
358379
}
359380
}
360381
} else {
361382
// upsert operation
362-
byte[] oldValueBytes = getFromBufferOrKv(key);
363-
// it's update
364-
if (oldValueBytes != null) {
365-
BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
366-
BinaryValue newValue =
367-
currentMerger.merge(oldValue, currentValue);
368-
if (newValue == oldValue) {
369-
// newValue is the same to oldValue, means nothing
370-
// happens (no update/delete), and input should be ignored
371-
continue;
372-
}
373-
374-
walBuilder.append(
375-
ChangeType.UPDATE_BEFORE,
376-
latestSchemaRow.replaceRow(oldValue.row));
383+
// Optimization: when ignoring UPDATE_BEFORE and merger is
384+
// DefaultRowMerger (full update, not partial update), we can skip
385+
// fetching old value for better performance since the merger always returns
386+
// new value.
387+
if (ignoreUpdateBefore
388+
&& currentMerger instanceof DefaultRowMerger) {
389+
// Fast path: directly produce +U without fetching old value; since key existence is not checked, a first-time insert is also logged as +U instead of +I
377390
walBuilder.append(
378391
ChangeType.UPDATE_AFTER,
379-
latestSchemaRow.replaceRow(newValue.row));
380-
// logOffset is for -U, logOffset + 1 is for +U, we need to use
381-
// the log offset for +U
382-
kvPreWriteBuffer.put(
383-
key, newValue.encodeValue(), logOffset + 1);
384-
logOffset += 2;
385-
} else {
386-
// it's insert
387-
// TODO: we should add guarantees that all non-specified columns
388-
// of the input row are set to null.
389-
walBuilder.append(
390-
ChangeType.INSERT,
391392
latestSchemaRow.replaceRow(currentValue.row));
392393
kvPreWriteBuffer.put(
393394
key, currentValue.encodeValue(), logOffset++);
395+
} else {
396+
byte[] oldValueBytes = getFromBufferOrKv(key);
397+
// it's update
398+
if (oldValueBytes != null) {
399+
BinaryValue oldValue =
400+
valueDecoder.decodeValue(oldValueBytes);
401+
BinaryValue newValue =
402+
currentMerger.merge(oldValue, currentValue);
403+
if (newValue == oldValue) {
404+
// newValue is the same as oldValue, which means nothing
405+
// happens (no update/delete), and input should be
406+
// ignored
407+
continue;
408+
}
409+
410+
if (ignoreUpdateBefore) {
411+
// only produce +U when ignoring UPDATE_BEFORE
412+
walBuilder.append(
413+
ChangeType.UPDATE_AFTER,
414+
latestSchemaRow.replaceRow(newValue.row));
415+
kvPreWriteBuffer.put(
416+
key, newValue.encodeValue(), logOffset);
417+
logOffset++;
418+
} else {
419+
walBuilder.append(
420+
ChangeType.UPDATE_BEFORE,
421+
latestSchemaRow.replaceRow(oldValue.row));
422+
walBuilder.append(
423+
ChangeType.UPDATE_AFTER,
424+
latestSchemaRow.replaceRow(newValue.row));
425+
// logOffset is for -U, logOffset + 1 is for +U, we need
426+
// to use the log offset for +U
427+
kvPreWriteBuffer.put(
428+
key, newValue.encodeValue(), logOffset + 1);
429+
logOffset += 2;
430+
}
431+
} else {
432+
// it's insert
433+
// TODO: we should add guarantees that all non-specified
434+
// columns of the input row are set to null.
435+
walBuilder.append(
436+
ChangeType.INSERT,
437+
latestSchemaRow.replaceRow(currentValue.row));
438+
kvPreWriteBuffer.put(
439+
key, currentValue.encodeValue(), logOffset++);
440+
}
394441
}
395442
}
396443
}

0 commit comments

Comments
 (0)