Skip to content

Commit 10076a4

Browse files
committed
Support table.delete.behavior config to disable or ignore deletion on the table
1 parent a8f9574 commit 10076a4

File tree

13 files changed

+471
-12
lines changed

13 files changed

+471
-12
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.annotation.PublicEvolving;
2222
import org.apache.fluss.compression.ArrowCompressionType;
2323
import org.apache.fluss.metadata.DataLakeFormat;
24+
import org.apache.fluss.metadata.DeleteBehavior;
2425
import org.apache.fluss.metadata.KvFormat;
2526
import org.apache.fluss.metadata.LogFormat;
2627
import org.apache.fluss.metadata.MergeEngineType;
@@ -1361,6 +1362,18 @@ public class ConfigOptions {
13611362
"The column name of the version column for the `versioned` merge engine. "
13621363
+ "If the merge engine is set to `versioned`, the version column must be set.");
13631364

1365+
public static final ConfigOption<DeleteBehavior> TABLE_DELETE_BEHAVIOR =
1366+
key("table.delete.behavior")
1367+
.enumType(DeleteBehavior.class)
1368+
.defaultValue(DeleteBehavior.ALLOW)
1369+
.withDescription(
1370+
"Defines the delete behavior for the primary key table. "
1371+
+ "The supported delete behaviors are `allow`, `ignore`, and `disable`. "
1372+
+ "The `allow` behavior allows normal delete operations (default). "
1373+
+ "The `ignore` behavior silently skips delete requests without error. "
1374+
+ "The `disable` behavior rejects delete requests with a clear error message. "
1375+
+ "For tables with FIRST_ROW or VERSIONED merge engines, this option defaults to `ignore`.");
1376+
13641377
// ------------------------------------------------------------------------
13651378
// ConfigOptions for Kv
13661379
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.compression.ArrowCompressionInfo;
2222
import org.apache.fluss.metadata.DataLakeFormat;
23+
import org.apache.fluss.metadata.DeleteBehavior;
2324
import org.apache.fluss.metadata.KvFormat;
2425
import org.apache.fluss.metadata.LogFormat;
2526
import org.apache.fluss.metadata.MergeEngineType;
@@ -111,6 +112,11 @@ public Optional<String> getMergeEngineVersionColumn() {
111112
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
112113
}
113114

115+
/** Gets the delete behavior of the table. */
116+
public DeleteBehavior getDeleteBehavior() {
117+
return config.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
118+
}
119+
114120
/** Gets the Arrow compression type and compression level of the table. */
115121
public ArrowCompressionInfo getArrowCompressionInfo() {
116122
return ArrowCompressionInfo.fromConf(config);
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
/**
21+
* The delete behavior for the primary key table.
22+
*
23+
* <p>This enum defines how delete operations should be handled for primary key tables. It provides
24+
* different strategies to control whether deletions are allowed, ignored, or explicitly disabled.
25+
*
26+
* @since 0.8
27+
*/
28+
public enum DeleteBehavior {
29+
30+
/**
31+
* Allow normal delete operations. This is the default behavior for primary key tables without
32+
* merge engines.
33+
*/
34+
ALLOW,
35+
36+
/**
37+
* Silently ignore delete requests without error. Delete operations will be dropped at the
38+
* server side, and no deletion will be performed. This is the default behavior for tables with
39+
* FIRST_ROW or VERSIONED merge engines.
40+
*/
41+
IGNORE,
42+
43+
/**
44+
* Reject delete requests with a clear error message. Any attempt to perform delete operations
45+
* will result in an exception being thrown.
46+
*/
47+
DISABLE;
48+
49+
/** Creates a {@link DeleteBehavior} from the given string. */
50+
public static DeleteBehavior fromString(String behavior) {
51+
switch (behavior.toUpperCase()) {
52+
case "ALLOW":
53+
return ALLOW;
54+
case "IGNORE":
55+
return IGNORE;
56+
case "DISABLE":
57+
return DISABLE;
58+
default:
59+
throw new IllegalArgumentException("Unsupported delete behavior: " + behavior);
60+
}
61+
}
62+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959

6060
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
6161
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
62+
import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX;
6263
import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
6364
import static org.apache.fluss.flink.utils.DataLakeUtils.getDatalakeFormat;
6465
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes;
@@ -225,10 +226,10 @@ private static Configuration toFlussClientConfig(
225226
ConfigOptions.BOOTSTRAP_SERVERS.key(),
226227
tableOptions.get(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key()));
227228

228-
// forward all client configs
229+
// forward all client configs and table configs
229230
tableOptions.forEach(
230231
(key, value) -> {
231-
if (key.startsWith(CLIENT_PREFIX)) {
232+
if (key.startsWith(CLIENT_PREFIX) || key.startsWith(TABLE_PREFIX)) {
232233
flussConfig.setString(key, value);
233234
}
234235
});

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package org.apache.fluss.flink.sink;
1919

20+
import org.apache.fluss.config.ConfigOptions;
2021
import org.apache.fluss.config.Configuration;
2122
import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema;
2223
import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
2324
import org.apache.fluss.flink.utils.PushdownUtils;
2425
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
2526
import org.apache.fluss.flink.utils.PushdownUtils.ValueConversion;
2627
import org.apache.fluss.metadata.DataLakeFormat;
28+
import org.apache.fluss.metadata.DeleteBehavior;
2729
import org.apache.fluss.metadata.MergeEngineType;
2830
import org.apache.fluss.metadata.TablePath;
2931
import org.apache.fluss.row.GenericRow;
@@ -360,6 +362,15 @@ private void validateUpdatableAndDeletable() {
360362
"Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
361363
tablePath, mergeEngineType));
362364
}
365+
366+
// Check table-level delete behavior configuration
367+
DeleteBehavior deleteBehavior = flussConfig.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
368+
if (deleteBehavior == DeleteBehavior.DISABLE) {
369+
throw new UnsupportedOperationException(
370+
String.format(
371+
"Table %s has delete behavior set to 'disable' which does not support DELETE statements.",
372+
tablePath));
373+
}
363374
}
364375

