Commit c8bb2d4

fix unit test and refine document
1 parent d0a9513 commit c8bb2d4

6 files changed

+29
-23
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 4 additions & 2 deletions
@@ -1433,8 +1433,10 @@ public class ConfigOptions {
         + "This configuration is inspired by similar settings in database systems like MySQL's binlog_row_image and PostgreSQL's replica identity. "
         + "The supported modes are `FULL` (default) and `WAL`. "
         + "The `FULL` mode produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values. "
-        + "The `WAL` mode emits only UPDATE_AFTER records for both INSERT and UPDATE operations, without UPDATE_BEFORE records. This is similar to database WAL (Write-Ahead Log) behavior. "
-        + "DELETE records are emitted if allowed. This mode reduces storage and transmission costs but loses the ability to distinguish between inserts and updates, and to track previous values. "
+        + "The `WAL` mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. "
+        + "When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, "
+        + "and in this case INSERT operations are converted to UPDATE_AFTER events. "
+        + "This mode reduces storage and transmission costs but loses the ability to track previous values. "
         + "This option only affects primary key tables.");

 // ------------------------------------------------------------------------
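
The revised description can be read as a small state machine over upserts. The sketch below is an illustrative model in plain Java, not the Fluss implementation: `ChangelogImageSketch`, its `upsert` method, and the `fullRowDefaultMerge` flag (standing in for "default merge engine with full row updates") are hypothetical names introduced here. It models only which record kinds are appended to the changelog in each mode.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the documented semantics; not Fluss code. */
final class ChangelogImageSketch {
    enum Image { FULL, WAL }

    private final Image image;
    // True only for WAL mode with the default merge engine and full row updates.
    private final boolean skipOldValueLookup;
    private final Map<String, String> store = new HashMap<>();
    private final List<String> changelog = new ArrayList<>();

    ChangelogImageSketch(Image image, boolean fullRowDefaultMerge) {
        this.image = image;
        this.skipOldValueLookup = image == Image.WAL && fullRowDefaultMerge;
    }

    void upsert(String key, String value) {
        if (skipOldValueLookup) {
            // The old value is never read, so an insert cannot be told apart
            // from an update; both are recorded as UPDATE_AFTER.
            changelog.add("UPDATE_AFTER:" + key + "=" + value);
        } else {
            String old = store.get(key);
            if (old == null) {
                changelog.add("INSERT:" + key + "=" + value);
            } else {
                if (image == Image.FULL) {
                    // Only FULL mode records the previous value.
                    changelog.add("UPDATE_BEFORE:" + key + "=" + old);
                }
                changelog.add("UPDATE_AFTER:" + key + "=" + value);
            }
        }
        store.put(key, value);
    }

    List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        ChangelogImageSketch optimized = new ChangelogImageSketch(Image.WAL, true);
        optimized.upsert("k", "1");
        optimized.upsert("k", "2");
        System.out.println(optimized.changelog());
    }
}
```

Under this model, two upserts of the same key yield INSERT, UPDATE_BEFORE, UPDATE_AFTER in `FULL` mode; INSERT, UPDATE_AFTER in `WAL` mode when the old value is still looked up (for example with partial updates); and two UPDATE_AFTER records when the lookup is skipped.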

fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java

Lines changed: 6 additions & 5 deletions
@@ -35,11 +35,12 @@ public enum ChangelogImage {
     FULL,

     /**
-     * WAL mode produces only UPDATE_AFTER records for both INSERT and UPDATE operations, without
-     * UPDATE_BEFORE records. This mode is similar to database WAL (Write-Ahead Log) behavior, where
-     * all inserts and updates are represented as UPDATE_AFTER events. DELETE records are emitted if
-     * allowed. This mode reduces storage and transmission costs but loses the ability to
-     * distinguish between inserts and updates, and to track previous values.
+     * WAL mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if
+     * allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge
+     * engine configured) and full row updates (not partial update), an optimization is applied to
+     * skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER
+     * events, similar to database WAL (Write-Ahead Log) behavior. This mode reduces storage and
+     * transmission costs but loses the ability to track previous values.
      */
     WAL;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 5 additions & 2 deletions
@@ -213,17 +213,20 @@ public ChangelogMode getChangelogMode() {
 ChangelogImage changelogImage =
         tableConf.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
 if (changelogImage == ChangelogImage.WAL) {
-    // When using WAL mode, only produce UPDATE_AFTER (and DELETE if allowed)
-    // Note: INSERT operations are also converted to UPDATE_AFTER in WAL mode
+    // When using WAL mode, produce INSERT and UPDATE_AFTER (and DELETE if
+    // allowed), without UPDATE_BEFORE. Note: with default merge engine and full
+    // row updates, an optimization converts INSERT to UPDATE_AFTER.
     if (deleteBehavior == DeleteBehavior.ALLOW) {
         // DELETE is still produced when delete behavior is allowed
         return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
                 .addContainedKind(RowKind.UPDATE_AFTER)
                 .addContainedKind(RowKind.DELETE)
                 .build();
     } else {
         // No DELETE when delete operations are ignored or disabled
         return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
                 .addContainedKind(RowKind.UPDATE_AFTER)
                 .build();
     }
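
The WAL branch in this hunk reduces to a small set computation. Here is a minimal sketch using plain Java enums in place of Flink's `ChangelogMode` and `RowKind`, so that it runs without Flink on the classpath. `WalChangelogModeSketch` and `walModeKinds` are hypothetical names, and the local enums only mirror the identifiers that appear in the diff.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stand-in for the WAL branch of getChangelogMode(); not Fluss code. */
final class WalChangelogModeSketch {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum DeleteBehavior { ALLOW, IGNORE, DISABLE }

    /** Row kinds a WAL-mode source declares for the given delete behavior. */
    static Set<RowKind> walModeKinds(DeleteBehavior deleteBehavior) {
        // INSERT is declared after this commit; UPDATE_BEFORE never is in WAL mode.
        Set<RowKind> kinds = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER);
        if (deleteBehavior == DeleteBehavior.ALLOW) {
            // DELETE is declared only when deletes are allowed.
            kinds.add(RowKind.DELETE);
        }
        return kinds;
    }

    public static void main(String[] args) {
        for (DeleteBehavior behavior : DeleteBehavior.values()) {
            System.out.println(behavior + " -> " + walModeKinds(behavior));
        }
    }
}
```

Declaring INSERT even though the optimized path emits only UPDATE_AFTER is consistent with the diff: the declared mode is the set of kinds that may appear, and the partial update path still emits INSERT.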

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 5 additions & 8 deletions
@@ -444,10 +444,10 @@ private long processUpsert(
         PaddingRow latestSchemaRow,
         long logOffset)
         throws Exception {
-    // Optimization: when using NO_UPDATE_BEFORE mode and merger is
-    // DefaultRowMerger (full update, not partial update), we can skip
-    // fetching old value for better performance since it always returns
-    // new value.
+    // Optimization: when using WAL mode and merger is DefaultRowMerger (full update, not
+    // partial update), we can skip fetching old value for better performance since it
+    // always returns new value. In this case, both INSERT and UPDATE will produce
+    // UPDATE_AFTER.
     if (changelogImage == ChangelogImage.WAL && currentMerger instanceof DefaultRowMerger) {
         return applyUpdate(key, null, currentValue, walBuilder, latestSchemaRow, logOffset);
     }
@@ -487,10 +487,7 @@ private long applyInsert(
         PaddingRow latestSchemaRow,
         long logOffset)
         throws Exception {
-    // In WAL mode, INSERT operations are also emitted as UPDATE_AFTER
-    ChangeType changeType =
-            changelogImage == ChangelogImage.WAL ? ChangeType.UPDATE_AFTER : ChangeType.INSERT;
-    walBuilder.append(changeType, latestSchemaRow.replaceRow(currentValue.row));
+    walBuilder.append(ChangeType.INSERT, latestSchemaRow.replaceRow(currentValue.row));
     kvPreWriteBuffer.put(key, currentValue.encodeValue(), logOffset);
     return logOffset + 1;
 }

fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java

Lines changed: 8 additions & 5 deletions
@@ -1045,8 +1045,9 @@ void testAppendDuplicatedKvBatch() throws Exception {
     }

     @Test
-    void testChangelogImageNoUpdateBefore() throws Exception {
-        // WAL mode - INSERT and UPDATE both produce UPDATE_AFTER, no UPDATE_BEFORE
+    void testWalModeChangelogImageNoUpdateBefore() throws Exception {
+        // WAL mode - no UPDATE_BEFORE. With default merge engine and full row update,
+        // optimization converts INSERT to UPDATE_AFTER
         Map<String, String> config = new HashMap<>();
         config.put("table.changelog.image", "WAL");
         initLogTabletAndKvTablet(DATA1_SCHEMA_PK, config);
@@ -1061,7 +1062,8 @@ void testChangelogImageNoUpdateBefore() throws Exception {
         kvTablet.putAsLeader(kvRecordBatch1, null);
         long endOffset = logTablet.localLogEndOffset();

-        // Verify inserts produce +U (WAL mode converts INSERT to UPDATE_AFTER)
+        // Verify inserts produce +U (optimization in WAL mode with default merge engine and full
+        // row update)
         LogRecords actualLogRecords = readLogRecords();
         MemoryLogRecords expectedLogs =
                 logRecords(
@@ -1110,8 +1112,9 @@ void testChangelogImageNoUpdateBefore() throws Exception {
     }

     @Test
-    void testChangelogImageNoUpdateBeforeWithPartialUpdate() throws Exception {
-        // WAL mode with partial update - still need to fetch oldValue
+    void testWalModeChangelogImageNoUpdateBeforeWithPartialUpdate() throws Exception {
+        // WAL mode with partial update - INSERT produces INSERT, UPDATE produces UPDATE_AFTER
+        // only (no optimization applied)
         Map<String, String> config = new HashMap<>();
         config.put("table.changelog.image", "WAL");
         initLogTabletAndKvTablet(DATA2_SCHEMA, config);

website/docs/engine-flink/options.md

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties)
 | table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table uses the [default merge engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md). It also supports two merge engines are `first_row` and `versioned`. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](table-design/table-types/pk-table/merge-engines/versioned.md) will keep the row with the largest version of the same primary key. |
 | table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the `versioned` merge engine. If the merge engine is set to `versioned`, the version column must be set. |
 | table.delete.behavior | Enum | ALLOW | Controls the behavior of delete operations on primary key tables. Three modes are supported: `ALLOW` (default) - allows normal delete operations; `IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects delete requests and throws explicit errors. This configuration provides system-level guarantees for some downstream pipelines (e.g., Flink Delta Join) that must not receive any delete events in the changelog of the table. For tables with `first_row` or `versioned` merge engines, this option is automatically set to `IGNORE` and cannot be overridden. Only applicable to primary key tables. |
-| table.changelog.image | Enum | FULL | Defines the changelog image mode for primary key tables. This configuration is inspired by similar settings in database systems like MySQL's `binlog_row_image` and PostgreSQL's `replica identity`. Two modes are supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values; `WAL` - emits only UPDATE_AFTER records for both INSERT and UPDATE operations, without UPDATE_BEFORE records. This is similar to database WAL (Write-Ahead Log) behavior. DELETE records are emitted if allowed. This mode reduces storage and transmission costs but loses the ability to distinguish between inserts and updates, and to track previous values. Only applicable to primary key tables. |
+| table.changelog.image | Enum | FULL | Defines the changelog image mode for primary key tables. This configuration is inspired by similar settings in database systems like MySQL's `binlog_row_image` and PostgreSQL's `replica identity`. Two modes are supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values; `WAL` - does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER events. This mode reduces storage and transmission costs but loses the ability to track previous values. Only applicable to primary key tables. |


 ## Read Options
