Skip to content

Commit d0a9513

Browse files
refactor
1 parent 0844d64 commit d0a9513

File tree

8 files changed

+283
-185
lines changed

8 files changed

+283
-185
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.annotation.PublicEvolving;
2222
import org.apache.fluss.compression.ArrowCompressionType;
23+
import org.apache.fluss.metadata.ChangelogImage;
2324
import org.apache.fluss.metadata.DataLakeFormat;
2425
import org.apache.fluss.metadata.DeleteBehavior;
2526
import org.apache.fluss.metadata.KvFormat;
@@ -1423,15 +1424,17 @@ public class ConfigOptions {
14231424
+ "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT."
14241425
+ "Currently a table can have only one auto-increment column.");
14251426

1426-
public static final ConfigOption<Boolean> TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE =
1427-
key("table.changelog.ignore-update-before")
1428-
.booleanType()
1429-
.defaultValue(false)
1430-
.withDescription(
1431-
"Whether to ignore UPDATE_BEFORE records in changelog for the primary key table. "
1432-
+ "When disabled (default), update operations produce both UPDATE_BEFORE and UPDATE_AFTER records. "
1433-
+ "When enabled, update operations only produce UPDATE_AFTER records, "
1434-
+ "which reduces storage and transmission costs but loses the ability to track previous values. "
1427+
public static final ConfigOption<ChangelogImage> TABLE_CHANGELOG_IMAGE =
1428+
key("table.changelog.image")
1429+
.enumType(ChangelogImage.class)
1430+
.defaultValue(ChangelogImage.FULL)
1431+
.withDescription(
1432+
"Defines the changelog image mode for the primary key table. "
1433+
+ "This configuration is inspired by similar settings in database systems like MySQL's binlog_row_image and PostgreSQL's replica identity. "
1434+
+ "The supported modes are `FULL` (default) and `WAL`. "
1435+
+ "The `FULL` mode produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values. "
1436+
+ "The `WAL` mode emits only UPDATE_AFTER records for both INSERT and UPDATE operations, without UPDATE_BEFORE records. This is similar to database WAL (Write-Ahead Log) behavior. "
1437+
+ "DELETE records are emitted if allowed. This mode reduces storage and transmission costs but loses the ability to distinguish between inserts and updates, and to track previous values. "
14351438
+ "This option only affects primary key tables.");
14361439

14371440
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.compression.ArrowCompressionInfo;
22+
import org.apache.fluss.metadata.ChangelogImage;
2223
import org.apache.fluss.metadata.DataLakeFormat;
2324
import org.apache.fluss.metadata.DeleteBehavior;
2425
import org.apache.fluss.metadata.KvFormat;
@@ -118,12 +119,11 @@ public Optional<DeleteBehavior> getDeleteBehavior() {
118119
}
119120

120121
/**
121-
* Whether to ignore UPDATE_BEFORE records in changelog for the primary key table. When false
122-
* (default), update operations produce both UPDATE_BEFORE and UPDATE_AFTER records. When true,
123-
* update operations only produce UPDATE_AFTER records.
122+
* Gets the changelog image mode of the table. The changelog image mode defines what information
123+
* is included in the changelog for update operations.
124124
*/
125-
public boolean isChangelogIgnoreUpdateBefore() {
126-
return config.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);
125+
public ChangelogImage getChangelogImage() {
126+
return config.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
127127
}
128128

129129
/** Gets the Arrow compression type and compression level of the table. */
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
/**
 * The changelog image mode for the primary key table.
 *
 * <p>This enum defines what information is included in the changelog for update operations. It is
 * inspired by similar configurations in database systems like MySQL's binlog_row_image and
 * PostgreSQL's replica identity.
 *
 * @since 0.9
 */
public enum ChangelogImage {

    /**
     * Full changelog with both UPDATE_BEFORE and UPDATE_AFTER records. This is the default behavior
     * that captures complete information about updates, allowing tracking of previous values.
     */
    FULL,

    /**
     * WAL mode produces only UPDATE_AFTER records for both INSERT and UPDATE operations, without
     * UPDATE_BEFORE records. This mode is similar to database WAL (Write-Ahead Log) behavior, where
     * all inserts and updates are represented as UPDATE_AFTER events. DELETE records are emitted if
     * allowed. This mode reduces storage and transmission costs but loses the ability to
     * distinguish between inserts and updates, and to track previous values.
     */
    WAL;

    /**
     * Creates a {@link ChangelogImage} from the given string.
     *
     * <p>Matching is case-insensitive, and any {@code '-'} in the input is treated as {@code '_'}
     * before the comparison, so {@code "full"}, {@code "Full"} and {@code "FULL"} all resolve to
     * {@link #FULL}.
     *
     * @param image the textual name of the changelog image mode, must not be {@code null}
     * @return the matching {@link ChangelogImage} constant
     * @throws IllegalArgumentException if {@code image} does not name a supported mode
     * @throws NullPointerException if {@code image} is {@code null}
     */
    public static ChangelogImage fromString(String image) {
        // Normalize with Locale.ROOT so that the result never depends on the JVM default locale
        // (locale-sensitive case mapping, e.g. Turkish dotted/dotless 'i', must not affect
        // how configuration values are parsed).
        final String normalized = image.toUpperCase(java.util.Locale.ROOT).replace("-", "_");
        switch (normalized) {
            case "FULL":
                return FULL;
            case "WAL":
                return WAL;

            default:
                // Report the caller's original input rather than the normalized form.
                throw new IllegalArgumentException("Unsupported changelog image: " + image);
        }
    }
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
3232
import org.apache.fluss.lake.source.LakeSource;
3333
import org.apache.fluss.lake.source.LakeSplit;
34+
import org.apache.fluss.metadata.ChangelogImage;
3435
import org.apache.fluss.metadata.DeleteBehavior;
3536
import org.apache.fluss.metadata.MergeEngineType;
3637
import org.apache.fluss.metadata.TablePath;
@@ -209,28 +210,26 @@ public ChangelogMode getChangelogMode() {
209210
Configuration tableConf = Configuration.fromMap(tableOptions);
210211
DeleteBehavior deleteBehavior =
211212
tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
212-
boolean ignoreUpdateBefore =
213-
tableConf.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);
214-
if (ignoreUpdateBefore) {
215-
// When ignoring UPDATE_BEFORE, only produce INSERT, UPDATE_AFTER (and
216-
// DELETE if allowed)
213+
ChangelogImage changelogImage =
214+
tableConf.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
215+
if (changelogImage == ChangelogImage.WAL) {
216+
// When using WAL mode, only produce UPDATE_AFTER (and DELETE if allowed)
217+
// Note: INSERT operations are also converted to UPDATE_AFTER in WAL mode
217218
if (deleteBehavior == DeleteBehavior.ALLOW) {
218219
// DELETE is still produced when delete behavior is allowed
219220
return ChangelogMode.newBuilder()
220-
.addContainedKind(RowKind.INSERT)
221221
.addContainedKind(RowKind.UPDATE_AFTER)
222222
.addContainedKind(RowKind.DELETE)
223223
.build();
224224
} else {
225225
// No DELETE when delete operations are ignored or disabled
226226
return ChangelogMode.newBuilder()
227-
.addContainedKind(RowKind.INSERT)
228227
.addContainedKind(RowKind.UPDATE_AFTER)
229228
.build();
230229
}
231230
}
232231

233-
// Not ignoring UPDATE_BEFORE, produce full changelog
232+
// Using FULL mode, produce full changelog
234233
if (deleteBehavior == DeleteBehavior.ALLOW) {
235234
return ChangelogMode.all();
236235
} else {

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public KvTablet getOrCreateKv(
186186
merger,
187187
arrowCompressionInfo,
188188
schemaGetter,
189-
tableConfig.isChangelogIgnoreUpdateBefore());
189+
tableConfig.getChangelogImage());
190190
currentKvs.put(tableBucket, tablet);
191191

192192
LOG.info(
@@ -294,7 +294,7 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
294294
rowMerger,
295295
tableConfig.getArrowCompressionInfo(),
296296
schemaGetter,
297-
tableConfig.isChangelogIgnoreUpdateBefore());
297+
tableConfig.getChangelogImage());
298298
if (this.currentKvs.containsKey(tableBucket)) {
299299
throw new IllegalStateException(
300300
String.format(

0 commit comments

Comments
 (0)