Skip to content

Commit e5d10ac

Browse files
[kv] Support "table.changelog.image" configuration and WAL image mode for Primary Key Tables (apache#2105)
Co-authored-by: Jark Wu <[email protected]>
1 parent 6dd9776 commit e5d10ac

File tree

9 files changed

+518
-115
lines changed

9 files changed

+518
-115
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.annotation.PublicEvolving;
2222
import org.apache.fluss.compression.ArrowCompressionType;
23+
import org.apache.fluss.metadata.ChangelogImage;
2324
import org.apache.fluss.metadata.DataLakeFormat;
2425
import org.apache.fluss.metadata.DeleteBehavior;
2526
import org.apache.fluss.metadata.KvFormat;
@@ -1446,6 +1447,21 @@ public class ConfigOptions {
14461447
+ "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT."
14471448
+ "Currently a table can have only one auto-increment column.");
14481449

1450+
/**
 * Table option {@code table.changelog.image}: selects the {@link ChangelogImage} mode of a
 * primary key table. The option is enum-typed and defaults to {@link ChangelogImage#FULL}; the
 * description text below is what is attached to the option via {@code withDescription}.
 */
public static final ConfigOption<ChangelogImage> TABLE_CHANGELOG_IMAGE =
1451+
key("table.changelog.image")
1452+
.enumType(ChangelogImage.class)
1453+
.defaultValue(ChangelogImage.FULL)
1454+
.withDescription(
1455+
"Defines the changelog image mode for the primary key table. "
1456+
+ "This configuration is inspired by similar settings in database systems like MySQL's binlog_row_image and PostgreSQL's replica identity. "
1457+
+ "The supported modes are `FULL` (default) and `WAL`. "
1458+
+ "The `FULL` mode produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values. "
1459+
+ "The `WAL` mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. "
1460+
+ "When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, "
1461+
+ "and in this case INSERT operations are converted to UPDATE_AFTER events. "
1462+
+ "This mode reduces storage and transmission costs but loses the ability to track previous values. "
1463+
+ "This option only affects primary key tables.");
1464+
14491465
// ------------------------------------------------------------------------
14501466
// ConfigOptions for Kv
14511467
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.compression.ArrowCompressionInfo;
22+
import org.apache.fluss.metadata.ChangelogImage;
2223
import org.apache.fluss.metadata.DataLakeFormat;
2324
import org.apache.fluss.metadata.DeleteBehavior;
2425
import org.apache.fluss.metadata.KvFormat;
@@ -117,6 +118,14 @@ public Optional<DeleteBehavior> getDeleteBehavior() {
117118
return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
118119
}
119120

121+
/**
122+
* Gets the changelog image mode of the table. The changelog image mode defines what information
123+
* is included in the changelog for update operations.
124+
*/
125+
public ChangelogImage getChangelogImage() {
126+
return config.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
127+
}
128+
120129
/** Gets the Arrow compression type and compression level of the table. */
121130
public ArrowCompressionInfo getArrowCompressionInfo() {
122131
return ArrowCompressionInfo.fromConf(config);
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
/**
21+
* The changelog image mode for the primary key table.
22+
*
23+
* <p>This enum defines what information is included in the changelog for update operations. It is
24+
* inspired by similar configurations in database systems like MySQL's binlog_row_image and
25+
* PostgreSQL's replica identity.
26+
*
27+
* @since 0.9
28+
*/
29+
public enum ChangelogImage {

    /**
     * Full changelog with both UPDATE_BEFORE and UPDATE_AFTER records. This is the default behavior
     * that captures complete information about updates, allowing tracking of previous values.
     */
    FULL,

    /**
     * WAL mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if
     * allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge
     * engine configured) and full row updates (not partial update), an optimization is applied to
     * skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER
     * events, similar to database WAL (Write-Ahead Log) behavior. This mode reduces storage and
     * transmission costs but loses the ability to track previous values.
     */
    WAL;

    /**
     * Creates a {@link ChangelogImage} from the given string.
     *
     * <p>The lookup is case-insensitive. The input is upper-cased with {@link
     * java.util.Locale#ROOT} so that the result never depends on the JVM's default locale.
     *
     * @param image the textual name of the mode, e.g. {@code "full"} or {@code "WAL"}
     * @return the changelog image mode named by {@code image}
     * @throws NullPointerException if {@code image} is {@code null}
     * @throws IllegalArgumentException if {@code image} does not name a supported mode
     */
    public static ChangelogImage fromString(String image) {
        // Fail with an explicit message instead of an anonymous NPE from toUpperCase().
        java.util.Objects.requireNonNull(image, "changelog image must not be null");
        switch (image.toUpperCase(java.util.Locale.ROOT)) {
            case "FULL":
                return FULL;
            case "WAL":
                return WAL;

            default:
                throw new IllegalArgumentException("Unsupported changelog image: " + image);
        }
    }
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
3232
import org.apache.fluss.lake.source.LakeSource;
3333
import org.apache.fluss.lake.source.LakeSplit;
34+
import org.apache.fluss.metadata.ChangelogImage;
3435
import org.apache.fluss.metadata.DeleteBehavior;
3536
import org.apache.fluss.metadata.MergeEngineType;
3637
import org.apache.fluss.metadata.TablePath;
@@ -206,10 +207,32 @@ public ChangelogMode getChangelogMode() {
206207
if (mergeEngineType == MergeEngineType.FIRST_ROW) {
207208
return ChangelogMode.insertOnly();
208209
} else {
209-
// Check delete behavior configuration
210210
Configuration tableConf = Configuration.fromMap(tableOptions);
211211
DeleteBehavior deleteBehavior =
212212
tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
213+
ChangelogImage changelogImage =
214+
tableConf.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
215+
if (changelogImage == ChangelogImage.WAL) {
216+
// When using WAL mode, produce INSERT and UPDATE_AFTER (and DELETE if
217+
// allowed), without UPDATE_BEFORE. Note: with default merge engine and full
218+
// row updates, an optimization converts INSERT to UPDATE_AFTER.
219+
if (deleteBehavior == DeleteBehavior.ALLOW) {
220+
// DELETE is still produced when delete behavior is allowed
221+
return ChangelogMode.newBuilder()
222+
.addContainedKind(RowKind.INSERT)
223+
.addContainedKind(RowKind.UPDATE_AFTER)
224+
.addContainedKind(RowKind.DELETE)
225+
.build();
226+
} else {
227+
// No DELETE when delete operations are ignored or disabled
228+
return ChangelogMode.newBuilder()
229+
.addContainedKind(RowKind.INSERT)
230+
.addContainedKind(RowKind.UPDATE_AFTER)
231+
.build();
232+
}
233+
}
234+
235+
// Using FULL mode, produce full changelog
213236
if (deleteBehavior == DeleteBehavior.ALLOW) {
214237
return ChangelogMode.all();
215238
} else {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,4 +1384,74 @@ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws Exception {
13841384
assertResultsIgnoreOrder(rowIter, expectedRows, true);
13851385
}
13861386
}
1387+
1388+
@Test
1389+
void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
1390+
String tableName = "wal_mode_pk_table";
1391+
// Create a table with WAL mode and default merge engine
1392+
tEnv.executeSql(
1393+
String.format(
1394+
"create table %s ("
1395+
+ " id int not null,"
1396+
+ " category string,"
1397+
+ " amount bigint,"
1398+
+ " primary key (id) not enforced"
1399+
+ ") with ('table.changelog.image' = 'wal')",
1400+
tableName));
1401+
1402+
// Insert initial data
1403+
tEnv.executeSql(
1404+
String.format(
1405+
"INSERT INTO %s VALUES "
1406+
+ "(1, 'A', 100), "
1407+
+ "(2, 'B', 200), "
1408+
+ "(3, 'A', 150), "
1409+
+ "(4, 'B', 250)",
1410+
tableName))
1411+
.await();
1412+
1413+
// Use batch mode to update and delete records
1414+
tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE id = 1").await();
1415+
tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE id = 3").await();
1416+
tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id = 4").await();
1417+
1418+
// Do aggregation on the table and verify ChangelogNormalize node is generated
1419+
String aggQuery =
1420+
String.format(
1421+
"SELECT category, SUM(amount) as total_amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
1422+
tableName);
1423+
1424+
// Explain the aggregation query to check for ChangelogNormalize
1425+
String aggPlan = tEnv.explainSql(aggQuery);
1426+
// ChangelogNormalize should be present to normalize the changelog for aggregation
1427+
// In Flink, when the source produces changelog with primary key semantics (I, UA, D),
1428+
// a ChangelogNormalize operator is inserted before aggregation
1429+
assertThat(aggPlan).contains("ChangelogNormalize");
1430+
1431+
// Execute the aggregation and verify the result
1432+
CloseableIterator<Row> aggIter = tEnv.executeSql(aggQuery).collect();
1433+
1434+
// Expected aggregation results:
1435+
// Category A: 120 (id=1) + 180 (id=3) = 300
1436+
// Category B: 200 (id=2) = 200 (id=4 was deleted)
1437+
List<String> expectedAggResults =
1438+
Arrays.asList(
1439+
"+I[A, 100]",
1440+
"-U[A, 100]",
1441+
"+U[A, 250]",
1442+
"-U[A, 250]",
1443+
"+U[A, 150]",
1444+
"-U[A, 150]",
1445+
"+U[A, 270]",
1446+
"-U[A, 270]",
1447+
"+U[A, 120]",
1448+
"-U[A, 120]",
1449+
"+U[A, 300]",
1450+
"+I[B, 250]",
1451+
"-D[B, 250]",
1452+
"+I[B, 200]");
1453+
1454+
// Collect results with timeout
1455+
assertResultsIgnoreOrder(aggIter, expectedAggResults, true);
1456+
}
13871457
}

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ public KvTablet getOrCreateKv(
185185
kvFormat,
186186
merger,
187187
arrowCompressionInfo,
188-
schemaGetter);
188+
schemaGetter,
189+
tableConfig.getChangelogImage());
189190
currentKvs.put(tableBucket, tablet);
190191

191192
LOG.info(
@@ -277,9 +278,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
277278
TablePath tablePath = physicalTablePath.getTablePath();
278279
TableInfo tableInfo = getTableInfo(zkClient, tablePath);
279280

280-
RowMerger rowMerger =
281-
RowMerger.create(
282-
tableInfo.getTableConfig(), tableInfo.getTableConfig().getKvFormat());
281+
TableConfig tableConfig = tableInfo.getTableConfig();
282+
RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat());
283283
KvTablet kvTablet =
284284
KvTablet.create(
285285
physicalTablePath,
@@ -290,10 +290,11 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
290290
serverMetricGroup,
291291
arrowBufferAllocator,
292292
memorySegmentPool,
293-
tableInfo.getTableConfig().getKvFormat(),
293+
tableConfig.getKvFormat(),
294294
rowMerger,
295-
tableInfo.getTableConfig().getArrowCompressionInfo(),
296-
schemaGetter);
295+
tableConfig.getArrowCompressionInfo(),
296+
schemaGetter,
297+
tableConfig.getChangelogImage());
297298
if (this.currentKvs.containsKey(tableBucket)) {
298299
throw new IllegalStateException(
299300
String.format(

0 commit comments

Comments
 (0)