Skip to content

Commit 44fd0f2

Browse files
Revert "[kv] Support "table.changelog.image" configuration and WAL image mode for Primary Key Tables (apache#2105)"
This reverts commit e5d10ac.
1 parent e5d10ac commit 44fd0f2

File tree

9 files changed

+115
-518
lines changed

9 files changed

+115
-518
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.annotation.PublicEvolving;
2222
import org.apache.fluss.compression.ArrowCompressionType;
23-
import org.apache.fluss.metadata.ChangelogImage;
2423
import org.apache.fluss.metadata.DataLakeFormat;
2524
import org.apache.fluss.metadata.DeleteBehavior;
2625
import org.apache.fluss.metadata.KvFormat;
@@ -1447,21 +1446,6 @@ public class ConfigOptions {
14471446
+ "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT."
14481447
+ "Currently a table can have only one auto-increment column.");
14491448

1450-
public static final ConfigOption<ChangelogImage> TABLE_CHANGELOG_IMAGE =
1451-
key("table.changelog.image")
1452-
.enumType(ChangelogImage.class)
1453-
.defaultValue(ChangelogImage.FULL)
1454-
.withDescription(
1455-
"Defines the changelog image mode for the primary key table. "
1456-
+ "This configuration is inspired by similar settings in database systems like MySQL's binlog_row_image and PostgreSQL's replica identity. "
1457-
+ "The supported modes are `FULL` (default) and `WAL`. "
1458-
+ "The `FULL` mode produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values. "
1459-
+ "The `WAL` mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. "
1460-
+ "When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, "
1461-
+ "and in this case INSERT operations are converted to UPDATE_AFTER events. "
1462-
+ "This mode reduces storage and transmission costs but loses the ability to track previous values. "
1463-
+ "This option only affects primary key tables.");
1464-
14651449
// ------------------------------------------------------------------------
14661450
// ConfigOptions for Kv
14671451
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.compression.ArrowCompressionInfo;
22-
import org.apache.fluss.metadata.ChangelogImage;
2322
import org.apache.fluss.metadata.DataLakeFormat;
2423
import org.apache.fluss.metadata.DeleteBehavior;
2524
import org.apache.fluss.metadata.KvFormat;
@@ -118,14 +117,6 @@ public Optional<DeleteBehavior> getDeleteBehavior() {
118117
return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
119118
}
120119