365376
private Map<Integer, LogicalType> getPrimaryKeyTypes() {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.flink.source;
1919

20+
import org.apache.fluss.config.ConfigOptions;
2021
import org.apache.fluss.config.Configuration;
2122
import org.apache.fluss.flink.FlinkConnectorOptions;
2223
import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
@@ -30,6 +31,7 @@
3031
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
3132
import org.apache.fluss.lake.source.LakeSource;
3233
import org.apache.fluss.lake.source.LakeSplit;
34+
import org.apache.fluss.metadata.DeleteBehavior;
3335
import org.apache.fluss.metadata.MergeEngineType;
3436
import org.apache.fluss.metadata.TablePath;
3537
import org.apache.fluss.predicate.GreaterOrEqual;
@@ -74,6 +76,7 @@
7476
import org.apache.flink.table.functions.LookupFunction;
7577
import org.apache.flink.table.types.DataType;
7678
import org.apache.flink.table.types.logical.LogicalType;
79+
import org.apache.flink.types.RowKind;
7780
import org.slf4j.Logger;
7881
import org.slf4j.LoggerFactory;
7982

@@ -203,10 +206,29 @@ public ChangelogMode getChangelogMode() {
203206
} else {
204207
if (hasPrimaryKey()) {
205208
// pk table
206-
if (mergeEngineType == MergeEngineType.FIRST_ROW) {
209+
if (mergeEngineType == MergeEngineType.FIRST_ROW
210+
|| mergeEngineType == MergeEngineType.VERSIONED) {
207211
return ChangelogMode.insertOnly();
208212
} else {
209-
return ChangelogMode.all();
213+
// Check delete behavior configuration
214+
Configuration tableConf = Configuration.fromMap(tableOptions);
215+
DeleteBehavior deleteBehavior =
216+
tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
217+
if (deleteBehavior == DeleteBehavior.IGNORE) {
218+
// If delete operations are ignored, only insert and update are relevant
219+
return ChangelogMode.newBuilder()
220+
.addContainedKind(RowKind.INSERT)
221+
.addContainedKind(RowKind.UPDATE_BEFORE)
222+
.addContainedKind(RowKind.UPDATE_AFTER)
223+
.build();
224+
} else if (deleteBehavior == DeleteBehavior.DISABLE) {
225+
throw new UnsupportedOperationException(
226+
String.format(
227+
"Table %s has delete behavior set to 'disable' which does not support DELETE statements.",
228+
tablePath));
229+
} else {
230+
return ChangelogMode.all();
231+
}
210232
}
211233
} else {
212234
// append only

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,4 +1095,107 @@ public InsertAndExpectValues(List<String> insertValues, List<String> expectedRow
10951095
this.expectedRows = expectedRows;
10961096
}
10971097
}
1098+
1099+
@Test
1100+
void testDeleteBehaviorDisable() {
1101+
String tableName = "delete_behavior_disable_table";
1102+
tBatchEnv.executeSql(
1103+
String.format(
1104+
"create table %s ("
1105+
+ " a int not null,"
1106+
+ " b bigint null, "
1107+
+ " c string null, "
1108+
+ " primary key (a) not enforced"
1109+
+ ") with ('table.delete.behavior' = 'disable')",
1110+
tableName));
1111+
1112+
TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
1113+
assertThatThrownBy(
1114+
() ->
1115+
tBatchEnv
1116+
.executeSql("DELETE FROM " + tableName + " WHERE a = 1")
1117+
.await())
1118+
.isInstanceOf(UnsupportedOperationException.class)
1119+
.hasMessage(
1120+
String.format(
1121+
"Table %s has delete behavior set to 'disable' which does not support DELETE statements.",
1122+
tablePath));
1123+
}
1124+
1125+
@Test
1126+
void testDeleteBehaviorIgnore() throws Exception {
1127+
String tableName = "delete_behavior_ignore_table";
1128+
tEnv.executeSql(
1129+
String.format(
1130+
"create table %s ("
1131+
+ " a int not null primary key not enforced,"
1132+
+ " b string"
1133+
+ ") with ('table.delete.behavior' = 'ignore')",
1134+
tableName));
1135+
1136+
// Insert some data
1137+
tEnv.executeSql(
1138+
String.format(
1139+
"INSERT INTO %s VALUES (1, 'test1'), (2, 'test2'), (3, 'test3')",
1140+
tableName))
1141+
.await();
1142+
1143+
// Create a changelog stream with deletes that should be ignored
1144+
org.apache.flink.table.api.Table changelogData =
1145+
tEnv.fromChangelogStream(
1146+
env.fromCollection(
1147+
Arrays.asList(
1148+
Row.ofKind(RowKind.INSERT, 4, "test4"),
1149+
Row.ofKind(RowKind.DELETE, 1, "test1"), // Should be ignored
1150+
Row.ofKind(RowKind.UPDATE_AFTER, 2, "updated_test2"))));
1151+
tEnv.createTemporaryView("changelog_source", changelogData);
1152+
1153+
// Insert changelog data
1154+
tEnv.executeSql(String.format("INSERT INTO %s SELECT * FROM changelog_source", tableName));
1155+
1156+
CloseableIterator<Row> rowIter =
1157+
tEnv.executeSql(String.format("select * from %s", tableName)).collect();
1158+
1159+
// Row with a=1 should still exist (delete was ignored)
1160+
List<String> expectedRows =
1161+
Arrays.asList(
1162+
"+I[1, test1]", // Delete was ignored
1163+
"+I[2, test2]",
1164+
"-U[2, test2]",
1165+
"+U[2, updated_test2]",
1166+
"+I[3, test3]",
1167+
"+I[4, test4]");
1168+
assertResultsIgnoreOrder(rowIter, expectedRows, true);
1169+
}
1170+
1171+
@Test
1172+
void testDeleteBehaviorAllow() throws Exception {
1173+
String tableName = "delete_behavior_allow_table";
1174+
tBatchEnv.executeSql(
1175+
String.format(
1176+
"create table %s ("
1177+
+ " a int not null,"
1178+
+ " b bigint null, "
1179+
+ " c string null, "
1180+
+ " primary key (a) not enforced"
1181+
+ ") with ('table.delete.behavior' = 'allow')",
1182+
tableName));
1183+
1184+
// Insert and delete should work normally
1185+
tBatchEnv
1186+
.executeSql(
1187+
String.format(
1188+
"INSERT INTO %s VALUES (1, 100, 'test1'), (2, 200, 'test2')",
1189+
tableName))
1190+
.await();
1191+
1192+
tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE a = 1").await();
1193+
1194+
CloseableIterator<Row> rowIter =
1195+
tEnv.executeSql(String.format("select * from %s", tableName)).collect();
1196+
1197+
List<String> expectedRows =
1198+
Arrays.asList("+I[1, 100, test1]", "+I[2, 200, test2]", "-D[1, 100, test1]");
1199+
assertResultsIgnoreOrder(rowIter, expectedRows, true);
1200+
}
10981201
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.fluss.lake.lakestorage.LakeCatalog;
3434
import org.apache.fluss.metadata.DataLakeFormat;
3535
import org.apache.fluss.metadata.DatabaseDescriptor;
36+
import org.apache.fluss.metadata.DeleteBehavior;
37+
import org.apache.fluss.metadata.MergeEngineType;
3638
import org.apache.fluss.metadata.PartitionSpec;
3739
import org.apache.fluss.metadata.ResolvedPartitionSpec;
3840
import org.apache.fluss.metadata.TableChange;
@@ -389,6 +391,20 @@ private TableDescriptor applySystemDefaults(TableDescriptor tableDescriptor) {
389391
ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
390392
}
391393

394+
// For tables with first_row or versioned merge engines, automatically set to IGNORE if
395+
// delete behavior is ALLOW
396+
Configuration tableConf = Configuration.fromMap(tableDescriptor.getProperties());
397+
DeleteBehavior deleteBehavior = tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
398+
MergeEngineType mergeEngine = tableConf.get(ConfigOptions.TABLE_MERGE_ENGINE);
399+
if (mergeEngine == MergeEngineType.FIRST_ROW || mergeEngine == MergeEngineType.VERSIONED) {
400+
if (deleteBehavior == DeleteBehavior.ALLOW) {
401+
Map<String, String> newProperties = new HashMap<>(newDescriptor.getProperties());
402+
newProperties.put(
403+
ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), DeleteBehavior.IGNORE.name());
404+
newDescriptor = newDescriptor.withProperties(newProperties);
405+
}
406+
}
407+
392408
return newDescriptor;
393409
}
394410

0 commit comments

Comments
 (0)