121-
/**
122-
* Gets the changelog image mode of the table. The changelog image mode defines what information
123-
* is included in the changelog for update operations.
124-
*/
125-
public ChangelogImage getChangelogImage() {
126-
return config.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
127-
}
128-
129120
/** Gets the Arrow compression type and compression level of the table. */
130121
public ArrowCompressionInfo getArrowCompressionInfo() {
131122
return ArrowCompressionInfo.fromConf(config);

fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
3232
import org.apache.fluss.lake.source.LakeSource;
3333
import org.apache.fluss.lake.source.LakeSplit;
34-
import org.apache.fluss.metadata.ChangelogImage;
3534
import org.apache.fluss.metadata.DeleteBehavior;
3635
import org.apache.fluss.metadata.MergeEngineType;
3736
import org.apache.fluss.metadata.TablePath;
@@ -207,32 +206,10 @@ public ChangelogMode getChangelogMode() {
207206
if (mergeEngineType == MergeEngineType.FIRST_ROW) {
208207
return ChangelogMode.insertOnly();
209208
} else {
209+
// Check delete behavior configuration
210210
Configuration tableConf = Configuration.fromMap(tableOptions);
211211
DeleteBehavior deleteBehavior =
212212
tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
213-
ChangelogImage changelogImage =
214-
tableConf.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
215-
if (changelogImage == ChangelogImage.WAL) {
216-
// When using WAL mode, produce INSERT and UPDATE_AFTER (and DELETE if
217-
// allowed), without UPDATE_BEFORE. Note: with default merge engine and full
218-
// row updates, an optimization converts INSERT to UPDATE_AFTER.
219-
if (deleteBehavior == DeleteBehavior.ALLOW) {
220-
// DELETE is still produced when delete behavior is allowed
221-
return ChangelogMode.newBuilder()
222-
.addContainedKind(RowKind.INSERT)
223-
.addContainedKind(RowKind.UPDATE_AFTER)
224-
.addContainedKind(RowKind.DELETE)
225-
.build();
226-
} else {
227-
// No DELETE when delete operations are ignored or disabled
228-
return ChangelogMode.newBuilder()
229-
.addContainedKind(RowKind.INSERT)
230-
.addContainedKind(RowKind.UPDATE_AFTER)
231-
.build();
232-
}
233-
}
234-
235-
// Using FULL mode, produce full changelog
236213
if (deleteBehavior == DeleteBehavior.ALLOW) {
237214
return ChangelogMode.all();
238215
} else {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,74 +1384,4 @@ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws Exception {
13841384
assertResultsIgnoreOrder(rowIter, expectedRows, true);
13851385
}
13861386
}
1387-
1388-
@Test
1389-
void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
1390-
String tableName = "wal_mode_pk_table";
1391-
// Create a table with WAL mode and default merge engine
1392-
tEnv.executeSql(
1393-
String.format(
1394-
"create table %s ("
1395-
+ " id int not null,"
1396-
+ " category string,"
1397-
+ " amount bigint,"
1398-
+ " primary key (id) not enforced"
1399-
+ ") with ('table.changelog.image' = 'wal')",
1400-
tableName));
1401-
1402-
// Insert initial data
1403-
tEnv.executeSql(
1404-
String.format(
1405-
"INSERT INTO %s VALUES "
1406-
+ "(1, 'A', 100), "
1407-
+ "(2, 'B', 200), "
1408-
+ "(3, 'A', 150), "
1409-
+ "(4, 'B', 250)",
1410-
tableName))
1411-
.await();
1412-
1413-
// Use batch mode to update and delete records
1414-
tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE id = 1").await();
1415-
tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE id = 3").await();
1416-
tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id = 4").await();
1417-
1418-
// Do aggregation on the table and verify ChangelogNormalize node is generated
1419-
String aggQuery =
1420-
String.format(
1421-
"SELECT category, SUM(amount) as total_amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
1422-
tableName);
1423-
1424-
// Explain the aggregation query to check for ChangelogNormalize
1425-
String aggPlan = tEnv.explainSql(aggQuery);
1426-
// ChangelogNormalize should be present to normalize the changelog for aggregation
1427-
// In Flink, when the source produces changelog with primary key semantics (I, UA, D),
1428-
// a ChangelogNormalize operator is inserted before aggregation
1429-
assertThat(aggPlan).contains("ChangelogNormalize");
1430-
1431-
// Execute the aggregation and verify the result
1432-
CloseableIterator<Row> aggIter = tEnv.executeSql(aggQuery).collect();
1433-
1434-
// Expected aggregation results:
1435-
// Category A: 120 (id=1) + 180 (id=3) = 300
1436-
// Category B: 200 (id=2) = 200 (id=4 was deleted)
1437-
List<String> expectedAggResults =
1438-
Arrays.asList(
1439-
"+I[A, 100]",
1440-
"-U[A, 100]",
1441-
"+U[A, 250]",
1442-
"-U[A, 250]",
1443-
"+U[A, 150]",
1444-
"-U[A, 150]",
1445-
"+U[A, 270]",
1446-
"-U[A, 270]",
1447-
"+U[A, 120]",
1448-
"-U[A, 120]",
1449-
"+U[A, 300]",
1450-
"+I[B, 250]",
1451-
"-D[B, 250]",
1452-
"+I[B, 200]");
1453-
1454-
// Collect results with timeout
1455-
assertResultsIgnoreOrder(aggIter, expectedAggResults, true);
1456-
}
14571387
}

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,7 @@ public KvTablet getOrCreateKv(
185185
kvFormat,
186186
merger,
187187
arrowCompressionInfo,
188-
schemaGetter,
189-
tableConfig.getChangelogImage());
188+
schemaGetter);
190189
currentKvs.put(tableBucket, tablet);
191190

192191
LOG.info(
@@ -278,8 +277,9 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
278277
TablePath tablePath = physicalTablePath.getTablePath();
279278
TableInfo tableInfo = getTableInfo(zkClient, tablePath);
280279

281-
TableConfig tableConfig = tableInfo.getTableConfig();
282-
RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat());
280+
RowMerger rowMerger =
281+
RowMerger.create(
282+
tableInfo.getTableConfig(), tableInfo.getTableConfig().getKvFormat());
283283
KvTablet kvTablet =
284284
KvTablet.create(
285285
physicalTablePath,
@@ -290,11 +290,10 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
290290
serverMetricGroup,
291291
arrowBufferAllocator,
292292
memorySegmentPool,
293-
tableConfig.getKvFormat(),
293+
tableInfo.getTableConfig().getKvFormat(),
294294
rowMerger,
295-
tableConfig.getArrowCompressionInfo(),
296-
schemaGetter,
297-
tableConfig.getChangelogImage());
295+
tableInfo.getTableConfig().getArrowCompressionInfo(),
296+
schemaGetter);
298297
if (this.currentKvs.containsKey(tableBucket)) {
299298
throw new IllegalStateException(
300299
String.format(

0 commit comments

Comments
 (0)